Flink数据分流&Flink与Kafka的连续读写交互

Flink的旁路输出特性可以用来对数据进行分流,通过是通过创建一个流的标签(OutputTag),再利用这个OutputTag标签对象作为参数,调用初始/父级数据流的getSideOutput(OutputTag)方法获取子数据流。

由于每个流标签都有一个id,因此不需要创建对象,只要流标签的id相同,其中的数据就相同。因此可以通过匿名内部类的形式来获取子数据流。

例:对初始字符串进行数据分类,将字母、数字、符号分发到不同的子数据流中进行处理。

public class Producer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //创建初始数据
        DataStreamSource<String> source = env.fromElements("n1!u2m%3$b@*4e5r");
        //发送初始数据到kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        source.addSink(producer);
        //执行程序
        env.execute();
    }
}



public class Consumer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //获取初始数据
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);
        //对初始数据进行分发,此处用匿名内部类实现,也可以单独创建类写更复杂的逻辑
        SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
                StringBuilder num = new StringBuilder();
                StringBuilder word = new StringBuilder();
                StringBuilder other = new StringBuilder();
                for (char c : s.toCharArray()) {
                    if (c >= '0' && c <= '9') {
                        num.append(c);
                    } else if (c >= 'a' && c <= 'z') {
                        word.append(c);
                    } else {
                        other.append(c);
                    }
                }
                //数字发送到id为num的流标签
                context.output(new OutputTag<>("num", Types.STRING), num.toString());
                //字母发送到id为word的流标签
                context.output(new OutputTag<>("word", Types.STRING), word.toString());
                //字符发送到id为other的流标签
                context.output(new OutputTag<>("other", Types.STRING), other.toString());
            }
        });
        process.getSideOutput(new OutputTag<>("num", Types.STRING)).print("这是数字:");
        process.getSideOutput(new OutputTag<>("word", Types.STRING)).print("这是字母:");
        process.getSideOutput(new OutputTag<>("other", Types.STRING)).print("这是字符:");
        //执行程序
        env.execute();
    }
}

先启动Consumer,再启动Producer即可。控制台输出:

这是数字::7> 12345

这是字母::7> number

这是字符::7> !%$@*

例:对Kafka的初始股票字符串进行解析处理,再发送、读取到其他主题。

public class Producer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //创建初始数据
        DataStreamSource<String> source = env.fromElements("name:五粮液,code:000858,price:172");
        //发送初始数据到kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        source.addSink(producer);
        //执行程序
        env.execute();
    }
}



public class Consumer {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //配置kafka信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        //获取初始数据
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);
        //把股票信息转换为Stock实体类
        SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
                Stock stock = new Stock();
                for (String info : s.split(",")) {
                        String[] split = info.split(":");
                        switch (split[0]) {
                            case "name":
                                stock.setName(split[1]);
                                break;
                            case "code":
                                stock.setCode(split[1]);
                                break;
                            case "price":
                                stock.setPrice(Integer.parseInt(split[1]));
                                break;
                        }
                }
                collector.collect(stock.toString());
            }
        });
        //把Stock信息发送到下一个Kafka主题
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("stock", new SimpleStringSchema(), properties);
        process.addSink(producer);
        //从上一个主题获取stock信息
        FlinkKafkaConsumer<String> consumer_1 = new FlinkKafkaConsumer<>("stock", new SimpleStringSchema(), properties);
        env.addSource(consumer_1).print("输出股票信息:");
        //执行程序
        env.execute();
    }

    static class Stock {
        private String name;
        private String code;
        private Integer price;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        @Override
        public String toString() {
            return "Stock{" +
                    "name='" + name + '\'' +
                    ", code='" + code + '\'' +
                    ", price=" + price +
                    '}';
        }

        public void setCode(String code) {
            this.code = code;
        }

        public Integer getPrice() {
            return price;
        }

        public void setPrice(Integer price) {
            this.price = price;
        }
    }
}

先启动Consumer,再启动Producer即可。控制台输出:

输出股票信息::3> Stock{name='五粮液', code='000858', price=172}


#三八节公司福利##学习路径#
全部评论
Java也得写flink了嘛
点赞 回复 分享
发布于 2022-03-10 20:47
需要考软考证书吗?
点赞 回复 分享
发布于 2022-03-29 13:07

相关推荐

什么时候才能有offer啊_:十年前我还在刺激战场研究跳伞的底层原理呢
投递牛客等公司
点赞 评论 收藏
分享
01-15 17:34
保定学院 Java
数学转码崽:学历没优势就得卷项目和实习啊,但是我看了一下你这个项目,什么雪花算法,搜索引擎,Docker,minio这些都属于通用的东西啊,根本不算亮点,没有任何业务相关性。 还有第二个看到统一鉴权,分片上传估计面试官都不想看了。连我一个偶尔刷刷牛客简历的都看多了,面试官估计早都看吐了。。。 秋招结束了,就尽量找找中小厂吧,毕竟你现在转行已经没时间了,高低有一段实习经历
点赞 评论 收藏
分享
评论
2
3
分享

创作者周榜

更多
牛客网
牛客企业服务