|
Li Wanghong本文首先介绍了 Disruptor 高性能内存队列的基本概念、使用 Demo、高性能原理及源码分析,最后通过两个例子介绍了 Disruptor 在i主题业务中的应用。一、i主题及 Disruptor 简介i主题是 vivo 旗下的一款主题商店 app,用户可以通过下载主题、壁纸、字体等,实现对手机界面风格的一键更换和自定义。Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能的内存队列(用于系统内部线程间传递消息,不同于 RocketMQ、Kafka 这种分布式消息队列),基于 Disruptor 开发的系统单线程能支撑每秒600万订单。目前,包括 Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了 Disruptor 以获取高性能。在 vivo 内部它也有不少应用,比如自定义监控中使用 Disruptor 队列来暂存通过监控 SDK 上报的监控数据,i主题中也使用它来统计本地内存指标数据。接下来从 Disruptor 和 JDK 内置队列的对比、Disruptor 核心概念、Disruptor 使用Demo、Disruptor 核心源码、Disruptor 高性能原理、Disruptor 在 i主题业务中的应用几个角度来介绍 Disruptor。二、和 JDK 中内置的队列对比下面来看下 JDK 中内置的队列和 Disruptor 的对比。队列的底层实现一般分为三种:数组、链表和堆,其中堆一般是为了实现带有优先级特性的队列,暂不考虑。另外,像ConcurrentLinkedQueue 、LinkedTransferQueue属于无界队列,在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。这样 JDK 中剩下可选的线程安全的队列还有ArrayBlockingQueue和LinkedBlockingQueue。由于 LinkedBlockingQueue 是基于链表实现的,由于链表存储的数据在内存里不连续,对于高速缓存并不友好,而且 LinkedBlockingQueue 是加锁的,性能较差。ArrayBlockingQueue 有同样的问题,它也需要加锁,另外,ArrayBlockingQueue 存在伪共享问题,也会导致性能变差。而今天要介绍的 Disruptor 是基于数组的有界无锁队列,符合空间局部性原理,可以很好的利用 CPU 的高速缓存,同时它避免了伪共享,大大提升了性能。三、Disruptor 核心概念如下图,从数据流转的角度先对 Disruptor 有一个直观的概念。Disruptor 支持单(多)生产者、单(多)消费者模式。消费时支持广播消费(HandlerA 会消费处理所有消息,HandlerB 也会消费处理所有消息)、集群消费(HandlerA 和 HandlerB 各消费部分消息),HandlerA 和HandlerB 消费完成后会把消息交给 HandlerC 继续处理。下面结合 Disruptor 官方的架构图介绍下 Disruptor 的核心概念:RingBuffer:前文说 Disruptor 是一个高性能内存内存队列,而 RingBuffer 就是该内存队列的数据结构,它是一个环形数组,是承载数据的载体。Producer:Disruptor 是典型的生产者消费者模型。因此生产者是 Disruptor 编程模型中的核心组成,可以是单生产者,也可以多生产者。Event:具体的数据实体,生产者生产 Event ,存入 RingBuffer,消费者从 RingBuffer 中消费它进行逻辑处理。Event Handler:开发者需要实现 EventHandler 接口定义消费者处理逻辑。Wait Strategy:等待策略,定义了当消费者无法从 RingBuffer 获取数据时,如何等待。Event Processor:事件循环处理器,EventProcessor 继承了 Runnable 接口,它的子类实现了 run 方法,内部有一个 while 循环,不断尝试从 RingBuffer 中获取数据,交给 EventHandler 去处理。Sequence:RingBuffer 是一个数组,Sequence (序号)就是用来标记生产者数据生产到哪了,消费者数据消费到哪了。Sequencer:分为单生产者和多生产者两种实现,生产者发布数据时需要先申请下可用序号,Sequencer 就是用来协调申请序号的。Sequence Barrier:见下文分析。四、Disruptor使用Demo4.1 定义 EventEvent 是具体的数据实体,生产者生产 Event ,存入 RingBuffer,消费者从 RingBuffer 中消费它进行逻辑处理。Event 就是一个普通的 Java 对象,无需实现 Disruptor 内定义的接口。public class OrderEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; }}4.2 定义 EventFactory用于创建 Event 对象。public class OrderEventFactory implements EventFactory { public OrderEvent newInstance() { return new OrderEvent(); }}4.3 定义生产者可以看到,生成者主要是持有 RingBuffer 对象进行数据的发布。这里有几个点需要注意:RingBuffer 内部维护了一个 Object 数组(也就是真正存储数据的容器),在 RingBuffer 初始化时该 Object 数组就已经使用 EventFactory 初始化了一些空 Event,后续就不需要在运行时来创建了,提高性能。因此这里通过 RingBuffer 获取指定序号得到的是一个空对象,需要对它进行赋值后,才能进行发布。这里通过 RingBuffer 的 next 方法获取可用序号,如果 RingBuffer 空间不足会阻塞。通过 next 方法获取序号后,需要确保接下来使用 publish 方法发布数据。public class OrderEventProducer { private RingBuffer ringBuffer; public OrderEventProducer(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data) { // 1、在生产者发送消息的时候, 首先需要从我们的ringBuffer里面获取一个可用的序号 long sequence = ringBuffer.next(); try { //2、注意此时获取的OrderEvent对象是一个没有被赋值的空对象 OrderEvent event = ringBuffer.get(sequence); //3、进行实际的赋值处理 event.setValue(data.getLong(0)); } finally { //4、 提交发布操作 ringBuffer.publish(sequence); } }}4.4 定义消费者消费者可以实现 EventHandler 接口,定义自己的处理逻辑。public class OrderEventHandler implements EventHandler { public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消费者: " + event.getValue()); }}4.5 主流程首先初始化一个 Disruptor 对象,Disruptor 有多个重载的构造函数。支持传入 EventFactory 、ringBufferSize (需要是2的幂次方)、executor(用于执行EventHandler 的事件处理逻辑,一个 EventHandler 对应一个线程,一个线程只服务于一个 EventHandler )、生产者模式(支持单生产者、多生产者)、阻塞等待策略。在创建 Disruptor 对象时,内部会创建好指定 size 的 RingBuffer 对象。定义 Disruptor 对象之后,可以通过该对象添加消费者 EventHandler。启动 Disruptor,会将第2步添加的 EventHandler 消费者封装成 EventProcessor(实现了 Runnable 接口),提交到构建 Disruptor 时指定的 executor 对象中。由于 EventProcessor 的 run 方法是一个 while 循环,不断尝试从RingBuffer 中获取数据。因此可以说一个 EventHandler 对应一个线程,一个线程只服务于一个EventHandler。拿到 Disruptor 持有的 RingBuffer,然后就可以创建生产者,通过该RingBuffer就可以发布生产数据了,然后 EventProcessor 中启动的任务就可以消费到数据,交给 EventHandler 去处理了。public static void main(String[] args) { OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 4; ExecutorService executor = Executors.newFixedThreadPool(1); /** * 1. 实例化disruptor对象 1) eventFactory: 消息(event)工厂对象 2) ringBufferSize: 容器的长度 3) executor: 4) ProducerType: 单生产者还是多生产者 5) waitStrategy: 等待策略 */ Disruptor disruptor = new Disruptor(orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); // 2. 添加消费者的监听 disruptor.handleEventsWith(new OrderEventHandler()); // 3. 启动disruptor disruptor.start(); // 4. 获取实际存储数据的容器: RingBuffer RingBuffer ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy){ this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);}接下来分析 RingBuffer 的创建过程,分为单生产者与多生产者。public static RingBuffer create( ProducerType producerType, EventFactory factory, int bufferSize, WaitStrategy waitStrategy){ switch (producerType){ case SINGLE: // 单生产者 return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: // 多生产者 return createMultiProducer(factory, bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); }}不论是单生产者还是多生产者,最终都会创建一个 RingBuffer 对象,只是传给 RingBuffer 的 Sequencer 对象不同。可以看到,RingBuffer 内部最终创建了一个Object 数组来存储 Event 数据。这里有几点需要注意:RingBuffer 是用数组实现的,在创建该数组后紧接着调用 fill 方法调用 EventFactory 工厂方法为数组中的元素进行初始化,后续在使用这些元素时,直接通过下标获取并给对应的属性赋值,这样就避免了 Event 对象的反复创建,避免频繁 GC。RingBuffe 的数组中的元素是在初始化时一次性全部创建的,所以这些元素的内存地址大概率是连续的。消费者在消费时,是遵循空间局部性原理的。消费完第一个Event 时,很快就会消费第二个 Event,而在消费第一个 Event 时,CPU 会把内存中的第一个 Event 的后面的 Event 也加载进 Cache 中,这样当消费第二个 Event时,它已经在 CPU Cache 中了,所以就不需要从内存中加载了,这样可以大大提升性能。public static RingBuffer createSingleProducer( EventFactory factory, int bufferSize, WaitStrategy waitStrategy){ SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); return new RingBuffer(factory, sequencer);}RingBufferFields( EventFactory eventFactory, Sequencer sequencer){ // 省略部分代码... // 额外创建2个填充空间的大小, 首尾填充, 避免数组的有效载荷和其它成员加载到同一缓存行 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; fill(eventFactory);} private void fill(EventFactory eventFactory){ for (int i = 0; i handleEventsWith( final EventHandler... handlers){ // 通过disruptor对象直接调用handleEventsWith方法时传的是空的Sequence数组 return createEventProcessors(new Sequence[0], handlers);}EventHandlerGroup createEventProcessors( final Sequence[] barrierSequences, final EventHandler[] eventHandlers) { // 收集添加的消费者的序号 final Sequence[] processorSequences = new Sequence[eventHandlers.length]; // 本批次消费由于添加在同一个节点之后, 因此共享该屏障 final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); // 为每个EventHandler创建一个BatchEventProcessor for (int i = 0, eventHandlersLength = eventHandlers.length; i eventHandler = eventHandlers[i]; final BatchEventProcessor batchEventProcessor = new BatchEventProcessor(ringBuffer, barrier, eventHandler); if (exceptionHandler != null){ batchEventProcessor.setExceptionHandler(exceptionHandler); } // 添加到消费者信息仓库中 consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } // 更新网关序列(生产者只需要关注所有的末端消费者节点的序列) updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup(this, consumerRepository, processorSequences);}创建完 Disruptor 对象之后,可以通过 Disruptor 对象添加 EventHandler,这里有一需要注意:通过 Disruptor 对象直接调用 handleEventsWith 方法时传的是空的 Sequence 数组,这是什么意思?可以看到 createEventProcessors 方法接收该空 Sequence 数组的字段名是 barrierSequences,翻译成中文就是栅栏序号。怎么理解这个字段?比如通过如下代码给 Disruptor 添加了两个handler,记为 handlerA 和 handlerB,这种是串行消费,对于一个 Event,handlerA 消费完后才能轮到 handlerB 去消费。对于 handlerA 来说,它没有前置消费者(生成者生产到哪里,消费者就可以消费到哪里),因此它的 barrierSequences 是一个空数组。而对于 handlerB 来说,它的前置消费者是 handlerA,因此它的 barrierSequences 就是A的消费进度,也就是说 handlerB 的消费进度是要小于 handlerA 的消费进度的。disruptor.handleEventsWith(handlerA).handleEventsWith(handlerB);如果是通过如下方式添加的 handler,则 handlerA 和handlerB 会消费所有 Event 数据,类似 MQ 消息中的广播消费,而 handlerC 的 barrierSequences 数组就是包含了 handlerA 的消费进度和 handlerB 的消费进度,这也是为什么 barrierSequences 是一个数组,后续 handlerC 在消费数据时,会取A和B消费进度的较小值进行判断,比如A消费到进度6,B消费到进度4,那么C只能去消费下标为3的数据,这也是 barrierSequences 的含义。disruptor.handleEventsWith(handlerA, handlerB).handleEventsWith(handlerC);5.3 启动 DisruptorDisruptor的启动逻辑比较简洁,就是遍历consumerRepository 中收集的 EventProcessor(实现了Runnable接口),将它提交到创建 Disruptor 时指定的executor 中,EventProcessor 的 run 方法会启动一个while 循环,不断尝试从 RingBuffer 中获取数据进行消费。disruptor.start();public RingBuffer start() { checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer;} public void start(final Executor executor) { executor.execute(eventprocessor);}5.4 发布数据在分析 Disruptor 的发布数据的源码前,先来回顾下发布数据的整体流程。调用 next 方法获取可用序号,该方法可能会阻塞。通过上一步获得的序号从 RingBuffer 中获取对应的 Event,因为 RingBuffer 中所有的 Event 在初始化时已经创建好了,这里获取的只是空对象。因此接下来需要对该空对象进行业务赋值。调用 next 方法需要在 finally 方法中进行最终的发布,标记该序号数据已实际生产完成。public void sendData(ByteBuffer data) { long sequence = ringBuffer.next(); try { OrderEvent event = ringBuffer.get(sequence); event.setValue(data.getLong(0)); } finally { ringBuffer.publish(sequence); }}5.4.1 获取序号next 方法默认申请一个序号。nextValue 表示已分配的序号,nextSequence 表示在此基础上再申请n个序号(此处n为1),cachedValue 表示缓存的消费者的最小消费进度。假设有一个 size 为8的 RingBuffer,当前下标为6的数据已经发布好(nextValue为6),消费者一直未开启消费(cachedValue 和cachedGatingSequence 为-1),此时生产者想继续发布数据,调用 next() 方法申请获取序号为7的位置(nextSequence为7),计算得到的 wrapPoint 为7-8=-1,此时 wrapPoint 等于cachedGatingSequence,可以继续发布数据,如左图。最后将 nextValue 赋值为7,表示序号7的位置已经被生产者占用了。接着生产者继续调用 next() 方法申请序号为0的数据,此时 nextValue为7,nextSequence 为8,wrapPoint 等于0,由于消费者迟迟未消费(cachedGatingSequence为-1),此时 wrapPoint 大于了 cachedGatingSequence,因此 next 方法的if判断成立,会调用 LockSupport.parkNanos 阻塞等待消费者进行消费。其中 getMinimumSequence 方法是获取多个消费者的最小消费进度。public long next() { return next(1);}public long next(int n) { /** * 已分配的序号的缓存(已分配到这里), 初始-1. 可以看该方法的返回值nextSequence, * 接下来生产者就会该往该位置写数据, 它赋值给了nextValue, 所以下一次调用next方 * 法时, nextValue位置就是表示已经生产好了数据, 接下来要申请nextSequece的数据 */ long nextValue = this.nextValue; // 本次申请分配的序号 long nextSequence = nextValue + n; // 构成环路的点:环形缓冲区可能追尾的点 = 等于本次申请的序号-环形缓冲区大小 // 如果该序号大于最慢消费者的进度, 那么表示追尾了, 需要等待 long wrapPoint = nextSequence - bufferSize; // 上次缓存的最小网关序号(消费最慢的消费者的进度) long cachedGatingSequence = this.cachedValue; // wrapPoint > cachedGatingSequence 表示生产者追上消费者产生环路(追尾), 即缓冲区已满, // 此时需要获取消费者们最新的进度, 以确定是否队列满 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { // 插入StoreLoad内存屏障/栅栏, 保证可见性。 // 因为publish使用的是set()/putOrderedLong, 并不保证其他消费者能及时看见发布的数据 // 当我再次申请更多的空间时, 必须保证消费者能消费发布的数据 cursor.setVolatile(nextValue); long minSequence; // minSequence是多个消费者的最小序号, 要等所有消费者消费完了才能继续生产 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); } // 缓存生产者们最新的消费进度 this.cachedValue = minSequence; } // 这里只写了缓存, 并未写volatile变量, 因为只是预分配了空间但是并未被发布数据, // 不需要让其他消费者感知到。消费者只会感知到真正被发布的序号 this.nextValue = nextSequence; return nextSequence;}5.4.2 根据序号获取 Event直接通过 Unsafe 工具类获取指定序号的 Event 对象,此时获取的是空对象,因此接下来需要对该 Event 对象进行业务赋值,赋值完成后调用 publish 方法进行最终的数据发布。OrderEvent event = ringBuffer.get(sequence);public E get(long sequence) { return elementAt(sequence);}protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}5.4.3 发布数据生产者获取到可用序号后,首先对该序号处的空 Event 对象进行业务赋值,接着调用 RingBuffer 的 publish 方法发布数据,RingBuffer 会委托给其持有的 sequencer(单生产者和多生产者对应不同的 sequencer)对象进行真正发布。单生产者的发布逻辑比较简单,更新下 cursor 进度(cursor 表示生产者的生产进度,该位置已实际发布数据,而 next 方法中的 nextSequence 表示生产者申请的最大序号,可能还未实际发布数据),接着唤醒等待的消费者。waitStrategy 有不同的实现,因此唤醒逻辑也不尽相同,如采用 BusySpinWaitStrategy 策略时,消费者获取不到数据时自旋等待,然后继续判断是否有新数据可以消费了,因此 BusySpinWaitStrategy 策略的 signalAllWhenBlocking 就是一个空实现,啥也不做。ringBuffer.publish(sequence); public void publish(long sequence) { sequencer.publish(sequence);}public void publish(long sequence) { // 更新生产者进度 cursor.set(sequence); // 唤醒等待的消费者 waitStrategy.signalAllWhenBlocking();}5.4.4 消费数据前面提到,Disruptor 启动时,会将封装 EventHandler 的EventProcessor(此处以 BatchEventProcessor 为例)提交到线程池中运行,BatchEventProcessor 的 run 方法会调用 processEvents 方法不断尝试从 RingBuffer 中获取数据进行消费,下面分析下 processEvents 的逻辑(代码做了精简)。它会开启一个 while 循环,调用 sequenceBarrier.waitFor 方法获取最大可用的序号,比如获取序号一节所提的,生产者持续生产,消费者一直未消费,此时生产者已经将整个 RingBuffer 数据都生产满了,生产者无法再继续生产,生产者此时会阻塞。假设这时候消费者开始消费,因此 nextSequence 为0,而availableSequence 为7,此时消费者可以批量消费,将这8条已生产者的数据全部消费完,消费完成后更新下消费进度。更新消费进度后,生产者通过 Util.getMinimumSequence 方法就可以感知到最新的消费进度,从而不再阻塞,继续发布数据了。private void processEvents() { T event = null; // sequence记录消费者的消费进度, 初始为-1 long nextSequence = sequence.get() + 1L; // 死循环,因此不会让出线程,需要独立的线程(每一个EventProcessor都需要独立的线程) while (true) { // 通过屏障获取到的最大可用序号 final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 批量消费 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } // 更新消费进度(批量消费, 每次消费只更新一次Sequence, 减少性能消耗) sequence.set(availableSequence); }}下面分析下 SequenceBarrier 的 waitFor 方法。首先它会调用 waitStrategy 的 waitFor 方法获取最大可用序号,以 BusySpinWaitStrategy 策略为例,它的 waitFor 方法的三个参数的含义分别是:sequence:消费者期望获得的序号,也就是当前消费者已消费的进度+1cursor:当前生产者的生成进度dependentSequence:消费者依赖的前置消费者的消费进度。该字段是在添加 EventHandler,创建 BatchEventProcessor时创建的。如果当前消费者没有前置依赖的消费者,那么它只需要关心生产者的进度,生产者生产到哪里,它就可以消费到哪里,因此 dependentSequence 就是 cursor。而如果当前消费者有前置依赖的消费者,那么dependentSequence就是FixedSequenceGroup(dependentSequences)。因为 dependentSequence 分为两种情况,所以 waitFor 的逻辑也可以分为两种情况讨论:当前消费者无前置消费者:假设 cursor 为6,也就是序号为6的数据已经发布了数据,此时传入的sequence为6,则waitFor方法可以直接返回availableSequence(6),可以正常消费。序号为6的数据消费完成后,消费者继续调用 waitFor 获取数据,传入的 sequence为7,而此时 availableSequence 还是未6,因此消费者需要自旋等待。当生产者继续发布数据后,因为dependentSequence 持有的就是生产者的生成进度,因此消费者可以感知到,继续消费。当前消费者有前置消费者:假设 cursor 为6,当前消费者C有两个前置依赖的消费者A(消费进度为5)、B(消费进度为4),那么此时availableSequence(FixedSequenceGroup实例,它的 get 方法是获取A、B的最小值,也就是4)为4。如果当前消费者C期望消费下标为4的数据,则可以正常消费,但是消费下标为5的数据就不行了,它需要等待它的前置消费者B消费完进度为5的数据后才能继续消费。在 waitStrategy 的 waitFor 方法返回,得到最大可用的序号 availableSequence 后,最后需要再调用下 sequencer 的getHighestPublishedSequence获取真正可用的最大序号,这和生产者模型有关系,如果是单生产者,因为数据是连续发布的,直接返回传入的 availableSequence。而如果是多生产者,因为多生产者是有多个线程在生产数据,发布的数据是不连续的,因此需要通过getHighestPublishedSequence 方法获取已发布的且连续的最大序号,因为获取序号进行消费时需要是顺序的,不能跳跃。public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { /** * sequence: 消费者期望获取的序号 * cursorSequence: 生产者的序号 * dependentSequence: 消费者需要依赖的序号 */ long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } // 目标sequence已经发布了, 这里获取真正的最大序号(和生产者模型有关) return sequencer.getHighestPublishedSequence(sequence, availableSequence);}public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; // 确保该序号已经被我前面的消费者消费(协调与其他消费者的关系) while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); // 自旋等待 ThreadHints.onSpinWait(); } return availableSequence;}六、Disruptor 高性能原理分析6.1 空间预分配前文分析源码时介绍到,RingBuffer 内部维护了一个 Object 数组(也就是真正存储数据的容器),在 RingBuffer 初始化时该 Object 数组就已经使用EventFactory 初始化了一些空 Event,后续就不需要在运行时来创建了,避免频繁GC。另外,RingBuffe 的数组中的元素是在初始化时一次性全部创建的,所以这些元素的内存地址大概率是连续的。消费者在消费时,是遵循空间局部性原理的。消费完第一个Event 时,很快就会消费第二个 Event,而在消费第一个 Event 时,CPU 会把内存中的第一个 Event 的后面的 Event 也加载进 Cache 中,这样当消费第二个 Event 时,它已经在 CPU Cache 中了,所以就不需要从内存中加载了,这样也可以大大提升性能。6.2、避免伪共享6.2.1 一个伪共享的例子如下代码所示,定义了一个 Pointer 类,它有2个 long 类型的成员变量x、y,然后在 main 方法中其中2个线程分别对同一个 Pointer 对象的x和y自增 100000000 次,最后统计下方法耗时,在我本机电脑上测试多次,平均约为3600ms。public class Pointer { volatile long x; volatile long y; @Override public String toString() { return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]") .add("x=" + x) .add("y=" + y) .toString(); }}public static void main(String[] args) throws InterruptedException { Pointer pointer = new Pointer(); int num = 100000000; long start = System.currentTimeMillis(); Thread t1 = new Thread(() -> { for(int i = 0; i { for(int i = 0; i cachedGatingSequence || cachedGatingSequence > current) { // 重新计算下所有消费者里的最小消费进度 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); // 依然没有足够的空间, 让出CPU使用权 if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); continue; } // 更新下最新的最小的消费进度 gatingSequenceCache.set(gatingSequence); } // 第二步:看见空间足够时尝试CAS竞争空间 else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next;}6.4、支持批量消费定义 Event这个比较好理解,在前文分析消费数据的逻辑时介绍了,消费者会获取下最大可用的序号,然后批量消费这些消息。七、Disruptor 在i主题业务中的使用很多开源项目都使用了 Disruptor,比如日志框架 Log4j2 使用它来实现异步日志。HBase、Storm 等项目中也使用了到了 Disruptor。vivo 的 i主题业务也使用了 Disruptor,下面简单介绍下它的2个使用场景。7.1、监控数据上报业务监控系统对于企业来说非常重要,可以帮助企业及时发现和解决问题,可以方便的检测业务指标数据,改进业务决策,从而保证业务的可持续发展。i主题使用 Disruptor(多生产者单消费者)来暂存待上报的业务指标数据,然后有定时任务不断提取数据上报到监控平台,如下图所示。7.2、本地缓存 key 统计分析i主题业务中大量使用了本地缓存,为了统计本地缓存中key 的个数(去重)以及每种缓存模式 key 的数量,考虑使用 Disruptor 来暂存并消费处理数据。因为业务代码里很多地方涉及到本地缓存的访问,也就是说,生产者是多线程的。考虑到消费处理比较简单,而如果使用多线程消费的话又涉及到加锁同步,因此消费者采用单线程模式。整体流程如下图所示,首先在缓存访问工具类中增加缓存访问统计上报的调用,缓存访问数据进入到 RingBuffer 后,单线程消费者使用 HyperLogLog 来去重统计不同 key的个数,使用正则匹配来统计每种模式key的数量。然后有异步任务定时获取统计结果,进行展示。需要注意的是,因为 RingBuffer 队列大小是固定的,如果生产者生产过快而消费者消费不过来,如果使用 next 方法申请序号,如果剩余空间不够会导致生产者阻塞,因此建议使用 tryPublishEvent 方法去发布数据,它内部是使用 tryNext 方法申请序号,该方法如果申请不到可用序号会抛出异常,这样生产者感知到了就可以做兼容处理,而不是阻塞等待。八、使用建议Disruptor 是基于生产者消费者模式,如果生产快消费慢,就会导致生产者无法写入数据。因此,不建议在 Disruptor 消费线程中处理耗时较长的业务。一个 EventHandler 对应一个线程,一个线程只服务于一个 EventHandler。Disruptor 需要为每一个 EventHandler(EventProcessor) 创建一个线程。因此在创建 Disruptor 时不推荐传入指定的线程池,而是由 Disruptor 自身根据 EventHandler 数量去创建对应的线程。生产者调用 next 方法申请序号时,如果获取不到可用序号会阻塞,这一点需要注意。推荐使用 tryPublishEvent 方法,生产者在申请不到可用序号时会立即返回,不会阻塞业务线程。如果使用 next 方法申请可用序号,需要确保在 finally 方法中调用 publish 真正发布数据。合理设置等待策略。消费者在获取不到数据时会根据设置的等待策略进行等待,BlockingWaitStrategry 是最低效的策略,但其对 CPU消耗最小。YieldingWaitStrategy 有着较低的延迟、较高的吞吐量,以及较高 CPU 占用率。当 CPU 数量足够时,可以使用该策略。九、总结本文首先通过对比 JDK 中内置的线程安全的队列和Disruptor 的特点,引入了高性能无锁内存队列 Disruptor。接着介绍了 Disruptor 的核心概念和基本使用,使读者对 Disruptor 建立起初步的认识。接着从源码和原理角度介绍了 Disruptor 的核心实现以及高性能原理(空间预分配、避免伪共享、无锁、支持批量消费)。其次,结合i主题业务介绍了 Disruptor 在实践中的应用。最后,基于上述原理分析及应用实战,总结了一些 Disruptor 最佳实践策略。参考文章:https://time.geekbang.org/column/article/132477https://lmax-exchange.github.io/disruptor/END猜你喜欢 基于 Three.js 的 3D 模型加载优化数据特征采样在 MySQL 同步一致性校验中的实践HBase 在统一内容平台业务的优化实践
|
|