找回密码
 会员注册
查看: 14|回复: 0

Flink滑动窗口优化

[复制链接]

2万

主题

0

回帖

7万

积分

超级版主

积分
72374
发表于 2024-10-6 10:01:43 | 显示全部楼层 |阅读模式
文 ?杨诗旻 on 大数据一、前言Flink 的窗口功能非常强大,因为要支持各种各样的窗口,像滑动窗口和滚动窗口这样的对齐窗口,像会话窗口这样的非对齐窗口,复杂度也会比较高。其中在超长滑动窗口的性能上也不尽如人意。这篇文章首先会阐述为什么在超长滑动窗口下 Flink 的性能会降级的很严重,以及在有赞我们是如何解决这个问题的。此外,在优化中并没有去兼顾 Evictor 的逻辑,因为在业务中并没有相应的需求。二、Flink 滑动窗口的实现Flink Window 算子的整体概念如下图所示,可以看到有几个重要的部分,首先是 WindowAssigner 和 Trigger,还有 Evaluation Function (也就是用户定义的对窗口内元素的计算方式),Evictor 与本次优化关系不大,不会展开。2.1 WindowAssignerWindowAssigner 抽象类有一个关键的抽象方法就是 assignWindows,根据一个元素分配它所属的窗口。// 返回该元素应该被分配的窗口的集合。public abstract Collection assignWindows(T element, long timestamp, WindowAssignerContext context);对于滑动窗口来说,如下图所示,每个元素可能属于多个窗口。这个数量也可以由 n = 窗口长度 / 滑动步长 计算出来,范围在[n, n+1]。此处使用 SlidingEventTimeWindows 来举例,它的 assignWindows 实现如下,针对输入的元素和它绑定的 timestamp,计算出该元素所属的每一个窗口。@Override public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List windows = new ArrayList((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }2.2 Trigger抽象类对于 Trigger 来说,有三个很重要的方法,如下:// 针对流入的每一个元素的每一个所属的窗口,会调用该方法,返回 Trigger 结果。 public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; // 当处理时间到达了注册的计时器的时间时,会调用该方法,返回 Trigger 结果。 public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; // 当事件时间到达了注册的计时器的时间时,会调用该方法,返回 Trigger 结果。 public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;而 EventTimeTrigger 的实现如下,当 onElement 被调用时,如果 watermark 已经超过了 window 的话会直接返回 Fire,不然的话会将 window 的最大时间戳注册到定时器中。 当 onEventTime 被调用时,会比较这个定时器注册的时间和当前 window 最大时间戳,只有等于的情况下才会 Fire。因为是事件时间的 Trigger,所以 onProcessingTime 没有作为,事实上这个 Code Path 也不应该被调用到。@Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() > 1 并且 (L - m) >> 1,就变成L * m / 2 + (L - m) / 2K + m * a前后的差值约为L / 2 * ( 2 * m * a - m - (L - m) / 2K) - L - ma假设 a 分别为 1 和 100L / 2 * (m - (L - m) / 2K) - L - mL / 2 * (99 * m - (L - m) / 2K) - L - 100 * m可以看出来在 L 比较大的情形下, a 越大,m 越大读成本优化越明显。3.4 其他场景接下来从读的角度举两个比较极端的例子(不考虑缓存命中的情况下):首先是一些变量的命名,当窗口长度与滑动步长的比值为 n 时,假设总共处理的时间为 m 个窗口步长,一个窗口步长的时间内总共处理的数据 T,key 的数量 K。例子一:假设这些 Key 全部都平均分布到了各个步长中.使用 Flink 本身的滑动窗口来处理的话,当所有消息刚进入时,因为要更新状态,所以会有 n * m * T 的读取。最后当输出的时候会对每个 key 和 Window 对触发,也就是 m * K 的状态读取,总共加起来就是 n * m * T + m * K 次读取。而当使用重叠窗口的方法优化时,每条消息消息进入系统,只会对重叠窗口进行一次更新,也就是 m * T 次读取。而当每个 key 和 window 对的触发输出,都会有 n 个重叠窗口的状态读取,也就是 n * m * K,总共就是 n * m * K + m * T。将优化前读取次数减去优化后,可以得到的是Delta = n * m * (T - K) + m * (K - T) = m * (n - 1) * (T - K) ≈ m * n * (T - K)因为前面的假设这些 Key 全部平分到了各个步长中,所以 T 是大于 K 的,优化后读取次数明显减少。例子二:现实中的另一种情况就是这些 key 是稀疏的,那上述的公式也就不成立了。此处假设变了,每个 key 只会出现在 1 个步长中,还可以知道 1 << n。过程这里就不赘述了,最后 flink 自带的滑窗,至多会有 n * m * T + n * K,而对于优化之后,则变成了 m * T + n * n * K。单看后半部分,已经随着 n 做平方级的增长了,看起来性能会有很大的下滑。但是可以将这儿再拆一下,变成 m * T + n * (1 + (n - 1)) * K,此处的意义在与在每次窗口触发输出是,只有 1 个重叠窗口是有值得,另外 (n - 1) 个重叠窗口是空的,根据 RocksDB 的设计,可以借助到 Bloom Filter。Bloom Filter判断元素不在集合,那肯定不在。如果判断元素存在集合中,有一定的概率判断错误。而其中的 (n - 1) 个重叠窗口读的大部分都能够被 BloomFilter 给过滤掉,成本很低。而 1 又远小于 n。如果我们引入一个系数,这个系数 L 表示的是经过 Bloom Filter 优化后对空的重叠窗口的读成本的比例的话(L 的系数比较难以确定),所以最后得到的公式是 m * T + n * K + n * (n - 1) * K * L 约等于 m * T + n * K + n * n * K * L。将优化前减去优化后,得到的是(n - 1) * m * T - n * n * K * L ≈ n * (m * T - n * K * L)m * T 是总数据量, m * T > K 是确定的,n * L 同时取决于比值和 Bloom Filter的效率。现实的情况往往是例子一和例子二的综合,有热点,大部分又是稀疏的,再加上又有缓存的存在,并且优化后总的状态的量被减少了很多,再加上写被大大减少了,所以总体性能是有提升的,但是也会取决于用户的场景。针对问题二,其实并没有特别好的优化方案。但是因为用户在用 Flink 的 SQL 来做实时任务时,其实大部分情况下是不会配置 allowLateness 的,也就是说输出和清理状态可以用同一个定时器来触发,所以当判断到 allowLateness 为 0 时,只注册一个定时器,可以将定时器的写入成本最多降低到一半。3.5 相关工作Flink 社区也曾经提出过类似的方案(FLINK-7001),但是没有合入. 对于这种长窗口的情况,这并不是一个最优雅的解决方案,只能算是一个 Work Around。对于业务方来说,真正的诉求应该是实时的获取截止到当前的一个时间段内的统计数据。在有赞, 除了在 Flink 引擎做优化意外,目前还在朝着两个方向:借助 Druid 这个预聚合的实时 olap 解决方案;利用 Flink 实现实时指标服务, 基本思想也是预聚合的思想,将细粒度结果存储在在线存储中,做读时聚合.Druid 方案目前还存在着整点读 RT 抖动和平均 RT 过长的问题,对于 RT 敏感的应用场景,没办法很好满足.实时指标服务方案还在构建中, 初步想法是抽象实时指标转化为 Flink 实时任务,将细粒度实时指标结果写入 在线存储(HBase, Redis)中,最后做读时聚合. 方案的最终效果, 还要待上线后验证。四、小结这次优化事实上效果会取决多个因素,从线上实际效果看,视重叠系数的不同,性能上有 3~8 倍的优化.最后打个小广告,有赞大数据团队基础设施团队,主要负责有赞的数据平台(DP), 实时计算(Storm, Spark Streaming, Flink),离线计算(HDFS,YARN,HIVE, SPARK SQL),在线存储(HBase),实时 OLAP(Druid) 等数个技术产品,欢迎感兴趣的小伙伴联系 hefei@youzan.com参考FLINK-7001扩展阅读大数据开发平台(Data Platform)在有赞的最佳实践有赞数据仓库元数据系统实践How we redesigned the NSQ - 其他特性及未来计划HBase 写吞吐场景资源消耗量化分析及优化有赞大数据平台安全建设实践Flink 在有赞实时计算的实践SparkSQL 在有赞的实践Druid 在有赞的实践实时计算在有赞的实践-效率提升之路DataX在有赞大数据平台的实践-The End-Vol.183有赞技术团队为 442 万商家,150 个行业,330 亿电商交易额提供技术支持微商城|零售|美业 | 教育微信公众号:有赞coder ? ?微博:@有赞技术技术博客:tech.youzan.comThe bigger the dream,?the more important the team.
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-11 13:00 , Processed in 0.442277 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表