大厂中Flink SQL开发流程及模板

Flink SQL开发链路全流程

先确定源,一般都是Mysql到kafka/mq,通过读binlog获取,也可以直接读后端提供kafka/mq,其次是流表,流表大多数都在中间件中存放,最后落地结果时可落starrocks/doris中。

流程如下:

流表创建

首先需要先创建流表的库就是流表kafka、olap的ods、dwd、dws、ads方便后续操作,再创建配置流表。

实时开发存在3版块内容,1flink sql实现实时sql方式查询,2.实时jar包上传,3.flink cdc来接入来源库

数据源CDC接入

方法1:Binlog获取,DTS配置,生成ODS流表

这里任务名和ODS任务保持一致即可,配置好MQ信息,及数据源(这里数据源指的是mysql实例信息,即连接串)

创建任务后,再进行Topic的订阅,即可把Mysql Binlog同步到MQ

方法2:使用CDC接生成ODS流表

实时表命名

dwd_一级域_二级域_ri

ads_一级主题域_二级主题域_ri

Flink SQL开发

选择flink sql模板创建flink sql任务

由于我们之前已经创建过流表如果需要把流表数据全部导入可以直接使用无代码模式

如果想写flink sql也可以单独去写

配置完flink参数可上线发布

实时运维

实时运维和任务运维一样可以看到当前实时任务在线上运行,具体细节需要flink webui定位,后续单独成一个板块讲。

DWD代码模板

以订单表,主订单为例,任务为dwd_trade_order_detail_ri,明细数据落kafka流表及hive中

--创建Source表
CREATE TABLE ods_trade_trade_order_ri (
    `message`   varchar
) WITH (
  'connector' = 'kafka',
  'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',
  'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
  'properties.group.id' = 'GID_YSHOPPING_TRADE_ORDER_CONNECTOR',
  'scan.topic-partition-discovery.interval' = '20s',
  'format' = 'raw',
  'json.fail-on-missing-field' = 'false',
  'scan.startup.mode'='timestamp'--指定开始时间
  'scan.startup.timestamp-millis'='1706630400000'--指定获取数据的开始时间
);


--创建sink数据表
create table dwd_trade_order_detail_ri (
    order_id      varchar, --主订单号
    order_status_code int, --主订单状态code
    order_status_name   varchar, --主订单状态name
    product_amt int , --商品总额分
    freight_amt int  , --运费总额分
    buyer_id varchar,
    create_time datetime,
    modify_time datetime
) WITH ( 
-- 这里接收ods数据就不用再写消费者组了
   'connector' = 'kafka',
  'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',
  'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
  'format' = 'raw'
);


create table dwd_trade_order_detail_2hive_ri (
--先在hive建好表
    order_id      varchar, --主订单号
    order_status_code int, --主订单状态code
    order_status_name   varchar, --主订单状态name
    product_amt int , --商品总额分
    freight_amt int  , --运费总额分
    buyer_id varchar,
    create_time varchar,
    modify_time varchar
)
WITH (
'path' = ' hdfs://cluster100/user/bdms/hive_db/yx_dwd.db/dwd_trade_order_detail_ri/ds=2024-10-22',
'krb.keytab' = 'sloth.keytab',
'krb.principal' = **********',
'part.size' = '267386880',
'rollover.interval' = '900000',
'format' = 'parquet',
'compression' = 'none',
'partition.keys' = 'date',
'connector.type'='filesystem',
'connector.property-version'='1',
'update-mode'='append',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'krb.conf'='krb5.conf',
'is.related.mammunt'='false',
'part.prefix'='ndi-128',
'part.suffix'='-success',
'inactivity.interval'='300000'
);


INSERT INTO dwd_trade_order_detail_ri
SELECT
    COALESCE(json_value(message,'$.orderId'),'')  as order_id
    ,cast(json_value(message,'$.orderStatusCode') as int) as order_status_code
    ,case when json_value(message,'$.orderStatusCode')='1'
          then '未付款'
          when json_value(message,'$.orderStatusCode')='2'
          then '已付款'
          when json_value(message,'$.orderStatusCode')='3'
          then '待收货'
          when json_value(message,'$.orderStatusCode')='4'
          then '已收货'
          when json_value(message,'$.orderStatusCode')='5'
          then '已完结'
          when json_value(message,'$.orderStatusCode')='6'
          then '退货'
     end as order_status_name
    ,COALESCE(cast(json_value(message,'$.productAmt') as int),0) as product_amt
    ,COALESCE(json_value(message,'$.freightAmt'),0) as freight_amt
    ,COALESCE(json_value(message,'$.buyerId'),0) as buyer_id
    ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as create_time
    ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as modify_time
    ,from_unixtime(cast(substring(JSON_VALUE (CAST (`message` as VARCHAR), '$.ts'),1,10) as bigint)) ts 
from ( 
select split_index(message,'orderLog:',1) as message
from 
ods_trade_trade_order_ri
where message like '%xxxxx%'
)
lateral table (
    json_array_to_map(message, 'data')
  ) as t (data)
where is_del=0
;

INSERT INTO dwd_trade_order_detail_2hive_ri
select order_id
      ,order_status_code
      ,order_status_name
      ,product_amt
      ,freight_amt
      ,buyer_id
      ,cast(create_time as varchar) as create_time
      ,cast(modify_time as varchar) as modify_time      
from dwd_trade_order_detail_ri


全部评论

相关推荐

讲解你都学了哪些?巴拉巴拉写数仓,springboot 写个http接口微服务了解吗?--不了解面试官说:我的理解是  巴拉巴拉虽然我也不干这个spark和flink的区别:spark批,flink实时,spark stream 微批flink实时是一条一条的嘛?他会不会很慢?  --讲了是,但是他有window 巴拉巴拉不知道讲的对不对flume  +sqoop是干什么用的;--flume 数据日志采集等等  sqoop不知道你比较熟悉什么架构,--spark,flink讲一讲flink是怎么运行的--想了一下不知道, 面试官讲就其实spark,flink就是在客户端怎么样和集群怎么样计算怎么样   (架构还是得更加了解)flink env中都有什么?--集群配置,什么名字来着、并行度  讲的不全dophinshedule了解吗?他的底层实现了解吗?你这个项目是什么网上做的吗?开源的吗哪里学的?讲一讲java基础:微服务了解吗 ?你知道java有哪些集合吗?--set  --hashset有什么区别?还有吗?面试官讲其实hashset 就是底层是hashmap实现的  巴拉巴拉  set其实是java的一个基础类bala什么时候用hashset 什么时候用hashmap--hashset通常指针对于处理去重操作,hashma用于需要存储数据时,并且hashset对比list他的查找插入时间复杂度是o(1)hashset 存放一个已有数据的时候会怎么办?--我说不会放进去 ,面试官问只是不会放进去吗?代码:链表反转  本来想让我写集合相关的,看我集合不熟就改这个了。--注意面向对象的代码书写细节反问:你们做什么业务?--主要就是flume-sqoop一些数据日志采集  一些简单的etl   包括dophinshedule一些流式管理;还有一些仓库维护;对我有什么建议吗?--实习面试侧重于一些基础一些的面试,不会问太多项目,校招侧重于具体的框架,最好能够重点了解某一个框架能够阅读源码,对于基础的语言需要重点了解不一定非得是java,但是肯定需要了解,技术栈不一定要宽,要深。没问hive kafka这个我也不会总结:要注重基础语言知识和深挖某一框架,加强基础语言代码了解
查看17道真题和解析
点赞 评论 收藏
分享
评论
点赞
6
分享

创作者周榜

更多
牛客网
牛客企业服务