|
这是第344篇不掺水的,想要了解更多,请戳下方卡片关注我们吧~问题背景在大数据行业内,尤其是数仓建设中,一直有一个绕不开的难题,就是大表的分析计算(这里的大表指亿级以上)。特别是大表之间的 Join 分析,对任何公司数据部门都是一个挑战!主要有以下挑战:由于数据量大,分析计算时会耗费更多 CPU、内存和 IO,占用大量的集群资源。由于数据量大,分析计算过程缓慢,挤占其它任务资源使用,从而影响数仓整体任务产出时间。由于数据量大,长时间占用资源,会造成该任务在时间、资源和财务各方面成本巨大。当前业内流行的优化方案1.增加集群资源优点:简单粗暴,对业务和数据开发人员友好,不用调整。缺点:费钱,看你公司是否有钱。2.采用增量计算优点:可以在不大幅增加计算集群成本的情况下,完成日常计算任务。缺点:对数据和业务都有一定要求,数据一般要求是日志类数据。或者具有一定的生命周期数据(历史数据可归档)。问题场景和 Spark 算法分析Spark 经典算法 SortMergeJoin(以大表间的 Join 分析为例)。对两张表分别进行 Shuffle 重分区,之后将相同Key的记录分到对应分区,每个分区内的数据在 Join 之前都要进行排序,这一步对应 Exchange 节点和 Sort 节点。也就是 Spark 的 Sort Merge Shuffle 过程。遍历流式表,对每条记录都采用顺序查找的方式从查找表中搜索,每遇到一条相同的 Key 就进行 Join 关联。每次处理完一条记录,只需从上一次结束的位置开始继续查找。该算法也可以简化流程为: Map 一> Shuffle 一> Sort 一> Merge 一> Reduce该算法的性能瓶颈主要在 Sort Merge Shuffle 阶段(红色流程部分),数据量越大,资源要求越高,性能越低。大表问题思考大数据计算优化思路,核心无非就三条:增加计算资源;减少被计算数据量;优化计算算法。其中前两条是我们普通人最常用的方法。两个大表的 Join ,是不是真的每天都有大量的数据有变更呢?如果是的话,那我们的业务就应该思考一下是否合理了。其实在我们的日常实践场景中,大部分是两个表里面的数据每天只有少量(十万百万至千万级)数据随机变化,大部分数据是不变的。说到这里,很多人的第一想法是,我们增加分区,按数据是否有变化进行区分,计算有变化的(今日有更新的业务数据),合并未变化的(昨日计算完成的历史数据),不就可以解决问题了。其实这个想法存在以下问题:由于每个表的数据是随机变化的,那就存在,第一个表中变化的数据在第二个表中是未变的,反之亦然(见图片示例)。并且可能后续计算还有第三个表、第四个表等等呢?这种分区是难以构建的。变化的数据如果是百万至千万级,那这里也是一个较大规模的数据量了,既要关联计算变化的,也要关联计算未变化的,这里的计算成本也很大。问题读到这里,如果我们分别把表 A、表 B 的有变化记录的关联主键取出来合并在一起,形成一个数组变量。计算的时候用这个变量分别从表 A 和表 B 中过滤出有变化的数据进行计算,并从未变化的表(昨日计算完成的历史数据)中过滤出不存在的(即未变化历史结果数据)。这样两份数据简单合并到一起,不就是表 A 和表 B 全量 Join 计算的结果了吗!那什么样的数组可以轻易的存下这百万千万级的数据量呢?我们第一个想到的答案: 布隆过滤器!使用布隆过滤器的优化方案构建布隆过滤器:分别读取表 A 和表 B 中有变化的数据的关联主键。使用布隆过滤器:分别过滤表 A 和表 B 中的数据(即关联主键命中布隆过滤器),然后进行 join 分析。使用布隆过滤器:从未变化的表(昨日计算完成的历史数据)中过滤出数据(即没有命中布隆过滤器)。合并 2、 3 步骤的数据结果。也许这里有人会有疑惑,不是说布隆过滤器是命中并不代表一定存在,不命中才代表一定不存在!其实这个命中不代表一定存在,是一个极少量概率问题,即极少量没有更新的数据也会命中布隆过滤器,从而参与了接下来的数据计算,实际上只要所有变化的数据能命中即可。这个不影响它已经帮我买过滤了绝大部分不需要计算的数据。回看我们的 Spark 经典算法 SortMergeJoin,我们可以看出,该方案是在 Map 阶段就过滤了数据,大大减少了数据量的,提升了计算效率,减少了计算资源使用!Spark 函数 Java 代码实现大家可以根据需要参考、修改和优化,有更好的实现方式欢迎大家分享交流。程序流程图Spark 函数 Java 代码实现。packageorg.example;importorg.apache.curator.shaded.com.google.common.hash.BloomFilter;importorg.apache.curator.shaded.com.google.common.hash.Funnels;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileStatus;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.spark.sql.api.java.*;importorg.apache.spark.SparkConf;importjava.io.BufferedReader;importjava.io.IOException;importjava.io.InputStream;importjava.io.InputStreamReader;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.ConcurrentHashMap;importorg.apache.lucene.util.RamUsageEstimator;/***addbychengwansheng*/classMyBloomFilter{privateBloomFilterbloomFilter;publicMyBloomFilter(BloomFilterb){bloomFilter=b;}publicBloomFiltergetBloomFilter(){returnbloomFilter;}}publicclassBloomUdfimplementsUDF2{//最大记录限制,安全起见privatestaticintmaxSize=50000000;//布隆过滤器是否开启配置,1开启,0关闭privatestaticintudfBloomFilterEnable;//布隆过滤器是否开启参数,默认开启privatestaticStringbloomFilterConfKey="spark.myudf.bloom.enable";//加配置配置参数,目前不起作用static{SparkConfsparkConf=newSparkConf();udfBloomFilterEnable=sparkConf.getInt(bloomFilterConfKey,1);System.out.println("thespark.myudf.bloom.enablevalue"+udfBloomFilterEnable);}//布隆过滤器列表,支持多个布隆过滤器privatestaticConcurrentHashMapbloomFilterMap=newConcurrentHashMap();/***布隆过滤器核心构建方法*通过读取表的hdfs文件信息,构建布隆过滤器*一个jvm只加载一次*@paramkey*@parampath*@throwsIOException*/privatesynchronizedstaticvoidbuildBloomFilter(Stringkey,Stringpath)throwsIOException{if(!bloomFilterMap.containsKey(key)){BloomFilterbloomFilter;Configurationconf=newConfiguration();FileSystemhdfs=FileSystem.get(conf)athpathDf=newPath(path);FileStatus[]stats=hdfs.listStatus(pathDf);//获取记录总数longsum=0;for(inti=0;imaxSize){//如果数据量大于期望值,则将布隆过滤器置空(即布隆过滤器不起作用)System.out.println("themaxnumberis"+maxSize+",buttargetnumistoobig,the"+key+"bloomwillbeinvalid");bloomFilter=null;}else{//默认1000W,超过取样本数据2倍的量。这里取2倍是为了提高布隆过滤器的效果,2倍是一个比较合适的值longexceptSize=sum*2>10000000sum*2:10000000;bloomFilter=BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8),(int)exceptSize);for(inti=0;i='2023-04-22'unionallselectitem_idfromdefault.Bwhereupdate_time>='2023-04-22')wherelength(item_id)>0groupbyitem_id;--增量数据计算insertoverwritetabledefault.otpartition(pt='20230422')selectB.item_id,B.sku_id,B.sku_price,A.item_pricefromdefault.Bleftjoindefault.Aon(A.item_id=B.item_idandbloom_filter(A.item_id,"tmp.tmp_primary_key"))wherebloom_filter(B.item_id,"tmp.tmp_primary_key")unionall--合并历史未变更数据selectitem_id,sku_id,sku_price,item_pricefromdefault.otwherenotbloom_filter(item_id,"tmp.tmp_primary_key")andpt='20230421'从上面代码可以看出,使用布隆过滤器的 SQL,核心业务逻辑代码只是在原来全量计算的逻辑中增加了过滤条件而已,使用起来还是比较方便的。实测效果以我司的 “dim.dim_itm_sku_info_detail_d” 和 “dim.dim_itm_info_detail_d” 任务为例,使用引擎 Spark2。总结从理论分析和实测效果来看,使用布隆过滤器的解决方案可以大幅提升任务的性能,并减少集群资源的使用。该方案不仅适用大表间 Join 分析计算,也适用大表相关的其它分析计算需求,核心思想就是计算有必要的数据,排除没必要数据,减小无效的计算损耗。看完两件事如果你觉得这篇内容对你挺有启发,我想邀请你帮我两件小事1.点个「在看」,让更多人也能看到这篇内容(点了「在看」,bug -1 )2.关注公众号「政采云技术」,持续为你推送精选好文招贤纳士政采云技术团队(Zero),Base 杭州,一个富有激情和技术匠心精神的成长型团队。规模 500 人左右,在日常业务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料体系、工程平台、性能体验、可视化等领域进行技术探索和实践,推动并落地了一系列的内部技术产品,持续探索技术的新边界。此外,团队还纷纷投身社区建设,目前已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等众多优秀开源社区的贡献者。如果你想改变一直被事折腾,希望开始折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望参与到随着业务腾飞的过程,亲手推动一个有着深入的业务理解、完善的技术体系、技术创造价值、影响力外溢的技术团队的成长过程,我觉得我们该聊聊。任何时间,等着你写点什么,发给 zcy-tc@cai-inc.com
|
|