数据表全量改增量操作及拉链表设计
问题背景
有同学咨询语兴说全量改增量在企业中都是怎么去做的,拉链表又是怎么做的。
全量:每个分区都存放截至当天的全部数据,例如 pt=2024-10-07 分区内有 1000w 条。
增量:每个分区只记录当天新增或变更的数据,例如 pt=2024-10-07 分区内只有 10w 条,2022~2024 年所有分区合计 1000w 条。
随着企业快速增长,数据量也不断变多:可能一天只有10w增量,但过了10年后数据量过亿,如果每天直接全量同步数据库会很慢(如果是只读库还好,如果是生产库可能直接被拖挂),而同步任务一般跑半小时左右,所以我们要把全量同步改造成增量同步。
数据导入
建表语句
-- ODS layer: incremental (di) landing table for the main order table of the
-- forward-order system. Partitioned by pt (business date) and stored as Parquet.
CREATE TABLE mammut_user.ods_trade_trade_order_di
(
    id              BIGINT COMMENT '主键',
    order_no        STRING COMMENT '主订单号,外部生成,唯一键',
    order_status    BIGINT COMMENT '主订单状态 1待支付 2支付成功 3发货中 4已收货 5交易关闭 6退货',
    deliver_status  BIGINT COMMENT '发货状态;1未发货,2部分发货,3已发货',
    product_amount  BIGINT COMMENT '商品总额',
    freight_amount  BIGINT COMMENT '运费总额',
    pay_amount      BIGINT COMMENT '用户实付总现金',
    discount_amount BIGINT COMMENT '折扣总额',
    buyer_id        BIGINT COMMENT '买家id',
    buyer_name      STRING COMMENT '买家昵称',
    address_info    STRING COMMENT '买家地址信息',
    platform        STRING COMMENT '平台,安卓,ios',
    source_name     STRING COMMENT '渠道',
    feature         STRING COMMENT '拓展信息',
    is_del          BIGINT COMMENT '删除标记,0正常,1删除',
    create_time     STRING COMMENT '创建时间',
    modify_time     STRING COMMENT '更新时间'
)
COMMENT 'ods-正向订单系统-主订单-增量表'
PARTITIONED BY (pt STRING COMMENT '业务日期')
STORED AS PARQUET
TBLPROPERTIES (
    'table.source'   = '自定义',
    'table.creator'  = 'yuxing',
    'SYNC_METASTORE' = 'on'
);
数据传输任务改造
之前我们都是按照全量把数据任务同步到数仓中,现在得进行改造,在数据传输模块,先建设新任务,表名也同时修改为di(增量)模式
之前的任务
改造后的任务
ODS初始数据导入(T-3)
-- Demo seed for partition pt='2024-10-04' (T-3): order 001232131 as first created.
INSERT OVERWRITE TABLE mammut_user.ods_trade_trade_order_di PARTITION (pt = '2024-10-04')
SELECT
    1                     AS id,
    '001232131'           AS order_no,         -- main order number (unique key)
    1                     AS order_status,     -- 1 = awaiting payment
    1                     AS deliver_status,   -- 1 = not shipped
    1000                  AS product_amount,   -- total product amount
    0                     AS freight_amount,   -- total freight
    1000                  AS pay_amount,       -- cash actually paid by the buyer
    0                     AS discount_amount,  -- total discount
    001                   AS buyer_id,
    '语兴01'              AS buyer_name,
    '{"provinceCode":"0570","province":"浙江","cityCode":"0571","city":"杭州","districtCode":"1","district":"滨江区","streetCode":"1","street":"xxx街"}'
                          AS address_info,     -- buyer address as a JSON string
    '安卓'                AS platform,
    'APP'                 AS source_name,      -- channel
    ''                    AS feature,          -- extension info (empty here)
    0                     AS is_del,           -- 0 = normal, 1 = deleted
    '2024-10-04 12:17:24' AS create_time,
    '2024-10-04 12:17:24' AS modify_time
