|
贝壳找房基于Flink+Paimon进行全量数据实时分组排序的实践
贝壳找房基于Flink+Paimon进行全量数据实时分组排序的实践
吴俊迪@贝壳找房
Apache Flink
Apache Flink Apache Flink 中文社区唯一官微,由 Flink PMC 维护; 490篇内容
2024年06月24日 20:01
湖南
摘要:本文投稿自贝壳家装数仓团队,在结合家装业务场景下所探索出的一种基于 Flink+Paimon 的排序方案。这种方案可以在实时环境对全量数据进行准确的分组排序,同时减少对内存资源的消耗。在这一方案中,引入了“事件时间分段”的概念,以避免 Flink State 中冗余数据对排序结果的干扰,在保证排序结果准确性的同时,减少了对内存的消耗。并且基于数据湖组件 Paimon 的聚合模型和 Audit Log 数据在数据湖内构建了拉链表,为排序结果提供了灵活的历史数据基础。内容主要分为以下四个部分:1. 背景2. 现有的实时分组排序的一些做法及其特点3.Flink+Paimon进行全量数据实时分组排序4.总结与展望Tips:点击「阅读原文」跳转阿里云实时计算 Flink~01背景家装家居业务,作为贝壳“一体三翼”战略的重要组成部分,在房屋回归居住属性的大背景下,正在迅猛发展。在过去的 2023 年中,家装家居业务合同额为 133 亿元,可比口径同比增长 93%;净收入取得 74% 的可比口径同比增幅,达到 109 亿元。其中,北京和杭州的合同额突破 20 亿,上海全年合同额超 10 亿,合同额超过 5 亿的城市共有 6 个。此外,家装家居业务在更多城市跑出正循环,全年有11个城市实现运营利润为正。在家装业务中,随着设计师对客户需求和期望的逐步了解和把握,一个装修项目往往会产生多份合同。在合同签订的过程中,业务上期望通过合同的顺序、内容、金额等信息实时匹配相应的风控和运营策略,以实现智能额度、动态尾款等等功能,为客户提供更加优质的服务体验。要实现这些功能,就需要对全量合同数据进行分组排序。然而,在实时场景中,这是一个不小的挑战。为了解决这个问题,我们进行了一系列的探索和尝试。02现有的实时分组排序的一些做法及其特点首先,结合以往的经验,我们梳理了现有在实时链路中进行分组排序的方法。第一种方法是基于 Flink 引擎自带的排序能力进行分组排序。该方法实现简单,逻辑直观,但是比较依赖内存资源。随着数据量的增长,排序任务所需要的内存资源会不断增加,这会给本就紧张的内存资源带来更大的压力。第二种方法是基于离线处理结果和实时处理结果的融合计算分组排序。这种方法可以在一定程度上缓解内存压力,并且复用已有的离线计算结果。然而,同时维护并迭代离线和实时两套数据链路,会带来不小的运维压力。第三种方法则是通过 Redis、Hbase 等 NoSQL 组件来存储和累计实时排序结果。这种占用内存少,不依赖批处理结果。然而,由于无法提供类似事务的 ACID 保证,存在数据脏读,进而伴随一定程度的统计误差。通过上诉分析可以看到,在实时链路中,现有的分组排序方案在处理全量数据时,很难同时兼顾内存资源、数据链路和数据准确性三个方面。 03Flink+Paimon进行全量数据实时分组排序在实时数据的处理中,我们希望在需要保证数据时效性的前提下,避免依赖额外的批处理逻辑,同时减少对内存的消耗。以 Paimon 为代表的数据湖组件的兴起为我们提供了全新的思路。它具备强大的数据存储和加工能力,并且能够顺滑地与 Flink 引擎配合,进一步扩大了 Flink 生态的实时处理领域的领先优势。接下来,我们将详细介绍基于 Flink+Paimon 进行全量数据实时排序的新方式。整体思路可以概括为两个核心步骤:第一步,利用 Flink 引擎接收增量流入的数据,依托其 State 在内存中对数据进行排序,并将排序结果存储到数据湖中;第二步,从数据湖中读取历史时点的累计结果,结合第一步得到的内存排序结果一起计算得出最终的准确结果。这两个步骤循环配合,互为起止,形成了一个完整的数据处理流程。为了方便表述说明,在介绍第一步的时候,我们假设第二步所产出的数据已经就绪。而在介绍第二步的时候,我们将展开讲解究竟如何产出了第一步所依赖的数据。3.1 基于历史数据的实时分组排序在不引入额外的批处理逻辑,并且不把全量数据保留在内存中的前提下,想要实现分组排序,需要解决两个问题。其一,需要引入其他组件暂存结果,并且需要保证 Flink 引擎对于暂存结果的写入和读取不能同时针对同一条记录,否则会造成脏读;其二,未及时过期的 Flink State 数据与暂存结果之间不能出现重叠,否则会造成双算,导致结果偏高。我们借鉴了 Flink 引擎中的滚动窗口思路,引入一个“事件时间分段”的概念,将实时数据根据事件时间分段,以得到准确的分组排序结果。接下来,我们举个例子来具体说明数据的处理过程。现在假设一个项目A,它在历史上已经签约过两份合同,分别为 C1 和 C2 ,它们的签约时间对应为T1和T2。实时数据中新流入一条合同签约记录 C3,对应的时间为 T3。将事件时间进行取整处理后,我们得到它们分别归属于起点为 TG1 和 TG2 的两个时间分段内。 SQL 代码可以参考如下样例:selectproject_id--项目id,contract_id--合同id,sign_time--签约时间,FROM_UNIXTIME(UNIX_TIMESTAMP(sign_time,'yyyy-MM-ddHH:mm:ss')/(24*60*60)*(24*60*60),'yyyy-MM-ddHH:mm:ss')asevent_time_group_start_point--事件时间分段起点fromdefault_catalog.default_database.mysql_project_contract_table--项目合同信息接下来,在 Flink SQL 的排序代码中,将项目id和事件时间段的起点一同作为分组主键,对数据按照事件时间的进行排序。SQL代码可以参考如下样例:insertintopaimon_catalog.paimon_db.dwd_project_contract_memory_result_table--实时数据排序结果selectproject_id--项目id ,contract_id -- 合同id ,sign_time -- 签约时间 ,event_time ,event_time_group_start_point -- 事件时间分段起点`,DENSE_RANK()over(PARTITIONBYproject_id,event_time_group_start_pointORDERBYevent_timeasc)asmem_rn--实时数据排序结果frompaimon_catalog.paimon_db.dwd_project_contract_table--项目合同信息`然后,我们需要根据每个事件时间分段的起点,找到对应的累计结果快照。累计结果快照代表了截至某个时点,各个装修项目已签约的合同数。(累计结果快照具体的实现方式将在3.2详细展开。) 读取到的累计结果快照代表截止各分段起点的历史结果,而实时排序结果则代表了自各分段起点起之后的增量结果。将这两个结果相加,我们就得到了准确的排序结果。这个方法中,读入的累计结果是之前时刻的快照,它已经是既定事实,不会再被更改,进而避免了脏读;各个累计结果与增量结果之间以事件时间分段起点为边界,进而避免了双算问题。此外,在实际应用中,我们还需要考虑如何处理历史State的逐步失效问题。由于 State 的生命周期是有限的,随着时间的推移,一些早期的State可能会被清除或失效。为了应对这种情况,我们可以将 State 的生命周期设置成略长于事件时间分段的间隔。这样,即使一些早期的 State 失效了,我们仍然能够确保最新的事件时间分段内的数据能够在内存中进行完整的排序,进而持续得到准确结果。 当然,对于那些已经失效的历史事件时间分段,其排序结果可能会因为数据清除而逐渐变小。为了解决这个问题,我们需要在排序操作的输出端取得每行数据排序结果的最大值。因此,在实际应用中,我们使用 Paimon 聚合模型来承接排序结果。到目前为止,我们已经基于历史数据和有限的内存消耗在实时环境中实现了全量数据的分组排序。但是,这一操作的基础,是我们一直都是在假设的这样一份“随叫随到”的累计结果快照。那么,这种历史数据究竟是以如何生成、存储、和使用的呢?接下来我们将详细讨论这一部分。3.2排序如何通过 Paimon 存储和呈现历史累计数据接下来,让我们讨论如何生成并且呈现出各个事件时间分段起点的历史累计结果。基于分组排序结果,我们可以在 Paimon 中采用聚合模型,以业务分组字段作为主键,获取各个分组中最大的事件时间和最大的排序结果。在我们的例子中,就是以项目 id 作为主键,得到项目A最新一份合同的签约时间,以及当前项目 A 所对应的合同中,最大的分组排序结果是多少。 SQL 代码可以参考如下样例:--分组内聚合的累计结果CREATETABLEifnotexistspaimon_catalog.paimon_db.dwd_project_contract_final_result_max_table(project_idbigintCOMMENT'项目id',max_rnbigintCOMMENT'该项目下当前最大的rn值',max_event_timestringCOMMENT'该项目下当前最大的event_time',PRIMARYKEY(project_id)NOTENFORCED)WITH('merge-engine' ='aggregation','changelog-producer' ='lookup','fields.max_rn.aggregate-function' ='max','fields.max_rn.ignore-retract'='true','fields.max_event_time.aggregate-function' ='max','fields.max_event_time.ignore-retract'='true');insertintopaimon_catalog.paimon_db.dwd_project_contract_final_result_max_tableselectproject_id--项目id,asc_rn--分组排序结果,sign_time--合同签约时间(事件时间)frompaimon_catalog.paimon_db.dwd_project_contract_final_result_table--分组排序结果随着数据的不断流入,累计结果表中的记录会被不断地更新,呈现出一个又一个版本。为了把累计结果的各个版本反映到历史时间轴上,我们想到了离线数据开发中的拉链表概念。构建拉链表的关键步骤,是将相邻版本的数据记录拼接到同一行,以得到一个版本的起止时间。Paimon 提供了一系列系统表,其中的Audit Log表记录了数据变更的完整日志,为构建累计结果的版本变更信息提供了可能。基于累计结果表的 Audit Log 记录,我们创建了一张 Paimon 表来存储累计结果每个版本的起止时间信息。这张表包含两组字段:第一组字段用于表示累计结果每个版本的起始时刻,我们通过不断解析累计结果的-U日志来更新这些字段;第二组字段则用于表示累计结果每个版本的终止时间,我们利用累计结果的+U日志来实时更新这些字段。通过这种方式,我们得以追踪并体现出了累计结果的版本变更信息。 SQL 代码可以参考如下样例:CREATETABLEifnotexistspaimon_catalog.paimon_db.dwd_project_contract_final_result_table_max_history_table(project_idbigintCOMMENT'项目id',pre_max_rnbigintCOMMENT'该项目下当前最大的rn值',pre_max_event_timestringCOMMENT'该项目下当前最大的event_time' ,next_max_event_time string COMMENT '被更新后该项目下当前最大的event_time' ,PRIMARY KEY (project_id) NOT ENFORCED`)WITH('merge-engine'='aggregation','changelog-producer'='lookup','fields.pre_max_rn.aggregate-function'='max','fields.pre_max_rn.ignore-retract'='true','fields.pre_max_event_time.aggregate-function'='max','fields.pre_max_event_time.ignore-retract'='true','fields.next_max_event_time.aggregate-function'='max','fields.next_max_event_time.ignore-retract'='true');INSERTINTOpaimon_catalog.paimon_db.dwd_project_contract_final_result_table_max_history_tableselectproject_id ,max_rn ,max_event_time ,'0000-00-0000:00:00'frompaimon_catalog.paimon_db.dwd_project_contract_final_result_max_table$audit_logwhererowkind='-U'unionallselectproject_id,0,'0000-00-0000:00:00' ,max_event_time frompaimon_catalog.paimon_db.dwd_project_contract_final_result_max_table$audit_logwhererowkind='+U'现在,我们已经成功记录了累计结果的版本变更信息。接下来的目标,就是展现历史上所有的版本变更记录。为此,我们再次将目光投向了 Audit Log 表。累计结果的版本变更信息表对应的 Audit Log 表中记录了其中各个版本出现的痕迹。因此,我们决定读取其中的 +I 和 +U 数据。在读取这些数据时,我们还需要进行一定的过滤操作。因为在数据更新的过程中,会出现起止时间相等的版本。这种情况通常是由于 Audit Log 表中下一次的 -U 和上一次 +U 所包含的信息相同所导致的。为了避免冗余信息的干扰,我们需要在读取数据时进行甄别,过滤掉这些起止时间相等的版本。经过这样的处理,就能够将所有的历史版本平铺呈现出来。除了这些历史版本之外,拉链表中仍然需要包含当前位置的最新版本。所以,在展示历史版本的同时,还需要将当前最新结果合并到拉链表中,以确保数据的完整性和准确性。 SQL 代码可以参考如下样例:CREATETABLEifnotexistspaimon_catalog.paimon_db.dwd_project_contract_base_version_table(project_idbigintCOMMENT'项目id',base_ver_start_timestringCOMMENT'该版本开始时间',base_ver_end_timestringCOMMENT'该版本结束时间',base_ver_max_rnbigintCOMMENT'该版本最大的rn',PRIMARYKEY(project_id,base_ver_start_time)NOTENFORCED)WITH('merge-engine' ='deduplicate','changelog-producer' ='lookup');INSERTINTOpaimon_catalog.paimon_db.dwd_project_contract_base_version_table--累计结果拉链表selectproject_id,pre_max_event_timeasbase_ver_start_time,next_max_event_timeasbase_ver_end_time,pre_max_rnasbase_ver_max_rnfrompaimon_catalog.paimon_db.dwd_project_contract_final_result_max_history_table$audit_log--累计结果的版本变更信息的auditlogwhererowkindin('+I','+U')andpre_max_event_timenext_max_event_time`unionallselectproject_id,max_event_timeasbase_ver_start_time,'9999-99-9999:99:99'asbase_ver_end_time,max_rnasbase_ver_max_rnfrompaimon_catalog.paimon_db.dwd_project_contract_final_result_max_table--累计结果3.3Flink+Paimon 全量数据实时分组排序链路全景在深入剖析了整体方案的两大核心操作思路及其具体实现后,让我们从整体角度审视整个方案的架构和各个组件之间的交互关系。整个方案由若干 Flink 任务和 Paimon 模型组成,整体链路如下图。在这个方案中,实时流入的数据是整个流程的起点。这些数据在经过一系列的排序和处理后,会不断更新数据湖中的累计结果拉链表。这个拉链表不仅记录了累计结果的历史变化,还为后续的实时任务提供了历史时点的累计结果快照。通过这种方式,数据在整个方案中实现了循环流转和持续更新。在这个流程中,通过实时数据与 Paimon 表进行 look up join 的方式,实现内存排序结果与累计结果快照的关联。在关联过程中,需要在匹配分组业务主键的同时,将事件时间分段起点与拉链表中各个版本的起止时间进行比较。SQL 代码可以参考如下样例:CREATETABLEifnotexistspaimon_catalog.paimon_db.dwd_project_contract_final_result_table(project_idbigintCOMMENT'项目id',contract_idbigintCOMMENT'合同id',sign_timestringCOMMENT'合同签约时间',event_timeTIMESTAMP(3)COMMENT'sign_time作为event_time',event_time_group_start_pointSTRINGCOMMENT'业务时间分段起点',mem_rnbigintCOMMENT'这条数据在内存排序中排出来的rn',base_ver_start_timestringCOMMENT'该版本开始时间',base_ver_end_timestringCOMMENT'该版本结束时间',base_ver_max_rnbigintCOMMENT'该版本最大的rn',asc_rnbigintCOMMENT'最终排序结果',PRIMARYKEY(project_id,contract_id,sign_time,event_time,event_time_group_start_point)NOTENFORCED)WITH('merge-engine' ='aggregation','changelog-producer' ='lookup','fields.mem_rn.aggregate-function'='max','fields.mem_rn.ignore-retract'='true','fields.base_ver_start_time.ignore-retract'='true','fields.base_ver_end_time.ignore-retract'='true','fields.base_ver_max_rn.aggregate-function' ='max' ,'fields.base_ver_max_rn.ignore-retract' = 'true' ,'fields.asc_rn.aggregate-function'= 'max','fields.asc_rn.ignore-retract'='true');insertintopaimon_catalog.paimon_db.dwd_project_contract_final_result_table--实时数据lookupjoin累计结果的历史快照selects.project_id,s.contract_id,s.sign_time,s.event_time,s.event_step_start_point,s.mem_rn,coalesce(base.base_ver_start_time,'0000-00-0000:00:00')asbase_ver_start_time,coalesce(base.base_ver_end_time,'0000-00-0000:00:00')asbase_ver_end_time ,coalesce(base.base_ver_max_rn ,0) as base_ver_max_rn ,s.mem_rn+coalesce(base.base_ver_max_rn,0)asasc_rnfrompaimon_catalog.paimon_db.dwd_project_contract_memory_result_tables--实时数据的内存排序结果leftjoinpaimon_catalog.paimon_db.dwd_project_contract_base_version_tableFORSYSTEM_TIMEASOFs.proctimeASbase--累计结果拉链表ons.project_id=base.project_idands.event_step_start_point>=base.base_ver_start_timeands.event_step_start_point
|
|