Flink数据分流&Flink与Kafka的连续读写交互
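Flink的旁路输出(Side Output)特性可以用来对数据进行分流。通常的做法是:先创建一个流标签(OutputTag),在ProcessFunction中通过context.output(OutputTag, value)把数据写入该标签对应的旁路输出,再以这个OutputTag标签对象作为参数,调用处理后得到的数据流(SingleOutputStreamOperator)的getSideOutput(OutputTag)方法获取子数据流。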
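OutputTag是否相同只由其id决定,因此写入和读取旁路输出时不必复用同一个OutputTag实例:只要id相同(且元素类型一致),就指向同一个子数据流。由于Java的泛型擦除,创建OutputTag时需要提供元素类型信息:可以像下面的例子那样在构造时传入Types.STRING,也可以通过匿名内部类的形式(如new OutputTag<String>("num"){})让Flink推断类型。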
例:对初始字符串进行数据分类,将字母、数字、符号分发到不同的子数据流中进行处理。
public class Producer { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //配置kafka信息 Properties properties = new Properties(); properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092"); //创建初始数据 DataStreamSource<String> source = env.fromElements("n1!u2m%3$b@*4e5r"); //发送初始数据到kafka FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties); source.addSink(producer); //执行程序 env.execute(); } } public class Consumer { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //配置kafka信息 Properties properties = new Properties(); properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092"); //获取初始数据 FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); DataStreamSource<String> source = env.addSource(consumer); //对初始数据进行分发,此处用匿名内部类实现,也可以单独创建类写更复杂的逻辑 SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() { @Override public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) { StringBuilder num = new StringBuilder(); StringBuilder word = new StringBuilder(); StringBuilder other = new StringBuilder(); for (char c : s.toCharArray()) { if (c >= '0' && c <= '9') { num.append(c); } else if (c >= 'a' && c <= 'z') { word.append(c); } else { other.append(c); } } //数字发送到id为num的流标签 context.output(new OutputTag<>("num", Types.STRING), num.toString()); //字母发送到id为word的流标签 context.output(new OutputTag<>("word", Types.STRING), word.toString()); //字符发送到id为other的流标签 context.output(new OutputTag<>("other", Types.STRING), other.toString()); } }); process.getSideOutput(new OutputTag<>("num", 
Types.STRING)).print("这是数字:"); process.getSideOutput(new OutputTag<>("word", Types.STRING)).print("这是字母:"); process.getSideOutput(new OutputTag<>("other", Types.STRING)).print("这是字符:"); //执行程序 env.execute(); } }
先启动Consumer,再启动Producer即可(Consumer在没有已提交的偏移量时默认从最新位置开始消费,若先启动Producer,已写入的消息不会被读到)。控制台输出:
这是数字::7> 12345
这是字母::7> number
这是字符::7> !%$@*
例:对Kafka中的初始股票字符串进行解析处理,将结果发送到另一个主题,再从该主题读取。
public class Producer { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //配置kafka信息 Properties properties = new Properties(); properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092"); //创建初始数据 DataStreamSource<String> source = env.fromElements("name:五粮液,code:000858,price:172"); //发送初始数据到kafka FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties); source.addSink(producer); //执行程序 env.execute(); } } public class Consumer { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //配置kafka信息 Properties properties = new Properties(); properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092"); //获取初始数据 FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); DataStreamSource<String> source = env.addSource(consumer); //把股票信息转换为Stock实体类 SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() { @Override public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) { Stock stock = new Stock(); for (String info : s.split(",")) { String[] split = info.split(":"); switch (split[0]) { case "name": stock.setName(split[1]); break; case "code": stock.setCode(split[1]); break; case "price": stock.setPrice(Integer.parseInt(split[1])); break; } } collector.collect(stock.toString()); } }); //把Stock信息发送到下一个Kafka主题 FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("stock", new SimpleStringSchema(), properties); process.addSink(producer); //从上一个主题获取stock信息 FlinkKafkaConsumer<String> consumer_1 = new FlinkKafkaConsumer<>("stock", new SimpleStringSchema(), properties); 
env.addSource(consumer_1).print("输出股票信息:"); //执行程序 env.execute(); } static class Stock { private String name; private String code; private Integer price; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getCode() { return code; } @Override public String toString() { return "Stock{" + "name='" + name + '\'' + ", code='" + code + '\'' + ", price=" + price + '}'; } public void setCode(String code) { this.code = code; } public Integer getPrice() { return price; } public void setPrice(Integer price) { this.price = price; } } }
先启动Consumer,再启动Producer即可。控制台输出:
输出股票信息::3> Stock{name='五粮液', code='000858', price=172}
#三八节公司福利##学习路径#