大厂中Flink SQL开发流程及模板
Flink SQL开发链路全流程
先确定源,一般都是Mysql到kafka/mq,通过读binlog获取,也可以直接读后端提供kafka/mq,其次是流表,流表大多数都在中间件中存放,最后落地结果时可落starrocks/doris中。
流程如下:
流表创建
首先需要先创建流表的库就是流表kafka、olap的ods、dwd、dws、ads方便后续操作,再创建配置流表。
实时开发存在3版块内容,1flink sql实现实时sql方式查询,2.实时jar包上传,3.flink cdc来接入来源库
数据源CDC接入
方法1:Binlog获取,DTS配置,生成ODS流表
这里任务名和ODS任务保持一致即可,配置好MQ信息,及数据源(这里数据源指的是mysql实例信息,即连接串)
创建任务后,再进行Topic的订阅,即可把Mysql Binlog同步到MQ
方法2:使用CDC接生成ODS流表
实时表命名
dwd_一级域_二级域_ri
ads_一级主题域_二级主题域_ri
Flink SQL开发
选择flink sql模板创建flink sql任务
由于我们之前已经创建过流表如果需要把流表数据全部导入可以直接使用无代码模式
如果想写flink sql也可以单独去写
配置完flink参数可上线发布
实时运维
实时运维和任务运维一样可以看到当前实时任务在线上运行,具体细节需要flink webui定位,后续单独成一个板块讲。
DWD代码模板
以订单表,主订单为例,任务为dwd_trade_order_detail_ri,明细数据落kafka流表及hive中
--创建Source表 CREATE TABLE ods_trade_trade_order_ri ( `message` varchar ) WITH ( 'connector' = 'kafka', 'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR', 'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092', 'properties.group.id' = 'GID_YSHOPPING_TRADE_ORDER_CONNECTOR', 'scan.topic-partition-discovery.interval' = '20s', 'format' = 'raw', 'json.fail-on-missing-field' = 'false', 'scan.startup.mode'='timestamp'--指定开始时间 'scan.startup.timestamp-millis'='1706630400000'--指定获取数据的开始时间 ); --创建sink数据表 create table dwd_trade_order_detail_ri ( order_id varchar, --主订单号 order_status_code int, --主订单状态code order_status_name varchar, --主订单状态name product_amt int , --商品总额分 freight_amt int , --运费总额分 buyer_id varchar, create_time datetime, modify_time datetime ) WITH ( -- 这里接收ods数据就不用再写消费者组了 'connector' = 'kafka', 'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR', 'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092', 'format' = 'raw' ); create table dwd_trade_order_detail_2hive_ri ( --先在hive建好表 order_id varchar, --主订单号 order_status_code int, --主订单状态code order_status_name varchar, --主订单状态name product_amt int , --商品总额分 freight_amt int , --运费总额分 buyer_id varchar, create_time varchar, modify_time varchar ) WITH ( 'path' = ' hdfs://cluster100/user/bdms/hive_db/yx_dwd.db/dwd_trade_order_detail_ri/ds=2024-10-22', 'krb.keytab' = 'sloth.keytab', 'krb.principal' = **********', 'part.size' = '267386880', 'rollover.interval' = '900000', 'format' = 'parquet', 'compression' = 'none', 'partition.keys' = 'date', 'connector.type'='filesystem', 'connector.property-version'='1', 'update-mode'='append', 'hdfs-site'='hdfs-site.xml', 'core-site'='core-site.xml', 'krb.conf'='krb5.conf', 'is.related.mammunt'='false', 'part.prefix'='ndi-128', 'part.suffix'='-success', 'inactivity.interval'='300000' ); INSERT INTO dwd_trade_order_detail_ri SELECT COALESCE(json_value(message,'$.orderId'),'') as order_id ,cast(json_value(message,'$.orderStatusCode') as int) as order_status_code ,case when json_value(message,'$.orderStatusCode')='1' then '未付款' when json_value(message,'$.orderStatusCode')='2' then '已付款' when json_value(message,'$.orderStatusCode')='3' then '待收货' when json_value(message,'$.orderStatusCode')='4' then '已收货' when json_value(message,'$.orderStatusCode')='5' then '已完结' when json_value(message,'$.orderStatusCode')='6' then '退货' end as order_status_name ,COALESCE(cast(json_value(message,'$.productAmt') as int),0) as product_amt ,COALESCE(json_value(message,'$.freightAmt'),0) as freight_amt ,COALESCE(json_value(message,'$.buyerId'),0) as buyer_id ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as create_time ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as modify_time ,from_unixtime(cast(substring(JSON_VALUE (CAST (`message` as VARCHAR), '$.ts'),1,10) as bigint)) ts from ( select split_index(message,'orderLog:',1) as message from ods_trade_trade_order_ri where message like '%xxxxx%' ) lateral table ( json_array_to_map(message, 'data') ) as t (data) where is_del=0 ; INSERT INTO dwd_trade_order_detail_2hive_ri select order_id ,order_status_code ,order_status_name ,product_amt ,freight_amt ,buyer_id ,cast(create_time as varchar) as create_time ,cast(modify_time as varchar) as modify_time from dwd_trade_order_detail_ri