;
ODS初始数据导入(T-2)
-- Demo seed for partition pt='2024-10-05' (T-2): the same order after its status
-- moved to 2; create_time is unchanged, modify_time advances.
INSERT OVERWRITE TABLE mammut_user.ods_trade_trade_order_di PARTITION (pt = '2024-10-05')
SELECT
    2                     AS id,
    '001232131'           AS order_no,         -- main order number (unique key)
    2                     AS order_status,     -- 2 = payment succeeded
    1                     AS deliver_status,   -- 1 = not shipped
    1000                  AS product_amount,   -- total product amount
    0                     AS freight_amount,   -- total freight
    1000                  AS pay_amount,       -- cash actually paid by the buyer
    0                     AS discount_amount,  -- total discount
    001                   AS buyer_id,
    '语兴01'              AS buyer_name,
    '{"provinceCode":"0570","province":"浙江","cityCode":"0571","city":"杭州","districtCode":"1","district":"滨江区","streetCode":"1","street":"xxx街"}'
                          AS address_info,     -- buyer address as a JSON string
    '安卓'                AS platform,
    'APP'                 AS source_name,      -- channel
    ''                    AS feature,          -- extension info (empty here)
    0                     AS is_del,           -- 0 = normal, 1 = deleted
    '2024-10-04 12:17:24' AS create_time,
    '2024-10-05 14:25:54' AS modify_time
;
ODS初始数据导入(T-1)
-- Demo seed for partition pt='2024-10-06' (T-1): the same order after its status
-- moved to 3 and its delivery status to 3.
INSERT OVERWRITE TABLE mammut_user.ods_trade_trade_order_di PARTITION (pt = '2024-10-06')
SELECT
    3                     AS id,
    '001232131'           AS order_no,         -- main order number (unique key)
    3                     AS order_status,     -- 3 = shipping in progress
    3                     AS deliver_status,   -- 3 = shipped
    1000                  AS product_amount,   -- total product amount
    0                     AS freight_amount,   -- total freight
    1000                  AS pay_amount,       -- cash actually paid by the buyer
    0                     AS discount_amount,  -- total discount
    001                   AS buyer_id,
    '语兴01'              AS buyer_name,
    '{"provinceCode":"0570","province":"浙江","cityCode":"0571","city":"杭州","districtCode":"1","district":"滨江区","streetCode":"1","street":"xxx街"}'
                          AS address_info,     -- buyer address as a JSON string
    '安卓'                AS platform,
    'APP'                 AS source_name,      -- channel
    ''                    AS feature,          -- extension info (empty here)
    0                     AS is_del,           -- 0 = normal, 1 = deleted
    '2024-10-04 12:17:24' AS create_time,
    '2024-10-06 21:10:31' AS modify_time
