找回密码
 会员注册
查看: 20|回复: 0

哔哩哔哩数据建设之路—实时DQC篇

[复制链接]

2万

主题

0

回帖

7万

积分

超级版主

积分
72245
发表于 2024-10-6 23:38:57 | 显示全部楼层 |阅读模式
本期作者韩志华哔哩哔哩大数据平台工具负责人冯益峰哔哩哔哩资深开发工程师王鼎哔哩哔哩高级开发工程师背景数据质量是基于大数据衍生的应用有效与否的重要的前提和保障之一。B站现在高速发展的业务需求以及未来能够依靠大数据孵化出更有深度和竞争力应用的愿景,都要求我们数据平台能够提供实时的、准确的、可以被各个业务方所信赖的数据。可以说,可信赖的数据,是大数据平台核心竞争力的体现。因此,在B站的大数据平台的建设过程中,数据质量平台成为了不可或缺的一环,因为它的使命便是为大数据平台的数据质量保驾护航。?质量平台平台组件?DQC简介功能图DQC主要的工作链路与普通的监控系统如prometheus基本相似,过程都包括数据采集、数据检查、告警通知,都需要尽量减少对监控对象正常工作的影响,同时对监控对象的异常做到及时告警。功能上有离线和实时之分离线DQC:主要用来保障由离线任务生产出来的数据,触发规则主要是由离线调度任务完成后来通知DQC做采集,然后再根据阈值进行检查及告警。通常用户配置检查规则,一般会在下一次任务完成后生效。实时DQC:主要针对Kafka数据源。Kafka是大数据平台中一个非常重要的基础组件,特别是在实时数仓中,使用的则更加频繁。一般是根据规则在固定的检查周期对在一定窗口数据进行采集及检查。通常用户配置检查规则,一般会在下一个数据周期后生效。实时DQC?第一版方案公司内部流计算的主流框架是Flink,所有实时DQC也是采用Flink计算框架。实时DQC 相对离线DQC复杂不少,因为它的数据采集任务是一直运行着的,向一个已经运行的任务中去更新规则,或者新增采集对象,有一定的难度。因此在第一版方案中,用户每新增一个采集对象或一个规则,DQC服务会为用户重新生成一个新的Flink任务来完成采集,并将采集结果写入到MySQL中。同时定时触发质量规则检查,将检查结果写入到MySQL中,并根据检查结果判断是否做告警通知。这个方案架构简单,但是仍存在以下弊端:资源利用率低:?由于一个检查规则一个数据采集任务,导致Flink任务大多数时间都比较空闲,特别在一些流量小的Topic更加明显。网络带宽消耗大:?对一个Topic不同的规则,需要多个任务去消费。如果Topic流量很大,对网络带宽会有很大冲击。稳定性差:?规则的修改需要重启数据采集任务,集群资源紧张时可能因为申请不到YARN/K8s资源而导致启动失败。监控程序本身占用资源大,在降本增效为主流的现在,是不合时宜的。因此,我们迫切需要对实时DQC的方案进行改造,在资源使用上有下面三个要求:一次启动,不再重启。避免资源竞争,提高系统可用性。一个任务,多Topic共用。实际应用中,一些Topic流量很小,导致Flink任务大多数时间都比较空闲。如果一个Flink可以多消费几个这样Topic,相同的资源做更多的事,资源利用率更高。一次消费,多规则校验。避免多任务重复消费一个Topic,减少网络带宽消耗。?新的方案设计目标:实时DQC采集程序的准确性;规则检查的及时性;最大限度的减低实时DQC的资源消耗;最大限度的减低Kafka正常作业的影响;总体架构目前线上Topic 有7000+,为了便于管理,主要根据Influxdb能接受的QPS、及Flink任务利用率,实时DQC把Topic划分为大Topic、中Topic、小Topic。小Topic: 消息量 10w/s。方案分析:大中小Topic分开管理,走不同的路径来做数据采集,中小Topic中的数据全量导入到全量表中(字段会根据检查规则做过滤),然后利用Influxdb CQ功能做汇聚存入CQ表完成数据采集,而大Topic则在Flink任务中直接算出最终的汇聚数据存入CQ表;Influxdb 全量表:中小Topic使用,保存Topic中的全量数据(字段会根据检查规则做过滤),一般有效时间为一个小时;Influxdb CQ表:所有Topic都会使用, 保存了Topic的汇聚数据,用于规则检查,一般有效时间为二个星期;Topic动态化:运行时可新增或删除监控的Topic,无需重启Fllink任务;规则动态化:运行时可以新增、删除、修改Topic的监控规则,无需重启Fllink任务;DQC资源管理:这里包括两个部分,一个Flink任务和Influxdb资源,合理利用这两个资源,对Topic或者Topic的规则进行动态分配管理;中小Topic方案数据全量导入到全量表中(字段会根据检查规则做过滤)Topic、Mapper支持动态化:Topic动态化,Topic列表维护在配置中心,KafkaConsumer可以感知配置的变化从而达到动态的增删Topic;Mapper动态化,根据消费Topic 列表、Topic内部的数据格式、及对应DQC规则,动态形成Mapper的处理逻辑,推到Mapper Warpper;中小Topic方案—kafkaconsumer解决方案:因为开发人力、时间的原因,没有完全开发一个新的支持动态Topic的Kafka Consumer,而是扩展了FlinkKafkaConsumer,KafkaFetcher & KafkaConsumerThread采用hack方式处理。中小Topic方案—Mapper解决方案:根据TopicList、Topic内容、DQC规则形成Mapper的bytecode,以Base64方式存入配置中心,Mapper Wrapper动态获取,形成新的Mapper,来替换老的Mapper。大Topic方案由于大型Topic流量很大,如果还和中小型Topic使用相同的方案,不管是对网络带宽,还是对底层存储,都会造成非常大的压力。因此我们选择了另一种方案------单Topic单任务 + 规则动态。每一个Flink任务只消费一个Topic,在这个任务中,我们需要能够动态感知针对该Topic的检查规则的变化,并根据检查规则对消费到的实时数据流进行打标,对打标后的结果按窗口进行汇聚。动态逻辑:规则动态化,规则元信息维护在配置中心,FlatMap可以感知规则变化,将规则命中的记录向后输出进行聚合规则动态为了降低用户的学习成本,在该方案中,我们与离线DQC配置保持一致,用户可以通过SQL自定义where子句对数据进行过滤,实现更加精细的质量检查。规则解析器会对SQL中的where子句部分进行词法解析,生成对应的过滤器,用于数据过滤,对符合规则的记录,打上规则ID后,根据规则ID向后输出后由聚合算子进行局部的聚合。数据膨胀在规则打标后,输出到窗口进行聚合的过程中,可能会产生数据膨胀。如上图所示,当数据进入FlatMap后,FlatMap会根据规则过滤器对数据打上规则ID标签,再将数据向后发送给聚合算子,聚合算子根据规则进行聚合。如果一条记录没有命中任何规则,则不会向后输出。从上图可以看出,输入条数为1,输出条数为4,数据膨胀4倍。这是由于我们向后输出的数据格式为,使用规则ID作为分组条件。一条记录命中多个规则,则会输出多条记录。由于该方案本身是针对大Topic的,流量本身就很大,在经过规则过滤器放大,带宽压力很重,不符合设计初衷。针对这种情况,我们做了两个优化:FlatMap数据输出格式调整为 。将数据命中的所有规则ID合并在一起,向后输出。聚合算子收到数据后,根据RuleIdList进行业务计算。使用map -> reduce -> reduce架构,对规则打标后输出的结果,会先进行一次局部聚合,最后再进行全局规约数据汇总。但是,这个设计并不能彻底解决问题。规则处理逻辑分组key表行数类规则局部累加,汇总时将局部计算结果累加得到汇总值数据hash汇总值类规则局部累加,汇总时将局部计算结果累加得到汇总值数据hash最大值/最小值类规则局部取最大/小值,汇总时将局部计算结果取最大/小值得到全局的最大/小值数据hash平均数类规则局部计算,汇总时将局部计算结果合并计算全局的得到全局的数据hash字段去重(Distinct)类规则局部对字段值去重后向后输出一遍,汇总时累加字段数量字段值经过对实时DQC质量规则分析,除了字段去重(Distinct)类规则外,其他所有规则,对分组的key的具体取值是完全不依赖,因为局部聚合与最终汇总时的业务逻辑是一致的。然而字段去重(Distinct)类规则是在局部进行去重,最终汇总时则是将计算去重后的记录数量,因此在局部聚合时分组key必须使用字段值。对此,我们调整了第一步的优化方案:当Topic没有字段去重(Distinct)类规则,将所有规则合并输出,输出格式为当Topic有一个字段去重(Distinct)类规则,则将所有规则合并输出,输出格式为 当Topic有多个字段去重(Distinct)类规则,且使用不同的字段时。将其中一个与其他所有非去重类规则合并输出。剩余的数值字段去重(Distinct)类规则单独输出。最终形态如下图所示:在优化之前,由于FlatMap的数据膨胀率很高,聚合算子经常会出现背压情况,导致消费性能下降。在优化之后,数据膨胀率大大降低,但是仍然存在。而膨胀率取决于字段去重(Distinct)类规则类规则的数量,膨胀率是可预知且可控的。目前在B站大数据平台中,除了特殊业务外,基本没用使用该类规则的情况,但仍需要通过监控该规则使用情况,适时增加资源或规则下线方式保障任务稳定性。Influxdb proxy方案为了更好的适配和处理读写请求和Influxdb之间的联系,我们引入了Influxdb proxy代理服务。后端的Influxdb集群包含多组实例,每一组实例中的数据都不分片,只互为备份,最终一致性由proxy双写来保证。双写过程中,如有写入失败,Influxdb proxy将失败请求内容,记录到本地文件中并进行重试,直至成功写入。每一个Influxdb实例节点包含分配到该实例的完整的全量表和CQ表数据。优化读请求查询时,proxy会选择后端的最优Influxdb节点(数据完整性、节点性能)来查询。如果后端的Influxdb节点都有问题情况时,那么这次查询会降级。优化写请求由于实时DQC写入数据量大且极为频繁,所以为了减少网络IO流量开销,Influxdb proxy使用gzip压缩、批次写入的方式提升写入性能。在Influxdb proxy上线后,由于实际运行时,实时写入的数据量十分大,导致网络IO一直处于峰值状态。经过排查,发现原本数据输入都是一次请求包含5000+的完整Influxdb数据插入语句,且大多都是相同db和相同measurement,同时每条插入语句有太多的无效tag。随即我们优化的数据写入的接口协议,每次请求只能写入同一个db,且把相同的measurement抽出来,同时删除插入语句中无效tag。在上述改造之后,网络IO流量开销有了明显的改善和下降。运维保障在Influxdb proxy中,引入了Prometheus监控,实时监控读写请求的qps、写入的db,measurement数量和分布情况等,更好的加强相关资源的利用。支持对后端的Influxdb集群管理,包括节点数据同步,新节点加入、删除,节点数据恢复等。Influxdb方案全量表在中小型Topic的处理上,我们选择落库全部数据,每个Topic都会单独存储为一张Influxdb的measurement。measurement的结构如下所示:time:数据时间subtask:业务默认字段,消费的flink子任务号sinknum:业务默认字段,子任务消费顺序扩展tag:topic字段,做索引用record_num:业务默认字段,值为1,用于计算行数类规则扩展field:topic字段业务默认字段:在全量表的设计中,我们额外增加了两个tag字段:subtask和sinknum。由于Influxdb的特性,当两条记录的时间和tag完全相同时,此时后写入的记录会覆盖前一条。当计算表行数规则时,这种情况可能导致误告。因此,为了保障数据可以写入,我们增加了subtask、sinknum两个字段,subtask使用的flink上下文中的子任务号,sinknum我们使用环形数组实现,取值范围是1-200,用于确保每条记录都是独一无二的,不会发生覆盖的问题。TTL时间:全量表的作用,主要是为CQ表做准备。Influxdb会定时读取全量表,根据质量规则进行数据聚合,并将聚合结果写入到CQ表。而实时DQC的特性决定了已经被计算进入CQ表的全量表数据便已经过期了。因此,为了避免数据条数、序列数等资源无限制增长,全量表的TTL时间我们设计为1小时,超过1小时的全量表记录会被Influxdb删除。Tag和Field的关系:tag在Influxdb中是被作为索引使用的,通常用来作为查询条件使用,而field则是在某个时间点产生的事件的具体值。在Influxdb中,一次查询如果没有查询任何一个field字段,那么这次查询是没有意义的,不会返回任何结果。因为只有field才能反应那个时间点的信息。因此我们业务默认保留一个record_num字段作为field,表行数类规则会优先使用该字段。什么字段会被存储到Influxdb?一个Topic的字段数量无法确定,几个或上百个,都有可能。起初对这些字段,我们并没有筛选,选择全部落库,在运行过程中发现调用Influxdb-proxy服务屡屡发生异常。在问题定位中发现这样的异常现象,Influxdb-proxy机器网络带宽消耗大,CPU使用率高。最终结论是,Influxdb-proxy写入Influxdb时会将数据压缩,减少网络带宽消耗,提高写入性能,但由于写入数据的冗余信息过多,压缩过程导致CPU负载非常高。Influxdb-porxy机器由于负载过高,无法继续向外提供服务。因此,去除冗余数据使我们需要优化的点。经过业务分析,我们任务一个Topic中大部分字段,都是不需要落库的。以直播业务某TopicA(40个字段)为例,计算某个时间窗口下的质量规则:规则ID规则业务SOL需要存储的表行数select count(1) from TopicA无2来自IOS平台的记录数select count(1) from TopicA where platform = 'IOS'platform3来自安卓平台的不同mid数select count(distinct mid) from TopicA where platform = 'android'platform,mid根据规则发现,我们实际使用的字段只有platform和mid字段,其余字段对当前业务而言,都是冗余的。针对目前实时质量规则使用的现状,去除掉冗余字段,只存储必须的信息,网络带宽消耗可以减少90%以上,Influxdb-proxy的使用率一直维持在一个稳定的变化范围。在后续的使用中,该类异常再也未曾复现。什么字段会被应用为tag?tag在Influxdb中是作为索引被使用的,一条记录所有的tag被称为序列Series,根据机器性能的不同,每台机器可以承载的序列数量是有限的,序列数膨胀会造成Influxdb的读写性能急剧下降。刚上线初期,由于接入Topic时未进行严格的SOP流程,发生过这样一起事故。某Topic中包含XxxId字段,运维人员误认为该字段是某字典信息,取值范围是有限的。因此直接作为Tag字段进行接入,结果该字段是高基数的,短时间造成Influxdb序列数急速膨胀到300W,Influxdb几乎不可对外提供服务。在总结会议上,为避免此类情况再次发生,我们制定了Topic接入SOP,对Tag字段进行严格校验。由于Tag字段的选取对Influxdb的稳定性十分重要。我们是如何判断什么字段会被应用为tag?我们认为tag字段的使用需要满足以下两点:where子句中被用作过滤条件,如 where platform = 'ios' 子句中,platform字段会被选中继续进行筛选字段取值范围可枚举,不存在高基,如 platform 字段的取值范围在 ios、android、web中,符合tag字段的选取特征。在上面的例子中,platform字段会被应用为tag,而mid字段由于存在高基,会被应用为field。实际应用中,不能作为tag又需要使用的字段,都会被存储为field。新增规则,字段如何扩展?类似mongodb这种非结构化数据库,Influxdb中的measurement也不需要预先定义schema,拥有良好的扩展性。当新增字段时,只需要在写入语句中指定新增字段与字段值即可完成字段新增。CQ表CQ表存储的是实时数据根据业务规则聚合后的结果,它的结构简单且固定,如下图所示:time:计算窗口开始时间rule_id:质量规则IDvalue:时间窗口内质量规则的计算结果CQ的表数据来源有两种:全量表定时汇聚写入:依靠Influxdb自身提供的基础能力Continuous Query实现,依靠该能力,Influxdb能够对实时数据自动周期运行查询,并将查询结果写入指定的CQ表中,目前Continuous Query运行 99分位耗时
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-11 06:05 , Processed in 0.686835 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表