;
创建交易域-订单域-父订单明细表
-- DWD layer: main-order detail table (trade domain / order domain), incremental.
-- Partitioned by pt and stored as Parquet.
-- The paid-amount column is named pay_amt (not pay_amount): the backfill job reads
-- t0.pay_amt from this table, and the sibling zipper table uses the same *_amt naming.
CREATE TABLE `mammut_user`.`dwd_trade_order_detail_di`
(
    order_id              STRING COMMENT '主订单号,外部生成,唯一键',
    order_status_code     BIGINT COMMENT '主订单状态code',
    deliver_status_code   BIGINT COMMENT '发货状态code(未发货,部分发货,已发货)',
    product_amt           BIGINT COMMENT '商品总额分',
    freight_amt           BIGINT COMMENT '运费总额分',
    pay_amt               BIGINT COMMENT '用户实付总现金分',
    discount_amt          BIGINT COMMENT '折扣总额分',
    buyer_id              STRING COMMENT '买家id',
    buyer_name            STRING COMMENT '买家昵称',
    address_json          STRING COMMENT '买家地址信息json',
    address_province_code STRING COMMENT '买家地址-省code',
    address_province_name STRING COMMENT '买家地址-省名称',
    address_city_code     STRING COMMENT '买家地址-市code',
    address_city_name     STRING COMMENT '买家地址-市名称',
    address_district_code STRING COMMENT '买家地址-区code',
    address_district_name STRING COMMENT '买家地址-区名称',
    address_street_code   STRING COMMENT '买家地址-街道code',
    address_street_name   STRING COMMENT '买家地址-街道名称',
    platform_name         STRING COMMENT '设备来源(平台,安卓,ios)',
    source_name           STRING COMMENT '渠道',
    extra                 STRING COMMENT '拓展信息',
    create_time           STRING COMMENT '创建时间',
    modify_time           STRING COMMENT '更新时间'
)
COMMENT 'dwd-交易域-订单域-主订单-增量表'
PARTITIONED BY (`pt` STRING COMMENT '业务日期')
STORED AS PARQUET
TBLPROPERTIES (
    'table.source'   = '自定义',
    'table.creator'  = 'yuxing',
    'SYNC_METASTORE' = 'on'
);
之前全量数据代码
-- Legacy full-load job: rebuilds one pt partition of the DWD full (df) table from the
-- same-day partition of the ODS full table, keeping only rows that are not deleted.
-- Address parts are extracted from the address_info JSON string.
INSERT OVERWRITE TABLE mammut_user.dwd_trade_order_detail_df PARTITION (pt = '${bizdate}')
SELECT
    t0.order_no                                       AS order_id,               -- main order number (unique key)
    t0.order_status                                   AS order_status_code,      -- main order status code
    t0.deliver_status                                 AS deliver_status_code,    -- delivery status code
    t0.product_amount                                 AS product_amt,            -- total product amount
    t0.freight_amount                                 AS freight_amt,            -- total freight
    t0.pay_amount                                     AS pay_amt,                -- cash actually paid by the buyer
    t0.discount_amount                                AS discount_amt,           -- total discount
    cast(t0.buyer_id AS string)                       AS buyer_id,
    t0.buyer_name,
    t0.address_info                                   AS address_json,           -- raw address JSON
    get_json_object(t0.address_info, '$.provinceCode') AS address_province_code,
    get_json_object(t0.address_info, '$.province')     AS address_province_name,
    get_json_object(t0.address_info, '$.cityCode')     AS address_city_code,
    -- Was aliased address_province_name, duplicating the province column's alias.
    get_json_object(t0.address_info, '$.city')         AS address_city_name,
    get_json_object(t0.address_info, '$.districtCode') AS address_district_code,
    get_json_object(t0.address_info, '$.district')     AS address_district_name,
    get_json_object(t0.address_info, '$.streetCode')   AS address_street_code,
    get_json_object(t0.address_info, '$.street')       AS address_street_name,
    t0.platform                                       AS platform_name,          -- device platform
    t0.source_name,                                                              -- channel
    t0.feature                                        AS extra,                  -- extension info
    t0.create_time,
    t0.modify_time
FROM mammut_user.ods_trade_trade_order_df t0   -- df = the previous full-load ODS table
WHERE t0.pt = '${bizdate}'
  AND t0.is_del = 0
;
全量改增量操作
订单/交易等数据量较大的数据表设计办法有三种
拉链表
做成拉链模式:限制分区为订单创建日期(pt=substr(create_time,1,10)),并通过订单状态记录两种拉链状态,状态1为active(没完结的订单),状态2为history(历史订单);同时生成2个拉链时间 zipper_start_time、zipper_end_time。如果是完成的历史订单,则zipper_end_time为订单完结时间,并且统一存放到history分区;如果是没完结的订单,则会放到active+pt分区中存放,并且生成拉链。这样做的坏处是使用比较麻烦,很有可能下游不会用这张表,导致扫描很长时间。
订单id | 订单状态 | status | zipper_start_time | zipper_end_time | pt |
001 | 1 | active | 2024-01-01 14:00:14 | 2024-01-02 15:00:14 | 2024-01-01 |
001 | 2 | active | 2024-01-02 15:00:14 | 2024-01-03 15:00:14 | 2024-01-01 |
001 | 5 | history | 2024-01-03 15:00:14 | 2099-01-01 00:00:00 | 2024-01-01 |
创建交易域-订单域-父订单明细表-拉链
-- Zipper-table DDL: main-order detail with link-validity columns
-- (zipper_start_time, zipper_end_time, zipper_status). Partitioned by pt.
CREATE TABLE `mammut_user`.`dwd_trade_order_detail_zipper_di`
(
    order_id              STRING COMMENT '主订单号,外部生成,唯一键',
    order_status_code     BIGINT COMMENT '主订单状态code',
    deliver_status_code   BIGINT COMMENT '发货状态code(未发货,部分发货,已发货)',
    product_amt           BIGINT COMMENT '商品总额分',
    freight_amt           BIGINT COMMENT '运费总额分',
    pay_amt               BIGINT COMMENT '用户实付总现金分',
    discount_amt          BIGINT COMMENT '折扣总额分',
    buyer_id              STRING COMMENT '买家id',
    buyer_name            STRING COMMENT '买家昵称',
    address_info          STRING COMMENT '买家地址信息json',
    address_province_code STRING COMMENT '买家地址-省code',
    address_province_name STRING COMMENT '买家地址-省名称',
    address_city_code     STRING COMMENT '买家地址-市code',
    address_city_name     STRING COMMENT '买家地址-市名称',
    address_district_code STRING COMMENT '买家地址-区code',
    address_district_name STRING COMMENT '买家地址-区名称',
    address_street_code   STRING COMMENT '买家地址-街道code',
    address_street_name   STRING COMMENT '买家地址-街道名称',
    platform_name         STRING COMMENT '设备来源(平台,安卓,ios)',
    source_name           STRING COMMENT '渠道',
    extra                 STRING COMMENT '拓展信息',
    create_time           STRING COMMENT '创建时间',
    modify_time           STRING COMMENT '更新时间',
    zipper_start_time     STRING COMMENT '开链时间',
    zipper_end_time       STRING COMMENT '关链时间',
    zipper_status         STRING COMMENT '拉链状态 active活跃,history历史'
)
COMMENT 'dwd-交易域-订单域-主订单-增量表'
PARTITIONED BY (`pt` STRING COMMENT '订单日期')
STORED AS PARQUET
TBLPROPERTIES (
    'table.source'   = '自定义',
    'table.creator'  = 'yuxing',
    'SYNC_METASTORE' = 'on'
);
创建交易域-订单域-父订单明细表-拉链代码
-- Daily zipper load for mammut_user.dwd_trade_order_detail_zipper_di.
--   Step 1 opens a link for every order created on the business date.
--   Step 2 applies the changes found in the business-date ODS increment:
--          it closes the currently open link and appends a new open link.
-- '2099-01-01 00:00:00' is the sentinel end time of an open link.
-- Timestamps are 'yyyy-MM-dd HH:mm:ss' strings, so string comparison orders them correctly.

-- ---------------------------------------------------------------------------
-- Step 1: newly created orders
-- ---------------------------------------------------------------------------
-- All partition values come from the SELECT output (dynamic partitioning).
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE mammut_user.dwd_trade_order_detail_zipper_di PARTITION (pt)
SELECT
    t0.order_no                                        AS order_id,
    t0.order_status                                    AS order_status_code,
    t0.deliver_status                                  AS deliver_status_code,
    t0.product_amount                                  AS product_amt,
    t0.freight_amount                                  AS freight_amt,
    t0.pay_amount                                      AS pay_amt,
    t0.discount_amount                                 AS discount_amt,
    cast(t0.buyer_id AS string)                        AS buyer_id,
    t0.buyer_name,
    t0.address_info                                    AS address_info,
    get_json_object(t0.address_info, '$.provinceCode') AS address_province_code,
    get_json_object(t0.address_info, '$.province')     AS address_province_name,
    get_json_object(t0.address_info, '$.cityCode')     AS address_city_code,
    get_json_object(t0.address_info, '$.city')         AS address_city_name,
    get_json_object(t0.address_info, '$.districtCode') AS address_district_code,
    get_json_object(t0.address_info, '$.district')     AS address_district_name,
    get_json_object(t0.address_info, '$.streetCode')   AS address_street_code,
    get_json_object(t0.address_info, '$.street')       AS address_street_name,
    t0.platform                                        AS platform_name,
    t0.source_name,
    t0.feature                                         AS extra,
    t0.create_time,
    t0.modify_time,
    t0.create_time                                     AS zipper_start_time,
    '2099-01-01 00:00:00'                              AS zipper_end_time,
    'active'                                           AS zipper_status,
    substr(t0.create_time, 1, 10)                      AS pt
FROM mammut_user.ods_trade_trade_order_di t0
WHERE t0.pt = '${azkaban.flow.1.days.ago}'
  -- Only orders created on the business date. The increment also carries changed
  -- older orders; without this filter they would be written as "new" and the
  -- dynamic INSERT OVERWRITE would wipe the partition of their creation date.
  AND substr(t0.create_time, 1, 10) = '${azkaban.flow.1.days.ago}'
  AND t0.is_del = 0
;

-- ---------------------------------------------------------------------------
-- Step 2: subsequent changes
-- ---------------------------------------------------------------------------
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE mammut_user.dwd_trade_order_detail_zipper_di PARTITION (pt)
-- Branch A: carry over EVERY existing link in the refresh window.
-- INSERT OVERWRITE replaces whole partitions, so links of unchanged orders must be
-- re-emitted too. The change filter therefore lives in the ON clause: in WHERE it
-- would turn the LEFT JOIN into an inner join and drop unchanged orders.
SELECT
    t0.order_id,
    t0.order_status_code,
    t0.deliver_status_code,
    t0.product_amt,
    t0.freight_amt,
    t0.pay_amt,
    t0.discount_amt,
    t0.buyer_id,
    t0.buyer_name,
    t0.address_info,
    t0.address_province_code,
    t0.address_province_name,
    t0.address_city_code,
    t0.address_city_name,
    t0.address_district_code,
    t0.address_district_name,
    t0.address_street_code,
    t0.address_street_name,
    t0.platform_name,
    t0.source_name,
    t0.extra,
    t0.create_time,
    t0.modify_time,
    t0.zipper_start_time,
    -- Close only the open link of a changed order; links closed earlier keep their
    -- end time. (least(..., lead(...)) yielded NULL for the latest link because
    -- least returns NULL when any argument is NULL.)
    CASE
        WHEN t1.order_no IS NOT NULL
         AND t0.zipper_end_time = '2099-01-01 00:00:00'
         AND t0.zipper_start_time < t1.modify_time
            THEN t1.modify_time
        ELSE t0.zipper_end_time
    END                                                AS zipper_end_time,
    -- For a changed order every link takes the status implied by the latest order
    -- status (5 and 6 => history); unchanged orders keep their stored status.
    CASE
        WHEN t1.order_no IS NULL           THEN t0.zipper_status
        WHEN t1.order_status NOT IN (5, 6) THEN 'active'
        ELSE 'history'
    END                                                AS zipper_status,
    t0.pt
FROM mammut_user.dwd_trade_order_detail_zipper_di t0
LEFT JOIN mammut_user.ods_trade_trade_order_di t1
    ON  t1.pt = '${azkaban.flow.1.days.ago}'
    AND t1.order_no = t0.order_id
    AND t1.is_del = 0
    AND t1.modify_time <> t1.create_time
WHERE t0.pt >= '${azkaban.flow.60.days.ago}'

UNION ALL

-- Branch B: one new open link per changed order, built from the ODS row.
-- It is driven by the order's currently open link only, so each change yields
-- exactly one row and UNION ALL is sufficient (no dedup needed).
-- The zipper_start_time < modify_time guard makes a rerun a no-op for a change
-- that has already been applied.
SELECT
    t0.order_id,
    t1.order_status                                    AS order_status_code,
    t1.deliver_status                                  AS deliver_status_code,
    t1.product_amount                                  AS product_amt,
    t1.freight_amount                                  AS freight_amt,
    t1.pay_amount                                      AS pay_amt,
    t1.discount_amount                                 AS discount_amt,
    t0.buyer_id,
    t1.buyer_name,
    t1.address_info                                    AS address_info,
    get_json_object(t1.address_info, '$.provinceCode') AS address_province_code,
    get_json_object(t1.address_info, '$.province')     AS address_province_name,
    get_json_object(t1.address_info, '$.cityCode')     AS address_city_code,
    get_json_object(t1.address_info, '$.city')         AS address_city_name,
    get_json_object(t1.address_info, '$.districtCode') AS address_district_code,
    get_json_object(t1.address_info, '$.district')     AS address_district_name,
    get_json_object(t1.address_info, '$.streetCode')   AS address_street_code,
    get_json_object(t1.address_info, '$.street')       AS address_street_name,
    t0.platform_name,
    t0.source_name,
    t1.feature                                         AS extra,
    t0.create_time,
    t1.modify_time,
    t1.modify_time                                     AS zipper_start_time,
    '2099-01-01 00:00:00'                              AS zipper_end_time,
    CASE
        WHEN t1.order_status NOT IN (5, 6) THEN 'active'
        ELSE 'history'
    END                                                AS zipper_status,
    t0.pt
FROM mammut_user.dwd_trade_order_detail_zipper_di t0
INNER JOIN mammut_user.ods_trade_trade_order_di t1
    ON  t1.pt = '${azkaban.flow.1.days.ago}'
    AND t1.order_no = t0.order_id
    AND t1.is_del = 0
    AND t1.modify_time <> t1.create_time
WHERE t0.pt >= '${azkaban.flow.60.days.ago}'
  AND t0.zipper_end_time = '2099-01-01 00:00:00'
  AND t0.zipper_start_time < t1.modify_time
;
创建订单T-3
创建拉链T-2
创建拉链T-1
回刷分区模式
回刷分区模式,限制分区pt=substr(create_time,1,10),通过动态分区每日回刷近半年订单数据(下方示例代码的回刷窗口为近60天,可按业务需要调整),好处在于直接使用pt分区即可,坏处在于看不到中间变化状态,注意一定要先回刷再创建。
-- Daily backfill load for mammut_user.dwd_trade_order_detail_di.
-- Partitions are keyed by order creation date. Each run rewrites the partitions in
-- the refresh window with the latest values from the business-date ODS increment,
-- then adds the partition for orders created on the business date.

-- All partition values come from the SELECT output (dynamic partitioning).
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE mammut_user.dwd_trade_order_detail_di PARTITION (pt)
-- Branch 1: historical orders. Every existing row in the window is re-emitted;
-- if the order appears in the business-date increment the new values win,
-- otherwise the stored values are kept (coalesce).
SELECT
    t0.order_id,
    coalesce(t1.order_status,    t0.order_status_code)   AS order_status_code,
    coalesce(t1.deliver_status,  t0.deliver_status_code) AS deliver_status_code,
    coalesce(t1.product_amount,  t0.product_amt)         AS product_amt,
    coalesce(t1.freight_amount,  t0.freight_amt)         AS freight_amt,
    coalesce(t1.pay_amount,      t0.pay_amt)             AS pay_amt,
    coalesce(t1.discount_amount, t0.discount_amt)        AS discount_amt,
    t0.buyer_id,
    coalesce(t1.buyer_name,      t0.buyer_name)          AS buyer_name,
    coalesce(t1.address_info,    t0.address_json)        AS address_json,
    coalesce(get_json_object(t1.address_info, '$.provinceCode'), t0.address_province_code) AS address_province_code,
    coalesce(get_json_object(t1.address_info, '$.province'),     t0.address_province_name) AS address_province_name,
    coalesce(get_json_object(t1.address_info, '$.cityCode'),     t0.address_city_code)     AS address_city_code,
    -- Previously fell back to (and was aliased as) address_province_name, so an
    -- unchanged order got its province name written into the city-name column.
    coalesce(get_json_object(t1.address_info, '$.city'),         t0.address_city_name)     AS address_city_name,
    coalesce(get_json_object(t1.address_info, '$.districtCode'), t0.address_district_code) AS address_district_code,
    coalesce(get_json_object(t1.address_info, '$.district'),     t0.address_district_name) AS address_district_name,
    coalesce(get_json_object(t1.address_info, '$.streetCode'),   t0.address_street_code)   AS address_street_code,
    coalesce(get_json_object(t1.address_info, '$.street'),       t0.address_street_name)   AS address_street_name,
    coalesce(t1.platform,    t0.platform_name)           AS platform_name,
    coalesce(t1.source_name, t0.source_name)             AS source_name,
    coalesce(t1.feature,     t0.extra)                   AS extra,
    t0.create_time,
    coalesce(t1.modify_time, t0.modify_time)             AS modify_time,
    t0.pt
FROM mammut_user.dwd_trade_order_detail_di t0
LEFT JOIN mammut_user.ods_trade_trade_order_di t1
    ON  t0.order_id = t1.order_no
    AND t1.pt = '${azkaban.flow.1.days.ago}'
WHERE t0.pt >= '${azkaban.flow.60.days.ago}'
  -- The business-date partition is rebuilt solely by branch 2; excluding it here
  -- keeps a rerun from emitting those orders twice.
  AND t0.pt < '${azkaban.flow.1.days.ago}'

UNION ALL

-- Branch 2: orders created on the business date.
-- Reads the same ODS partition as branch 1 (was ${azkaban.flow.3.days.ago}, which
-- lagged new orders two days behind the updates applied above).
SELECT
    t0.order_no                                        AS order_id,
    t0.order_status                                    AS order_status_code,
    t0.deliver_status                                  AS deliver_status_code,
    t0.product_amount                                  AS product_amt,
    t0.freight_amount                                  AS freight_amt,
    t0.pay_amount                                      AS pay_amt,
    t0.discount_amount                                 AS discount_amt,
    cast(t0.buyer_id AS string)                        AS buyer_id,
    t0.buyer_name,
    t0.address_info                                    AS address_json,
    get_json_object(t0.address_info, '$.provinceCode') AS address_province_code,
    get_json_object(t0.address_info, '$.province')     AS address_province_name,
    get_json_object(t0.address_info, '$.cityCode')     AS address_city_code,
    get_json_object(t0.address_info, '$.city')         AS address_city_name,
    get_json_object(t0.address_info, '$.districtCode') AS address_district_code,
    get_json_object(t0.address_info, '$.district')     AS address_district_name,
    get_json_object(t0.address_info, '$.streetCode')   AS address_street_code,
    get_json_object(t0.address_info, '$.street')       AS address_street_name,
    t0.platform                                        AS platform_name,
    t0.source_name,
    t0.feature                                         AS extra,
    t0.create_time,
    t0.modify_time,
    substr(t0.create_time, 1, 10)                      AS pt
FROM mammut_user.ods_trade_trade_order_di t0
WHERE t0.pt = '${azkaban.flow.1.days.ago}'
  AND substr(t0.create_time, 1, 10) = '${azkaban.flow.1.days.ago}'
  AND t0.is_del = 0
;