百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Disruptor—3.核心源码实现分析二

liuian 2025-06-15 17:36 6 浏览

大纲

1.Disruptor的生产者源码分析

2.Disruptor的消费者源码分析

3.Disruptor的WaitStrategy等待策略分析

4.Disruptor的高性能原因

5.Disruptor高性能之数据结构(内存预加载机制)

6.Disruptor高性能之内核(使用单线程写)

7.Disruptor高性能之系统内存优化(内存屏障)

8.Disruptor高性能之系统缓存优化(消除伪共享)

9.Disruptor高性能之序号获取优化(自旋 + CAS)


2.Disruptor的消费者源码分析

Disruptor的消费者主要由BatchEventProcessor类和WorkProcessor类来实现,并通过Disruptor的handleEventsWith()方法或者
handleEventsWithWorkerPool()方法和start()方法来启动。


执行Disruptor的handleEventsWith()方法绑定消费者时,会创建BatchEventProcessor对象,并将其添加到Disruptor的consumerRepository属性


执行Disruptor的
handleEventsWithWorkerPool()方法
绑定消费者时,则会创建WorkProcessor对象,并将该对象添加到Disruptor的consumerRepository属性


执行Disruptor的start()方法启动Disruptor实例时,便会通过线程池执行BatchEventProcessor里的run()方法,或者通过线程池执行WorkProcessor里的run()方法


执行BatchEventProcessor的run()方法时,会通过修改BatchEventProcessor的sequence来实现消费RingBuffer的数据


执行WorkProcessor的run()方法时,会通过修改WorkProcessor的sequence来实现消费RingBuffer的数据

public class Main {
    public static void main(String[] args) {
        //参数准备
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  
        //参数一:eventFactory,消息(Event)工厂对象
        //参数二:ringBufferSize,容器的长度
        //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
        //参数四:ProducerType,单生产者还是多生产者
        //参数五:waitStrategy,等待策略
        //1.实例化Disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );
  
        //2.添加Event处理器,用于处理事件
        //也就是构建Disruptor与消费者的一个关联关系
        //方式一:使用handleEventsWith()方法
        disruptor.handleEventsWith(new OrderEventHandler());
        //方式二:使用handleEventsWithWorkerPool()方法
        //disruptor.handleEventsWithWorkerPool(workHandlers);
  
        //3.启动disruptor
        disruptor.start();
  
        //4.获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            //向容器中投递数据
            producer.sendData(bb);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...
    
    //绑定消费者,设置EventHandler,创建EventProcessor
    //Set up event handlers to handle events from the ring buffer. 
    //These handlers will process events as soon as they become available, in parallel.
    //This method can be used as the start of a chain. 
    //For example if the handler A must process events before handler B: dw.handleEventsWith(A).then(B); 
    //@param handlers the event handlers that will process events.
    //@return a EventHandlerGroup that can be used to chain dependencies.
    @SuppressWarnings("varargs")
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }
    
    //创建BatchEventProcessor,添加到consumerRepository中
    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            //创建BatchEventProcessor对象
            final BatchEventProcessor<T> batchEventProcessor = 
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            //添加BatchEventProcessor对象到consumerRepository中
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            //一个消费者线程对应一个batchEventProcessor
            //每个batchEventProcessor都会持有一个Sequence对象来表示当前消费者线程的消费进度
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        //将每个消费者线程持有的Sequence对象添加到生产者Sequencer的gatingSequences属性中(Sequence[]属性)
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    
    private void checkNotStarted() {
        //线程的开关会使用CAS实现
        if (started.get()) {
            throw new IllegalStateException("All event handlers must be added before calling starts.");
        }
    }
    ...
    
    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
             //在执行Disruptor.handleEventsWith()方法,调用Disruptor.createEventProcessors()方法时,
             //会将新创建的BatchEventProcessor对象封装成EventProcessorInfo对象(即ConsumerInfo对象),
             //然后通过add()方法添加到consumerRepository中
             //所以下面会调用EventProcessorInfo.start()方法
             consumerInfo.start(executor);
        }
        return ringBuffer;
    }
    
    private void checkOnlyStartedOnce() {
        //线程的开关使用CAS实现
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
    
    //添加BatchEventProcessor对象到consumerRepository中
    public void add(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        //将传入的BatchEventProcessor对象封装成EventProcessorInfo对象,即ConsumerInfo对象
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
    ...
}

class EventProcessorInfo<T> implements ConsumerInfo {
    private final EventProcessor eventprocessor;
    private final EventHandler<? super T> handler;
    private final SequenceBarrier barrier;
    private boolean endOfChain = true;

    EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        this.eventprocessor = eventprocessor;
        this.handler = handler;
        this.barrier = barrier;
    }
    ...
    
    @Override
    public void start(final Executor executor) {
        //通过传入的线程池,执行BatchEventProcessor对象的run()方法
        //传入的线程池,其实就是初始化Disruptor时指定的线程池
        executor.execute(eventprocessor);
    }
    ...
}

//Convenience class for handling the batching semantics of consuming entries from 
//a RingBuffer and delegating the available events to an EventHandler.
//If the EventHandler also implements LifecycleAware it will be notified just after 
//the thread is started and just before the thread is shutdown.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class BatchEventProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;

    //Construct a EventProcessor that will automatically track the progress by 
    //updating its sequence when the EventHandler#onEvent(Object, long, boolean) method returns.
    //@param dataProvider to which events are published.
    //@param sequenceBarrier on which it is waiting.
    //@param eventHandler is the delegate to which events are dispatched.
    public BatchEventProcessor(final DataProvider<T> dataProvider, final SequenceBarrier sequenceBarrier, final EventHandler<? super T> eventHandler) {
        //传入的dataProvider其实就是Disruptor的ringBuffer
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;
        if (eventHandler instanceof SequenceReportingEventHandler) {
            ((SequenceReportingEventHandler<?>)eventHandler).setSequenceCallback(sequence);
        }
        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }
    ...
    
    //It is ok to have another thread rerun this method after a halt().
    //通过对sequence进行修改来实现消费RingBuffer里的数据
    @Override
    public void run() {
        if (running.compareAndSet(IDLE, RUNNING)) {
            sequenceBarrier.clearAlert();
            notifyStart();
            try {
                if (running.get() == RUNNING) {
                    processEvents();
                }
            } finally {
                notifyShutdown();
                running.set(IDLE);
            }
        } else {
            //This is a little bit of guess work.  
            //The running state could of changed to HALTED by this point.  
            //However, Java does not have compareAndExchange which is the only way to get it exactly correct.
            if (running.get() == RUNNING) {
                throw new IllegalStateException("Thread is already running");
            } else {
                earlyExit();
            }
        }
    }
    
    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true) {
            try {
                //通过sequenceBarrier.waitFor()方法看看消费者是否需要等待生产者投递消息
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                while (nextSequence <= availableSequence) {
                    //从RingBuffer中获取要消费的数据
                    event = dataProvider.get(nextSequence);
                    //执行消费者实现的onEvent()方法来消费数据
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                //设置消费者当前的消费进度
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

    private void earlyExit() {
        notifyStart();
        notifyShutdown();
    }

    private void notifyTimeout(final long availableSequence) {
        try {
            if (timeoutHandler != null) {
                timeoutHandler.onTimeout(availableSequence);
            }
        } catch (Throwable e) {
            handleEventException(e, availableSequence, null);
        }
    }

    //Notifies the EventHandler when this processor is starting up
    private void notifyStart() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onStart();
            } catch (final Throwable ex) {
                handleOnStartException(ex);
            }
        }
    }

    //Notifies the EventHandler immediately prior to this processor shutting down
    private void notifyShutdown() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onShutdown();
            } catch (final Throwable ex) {
                handleOnShutdownException(ex);
            }
        }
    }
    ...
}
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...
    
    //设置WorkHandler,创建WorkProcessor
    //Set up a WorkerPool to distribute an event to one of a pool of work handler threads.
    //Each event will only be processed by one of the work handlers.
    //The Disruptor will automatically start this processors when #start() is called.
    //@param workHandlers the work handlers that will process events.
    //@return a {@link EventHandlerGroup} that can be used to chain dependencies.
    @SafeVarargs
    @SuppressWarnings("varargs")
    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {
        return createWorkerPool(new Sequence[0], workHandlers);
    }
    
    //创建WorkerPool,添加到consumerRepository中
    EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        //创建WorkerPool对象,以及根据workHandlers创建WorkProcessor
        final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        //添加WorkerPool对象到consumerRepository中  
        consumerRepository.add(workerPool, sequenceBarrier);
        final Sequence[] workerSequences = workerPool.getWorkerSequences();
        //将每个消费者线程持有的Sequence对象添加到生产者Sequencer的gatingSequences属性中(Sequence[]属性)
        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
        return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
    }
    
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...
    
    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            //在执行Disruptor.handleEventsWithWorkerPool()方法,调用Disruptor.createWorkerPool()方法时,
            //会将新创建的WorkerPool对象封装成WorkerPoolInfo对象(即ConsumerInfo对象),
            //然后通过add()方法添加到consumerRepository中
            //所以下面会调用WorkerPoolInfo.start()方法
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }
    
    private void checkOnlyStartedOnce() {
        //线程的开关使用CAS实现
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
    
    //添加WorkerPool对象到consumerRepository中
    public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<>(workerPool, sequenceBarrier);
        consumerInfos.add(workerPoolInfo);
        for (Sequence sequence : workerPool.getWorkerSequences()) {
            eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
        }
    }
    ...
}

class WorkerPoolInfo<T> implements ConsumerInfo {
    private final WorkerPool<T> workerPool;
    private final SequenceBarrier sequenceBarrier;
    private boolean endOfChain = true;
   
    WorkerPoolInfo(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        this.workerPool = workerPool;
        this.sequenceBarrier = sequenceBarrier;
    }
    
    @Override
    public void start(Executor executor) {
        workerPool.start(executor);
    }
    ...
}

public final class WorkerPool<T> {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    //WorkProcessors are created to wrap each of the provided WorkHandlers
    private final WorkProcessor<?>[] workProcessors;
    
    //Create a worker pool to enable an array of WorkHandlers to consume published sequences.
    //This option requires a pre-configured RingBuffer which must have RingBuffer#addGatingSequences(Sequence...) called before the work pool is started.
    //@param ringBuffer       of events to be consumed.
    //@param sequenceBarrier  on which the workers will depend.
    //@param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
    //@param workHandlers     to distribute the work load across.
    @SafeVarargs
    public WorkerPool(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... workHandlers) {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        //根据workHandlers创建WorkProcessor
        workProcessors = new WorkProcessor[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
            workProcessors[i] = new WorkProcessor<>(ringBuffer, sequenceBarrier, workHandlers[i], exceptionHandler, workSequence);
        }
    }
    
    //Start the worker pool processing events in sequence.
    //@param executor providing threads for running the workers.
    //@return the {@link RingBuffer} used for the work queue.
    //@throws IllegalStateException if the pool has already been started and not halted yet
    public RingBuffer<T> start(final Executor executor) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }
  
        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);
  
        for (WorkProcessor<?> processor : workProcessors) {
            processor.getSequence().set(cursor);
            //通过传入的线程池,执行WorkProcessor对象的run()方法
            executor.execute(processor);
        }
        return ringBuffer;
    }
    ...
}

public final class WorkProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    private final SequenceBarrier sequenceBarrier;
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    private final Sequence workSequence;
    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };
    private final TimeoutHandler timeoutHandler;

    //Construct a {@link WorkProcessor}.
    //@param ringBuffer       to which events are published.
    //@param sequenceBarrier  on which it is waiting.
    //@param workHandler      is the delegate to which events are dispatched.
    //@param exceptionHandler to be called back when an error occurs
    //@param workSequence     from which to claim the next event to be worked on.  It should always be initialised as Sequencer#INITIAL_CURSOR_VALUE
    public WorkProcessor(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final WorkHandler<? super T> workHandler, final ExceptionHandler<? super T> exceptionHandler, final Sequence workSequence) {
        this.ringBuffer = ringBuffer;
        this.sequenceBarrier = sequenceBarrier;
        this.workHandler = workHandler;
        this.exceptionHandler = exceptionHandler;
        this.workSequence = workSequence;
        if (this.workHandler instanceof EventReleaseAware) {
            ((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser);
        }
        timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null;
    }
    
    //通过对sequence进行修改来实现消费RingBuffer里的数据
    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();
        notifyStart();
  
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true) {
            try {
                if (processedSequence) {
                    processedSequence = false;
                    do {
                        nextSequence = workSequence.get() + 1L;
                        //设置消费者当前的消费进度
                        sequence.set(nextSequence - 1L);
                    } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
  
                if (cachedAvailableSequence >= nextSequence) {
                    //从RingBuffer中获取要消费的数据
                    event = ringBuffer.get(nextSequence);
                    //执行消费者实现的onEvent()方法来消费数据
                    workHandler.onEvent(event);
                    processedSequence = true;
                } else {
                    //通过sequenceBarrier.waitFor()方法看看消费者是否需要等待生产者投递消息
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                //handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }
        notifyShutdown();
        running.set(false);
    }
    ...
}
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...
}

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private final long indexMask;
    //环形数组存储事件消息
    private final Object[] entries;
    protected final int bufferSize;
    //RingBuffer的sequencer属性代表了当前线程对应的生产者
    protected final Sequencer sequencer;
    
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //初始化数组
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //内存预加载
        fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.  
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    public void addGatingSequences(Sequence... gatingSequences) {
        sequencer.addGatingSequences(gatingSequences);
    }
    ...
}

public interface Sequencer extends Cursored, Sequenced {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.  
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    void addGatingSequences(Sequence... gatingSequences);
    ...
}

public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    ...
    @Override
    public final void addGatingSequences(Sequence... gatingSequences) {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }
    ...
}

class SequenceGroups {
    static <T> void addSequences(final T holder, final AtomicReferenceFieldUpdater<T, Sequence[]> updater, final Cursored cursor, final Sequence... sequencesToAdd) {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;

        do {
            currentSequences = updater.get(holder);
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = cursor.getCursor();

            int index = currentSequences.length;
            for (Sequence sequence : sequencesToAdd) {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        } while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd) {
            sequence.set(cursorSequence);
        }
    }
    ...
}


3.Disruptor的WaitStrategy等待策略分析

在生产者发布消息时,会调用WaitStrategy的signalAllWhenBlocking()方法唤醒阻塞的消费者。在消费者消费消息时,会调用WaitStrategy的waitFor()方法阻塞消费过快的消费者


当然,不同的策略不一定就是阻塞消费者,比如BlockingWaitStrategy会通过ReentrantLock来阻塞消费者,而YieldingWaitStrategy则通过yield切换线程来实现让消费者无锁等待,即通过Thread的yield()方法切换线程让另一个线程继续执行自旋判断操作


所以YieldingWaitStrategy等待策略的效率是最高的 + 最耗费CPU资源,当然效率次高、比较耗费CPU资源的是BusySpinWaitStrategy等待策略


Disruptor提供了如下几种等待策略:

一.完全阻塞的等待策略BlockingWaitStrategy

二.切换线程自旋的等待策略YieldingWaitStrategy

三.繁忙自旋的等待策略BusySpinWaitStrategy

四.轻微阻塞的等待策略LiteBlockingWaitStrategy
也就是唤醒阻塞线程时,通过GAS避免并发获取锁的等待策略

五.最小睡眠 + 切换线程的等待策略SleepingWaitStrategy

总结:

为了达到最高效率,有大量CPU资源,可切换线程让多个线程自旋判断
为了保证高效的同时兼顾CPU资源,可以让单个线程自旋判断
为了保证比较高效更加兼顾CPU资源,可以切换线程自旋 + 最少睡眠
为了完全兼顾CPU资源不考虑效率问题,可以采用重入锁实现阻塞唤醒
为了完全兼顾CPU资源但考虑一点效率,可以采用重入锁 + GAS唤醒
//完全阻塞的等待策略
//Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier.
//This strategy can be used when throughput and low-latency are not as important as CPU resource.
public final class BlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();
    
    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        if ((availableSequence = cursorSequence.get()) < sequence) {
            lock.lock();
            try {
                while ((availableSequence = cursorSequence.get()) < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            } finally {
                lock.unlock();
            }
        }
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
        lock.lock();
        try {
            processorNotifyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

//切换线程自旋的等待策略
//Yielding strategy that uses a Thread.yield() for EventProcessors waiting on a barrier after an initially spinning.
//This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
public final class YieldingWaitStrategy implements WaitStrategy {
    private static final int SPIN_TRIES = 100;

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        int counter = SPIN_TRIES;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
        barrier.checkAlert();
        if (0 == counter) {
            //切换线程,让另一个线程继续执行自旋操作
            Thread.yield();
        } else {
            --counter;
        }
        return counter;
    }
}

//繁忙自旋的等待策略
//Busy Spin strategy that uses a busy spin loop for EventProcessors waiting on a barrier.
//This strategy will use CPU resource to avoid syscalls which can introduce latency jitter.
//It is best used when threads can be bound to specific CPU cores.
public final class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    
    }
}

//轻微阻塞的等待策略(唤醒阻塞线程时避免了并发获取锁)
//Variation of the BlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
//Shows performance improvements on microbenchmarks.
//However this wait strategy should be considered experimental as I have not full proved the correctness of the lock elision code.
public final class LiteBlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();
    private final AtomicBoolean signalNeeded = new AtomicBoolean(false);

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        if ((availableSequence = cursorSequence.get()) < sequence) {
            lock.lock();
            try {
                do {
                    signalNeeded.getAndSet(true);
                    if ((availableSequence = cursorSequence.get()) >= sequence) {
                        break;
                    }
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                } while ((availableSequence = cursorSequence.get()) < sequence);
            } finally {
                lock.unlock();
            }
        }
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
        if (signalNeeded.getAndSet(false)) {
            lock.lock();
            try {
                processorNotifyCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

//最小睡眠 + 切换线程的等待策略SleepingWaitStrategy
//Sleeping strategy that initially spins, then uses a Thread.yield(), 
//and eventually sleep LockSupport.parkNanos(1) for the minimum number of nanos the OS 
//and JVM will allow while the EventProcessors are waiting on a barrier.
//This strategy is a good compromise between performance and CPU resource.
//Latency spikes can occur after quiet periods.
public final class SleepingWaitStrategy implements WaitStrategy {
    private static final int DEFAULT_RETRIES = 200;
    private final int retries;

    public SleepingWaitStrategy() {
        this(DEFAULT_RETRIES);
    }
    
    public SleepingWaitStrategy(int retries) {
        this.retries = retries;
    }

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        int counter = retries;
        
        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
        barrier.checkAlert();
        if (counter > 100) {
            --counter;
        } else if (counter > 0) {
            --counter;
            Thread.yield();
        } else {
            LockSupport.parkNanos(1L);
        }
        return counter;
    }
}


4.Disruptor的高性能原因

一.使用了环形结构 + 数组 + 内存预加载

二.使用了单线程写的方式并配合内存屏障

三.消除伪共享(填充缓存行)

四.序号栅栏和序号配合使用来消除锁

五.提供了多种不同性能的等待策略


5.Disruptor高性能之数据结构(内存预加载机制)

(1)RingBuffer使用环形数组来存储元素

(2)采用了内存预加载机制


(1)RingBuffer使用环形数组来存储元素

环形数组可以避免数组扩容和缩容带来的性能损耗。


(2)RingBuffer采用了内存预加载机制

初始化RingBuffer时,会将entries数组里的每一个元素都先new出来。比如RingBuffer的大小设置为8,那么初始化RingBuffer时,就会先将entries数组的8个元素分别指向新new出来的空的Event对象。往RingBuffer填充元素时,只是将对应的Event对象进行赋值。所以RingBuffer中的Event对象是一直存活着的,也就是说它能最小程度减少系统GC频率,从而提升性能。

public class Main {
    public static void main(String[] args) {
        //参数准备
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        //参数一:eventFactory,消息(Event)工厂对象
        //参数二:ringBufferSize,容器的长度
        //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
        //参数四:ProducerType,单生产者还是多生产者
        //参数五:waitStrategy,等待策略
        //1.实例化Disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );
    
        //2.添加Event处理器,用于处理事件
        //也就是构建Disruptor与消费者的一个关联关系
        disruptor.handleEventsWith(new OrderEventHandler());
    
        //3.启动disruptor
        disruptor.start();
    
        //4.获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            //向容器中投递数据
            producer.sendData(bb);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}

public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    ...
    
    //Create a new Disruptor.
    //@param eventFactory   the factory to create events in the ring buffer.
    //@param ringBufferSize the size of the ring buffer, must be power of 2.
    //@param executor       an Executor to execute event processors.
    //@param producerType   the claim strategy to use for the ring buffer.
    //@param waitStrategy   the wait strategy to use for the ring buffer.
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }
    
    //Private constructor helper
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
    ...
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //值为-1
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...
    
    //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
    public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }
    
    //Create a new single producer RingBuffer with the specified wait strategy.
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }
    
    //Construct a RingBuffer with the full option set.
    //@param eventFactory to newInstance entries for filling the RingBuffer
    //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer.
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    ...
}

abstract class RingBufferFields<E> extends RingBufferPad {
    private final long indexMask;
    //环形数组存储事件消息
    private final Object[] entries;
    protected final int bufferSize;
    //RingBuffer的sequencer属性代表了当前线程对应的生产者
    protected final Sequencer sequencer;
    ...
    
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //初始化数组
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //内存预加载
        fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            //设置一个空的数据对象
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    ...
}

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}


6.Disruptor高性能之内核(使用单线程写)

Disruptor的RingBuffer之所以可以做到完全无锁是因为单线程写。离开单线程写,没有任何技术可以做到完全无锁。Redis和Netty等高性能技术框架也是利用单线程写来实现的。


具体就是:单生产者时,固然只有一个生产者线程在写。多生产者时,每个生产者线程都只会写各自获取到的Sequence序号对应的环形数组的元素,从而使得多个生产者线程相互之间不会产生写冲突。


7.Disruptor高性能之系统内存优化(内存屏障)

要正确实现无锁,还需要另外一个关键技术——内存屏障。对应到Java语言,就是valotile变量与Happens Before语义。


内存屏障:Linux的smp_wmb()/smp_rmb()。


8.Disruptor高性能之系统缓存优化(消除伪共享)

CPU缓存是以缓存行(Cache Line)为单位进行存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节,最常见的缓存行大小是64个字节


多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会对这个缓存行形成竞争,从而无意中影响彼此性能,这就是伪共享


消除伪共享:利用了空间换时间的思想。


由于代表着一个序号的Sequence其核心字段value是一个long型变量(占8个字节),所以有可能会出现多个Sequence对象的value变量共享同一个缓存行。因此,需要对Sequence对象的value变量消除伪共享。具体做法就是:对Sequence对象的value变量前后增加7个long型变量


注意:伪共享与Sequence的静态变量无关,因为静态变量本身就是多个线程共享的,而不是多个线程隔离独立的。

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        UNSAFE = Util.getUnsafe();
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    //Create a sequence initialised to -1.
    public Sequence() {
        this(INITIAL_VALUE);
    }

    //Create a sequence with a specified initial value.
    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    //Perform a volatile read of this sequence's value.
    public long get() {
        return value;
    }

    //Perform an ordered write of this sequence.  
    //The intent is a Store/Store barrier between this write and any previous store.
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }
    ...
}


9.Disruptor高性能之序号获取优化(自旋 + CAS)

生产者投递Event时会使用"long sequence = ringBuffer.next()"获取序号,而序号栅栏SequenceBarrier和会序号Sequence搭配起来一起使用,用来协调和管理消费者和生产者的工作节奏,避免锁的使用。


各个消费者和生产者都持有自己的序号,这些序号需满足如下条件以避免生产者速度过快,将还没来得及消费的消息覆盖。

一.消费者序号数值必须小于生产者序号数值
二.消费者序号数值必须小于其前置消费者的序号数值
三.生产者序号数值不能大于消费者中最小的序号数值

高性能的序号获取优化:为避免生产者每次执行next()获取序号时,都要查询消费者的最小序号,Disruptor采取了自旋 + LockSupport挂起线程 + 缓存最小序号 + CAS来优化。既避免了锁,也尽量在不耗费CPU的情况下提升了性能。


单生产者的情况下,只有一个线程添加元素,此时没必要使用锁。多生产者的情况下,会有多个线程并发获取Sequence序号添加元素,此时会通过自旋 + CAS避免锁。

public class OrderEventProducer {
    private RingBuffer<OrderEvent> ringBuffer;
    
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    
    public void sendData(ByteBuffer data) {
        //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
        long sequence = ringBuffer.next();
        try {
            //2.根据这个序号, 找到具体的"OrderEvent"元素
            //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
            OrderEvent event = ringBuffer.get(sequence);
            //3.进行实际的赋值处理
            event.setValue(data.getLong(0));
        } finally {
            //4.提交发布操作
            ringBuffer.publish(sequence);
        }
    }
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //值为-1
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...
    
    //Increment and return the next sequence for the ring buffer.
    //Calls of this method should ensure that they always publish the sequence afterward.
    //E.g.
    //  long sequence = ringBuffer.next();
    //  try {
    //      Event e = ringBuffer.get(sequence);
    //      ...
    //  } finally {
    //      ringBuffer.publish(sequence);
    //  }
    //@return The next sequence to publish to.
    @Override
    public long next() {
        return sequencer.next();
    }
    
    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }
    
    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.  
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.  
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that 
    //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        //调用父类RingBufferFields的elementAt()方法
        return elementAt(sequence);
    }
    ...
}

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private final long indexMask;
    //环形数组存储事件消息
    private final Object[] entries;
    protected final int bufferSize;
    //RingBuffer的sequencer属性代表了当前线程对应的生产者
    protected final Sequencer sequencer;
    
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //初始化数组
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //内存预加载
        fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}
public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    //环形数组的大小
    protected final int bufferSize;
    //等待策略
    protected final WaitStrategy waitStrategy;
    //当前生产者的进度
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)
    //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,
    //由RingBuffer的addGatingSequences()方法进行添加
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...
    
    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;
    
    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    
    //表示生产者的当前序号,值为-1
    protected long nextValue = Sequence.INITIAL_VALUE;
    //表示消费者的最小序号,值为-1
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    protected long p1, p2, p3, p4, p5, p6, p7;
    
    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize   the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    ...
    
    @Override
    public long next() {
        return next(1);
    }
    
    @Override
    public long next(int n) {
        //Sequence的初始化值为-1
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        //nextValue指的是当前Sequence
        //this.nextValue为SingleProducerSequencerFields的变量
        //第一次调用next()方法时,nextValue = -1
        //第二次调用next()方法时,nextValue = 0
        //第三次调用next()方法时,nextValue = 1
        //第四次调用next()方法时,nextValue = 2
        //第五次调用next()方法时,nextValue = 3
        long nextValue = this.nextValue;
        //第一次调用next()方法时,nextSequence = -1 + 1 = 0
        //第二次调用next()方法时,nextSequence = 0 + 1 = 1
        //第三次调用next()方法时,nextSequence = 1 + 1 = 2
        //第四次调用next()方法时,nextSequence = 2 + 1 = 3
        //第五次调用next()方法时,nextSequence = 3 + 1 = 4
        long nextSequence = nextValue + n;
        //wrapPoint会用来判断生产者序号是否绕过RingBuffer的环
        //如果wrapPoint是负数,则表示还没绕过RingBuffer的环
        //如果wrapPoint是非负数,则表示已经绕过RingBuffer的环
        //假设bufferSize = 3,那么:
        //第一次调用next()方法时,wrapPoint = 0 - 3 = -3,还没绕过RingBuffer的环
        //第二次调用next()方法时,wrapPoint = 1 - 3 = -2,还没绕过RingBuffer的环
        //第三次调用next()方法时,wrapPoint = 2 - 3 = -1,还没绕过RingBuffer的环
        //第四次调用next()方法时,wrapPoint = 3 - 3 = 0,已经绕过RingBuffer的环
        //第五次调用next()方法时,wrapPoint = 4 - 3 = 1,已经绕过RingBuffer的环
        long wrapPoint = nextSequence - bufferSize;
        //cachedGatingSequence是用来将消费者的最小消费序号缓存起来
        //这样就不用每次执行next()方法都要去获取消费者的最小消费序号
        //第一次调用next()方法时,cachedGatingSequence = -1
        //第二次调用next()方法时,cachedGatingSequence = -1
        //第三次调用next()方法时,cachedGatingSequence = -1
        //第四次调用next()方法时,cachedGatingSequence = -1
        //第五次调用next()方法时,cachedGatingSequence = 1
        long cachedGatingSequence = this.cachedValue;
        
        //第四次调用next()方法时,wrapPoint大于cachedGatingSequence,执行条件中的逻辑
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            //最小的消费者序号
            long minSequence;
            //自旋操作:
            //Util.getMinimumSequence(gatingSequences, nextValue)的含义就是找到消费者中最小的序号值
            //如果wrapPoint > 消费者中最小的序号,那么生产者线程就需要进行阻塞
            //即如果生产者序号 > 消费者中最小的序号,那么就挂起并进行自旋操作
            //第四次调用next()方法时,nextValue = 2,wrapPoint = 0,gatingSequences里面的消费者序号如果还没消费(即-1),则要挂起
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                //TODO: Use waitStrategy to spin?  
                LockSupport.parkNanos(1L); 
            }
            //cachedValue接收了消费者的最小序号
            //第四次调用next()方法时,假设消费者的最小序号minSequence为1,则cachedValue = 1
            this.cachedValue = minSequence;
        }
        //第一次调用完next()方法时,nextValue会变为0
        //第二次调用完next()方法时,nextValue会变为1
        //第三次调用完next()方法时,nextValue会变为2
        //第四次调用完next()方法时,nextValue会变为3
        //第五次调用完next()方法时,nextValue会变为4
        this.nextValue = nextSequence;
        //第一次调用next()方法时,返回的nextSequence = 0
        //第二次调用next()方法时,返回的nextSequence = 1
        //第三次调用next()方法时,返回的nextSequence = 2
        //第四次调用next()方法时,返回的nextSequence = 3
        //第五次调用next()方法时,返回的nextSequence = 4
        return nextSequence;
    }
    
    @Override
    public void publish(long sequence) {
        //设置当前生产者的sequence
        cursor.set(sequence);
        //通过等待策略通知阻塞的消费者
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

public final class Util {
    ...
    //Get the minimum sequence from an array of {@link com.lmax.disruptor.Sequence}s.
    //@param sequences to compare.
    //@param minimum   an initial default minimum. If the array is empty this value will be returned.
    //@return the smaller of minimum sequence value found in sequences and minimum; minimum if sequences is empty
    public static long getMinimumSequence(final Sequence[] sequences, long minimum) {
        for (int i = 0, n = sequences.length; i < n; i++) {
            long value = sequences[i].get();
            minimum = Math.min(minimum, value);
        }
        return minimum;
    }
    ...
}

public final class MultiProducerSequencer extends AbstractSequencer {
    ...
    @Override
    public long next() {
        return next(1);
    }
    
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            //获取当前生产者的序号
            current = cursor.get();
            next = current + n;
            //wrapPoint会用来判断生产者序号是否绕过RingBuffer的环
            //如果wrapPoint是负数,则表示还没绕过RingBuffer的环
            //如果wrapPoint是非负数,则表示已经绕过RingBuffer的环
            long wrapPoint = next - bufferSize;
            //cachedGatingSequence是用来将消费者的最小消费序号缓存起来
            //这样就不用每次执行next()方法都要去获取消费者的最小消费序号
            long cachedGatingSequence = gatingSequenceCache.get();
  
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                //gatingSequence表示的是消费者的最小序号
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                if (wrapPoint > gatingSequence) {
                    //TODO, should we spin based on the wait strategy?
                    LockSupport.parkNanos(1); 
                    continue;
                }
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                break;
            }
        } while (true);
        return next;
    }
    ...
}

相关推荐

面试怕被问Hashmap,多看看这个文章

o数据结构otable数组长度永远为2的幂次方o那么为什么要把数组长度设计为2的幂次方呢?o扩容o链表树化o红黑树拆分o查找o插入o删除o遍历oequasl和hashcode总结HashMap是面试中...

非常简洁地重试Retry组件,使用起来杠杠的

前言小伙伴是不是经常遇到接口调用异常,超时的场景?尤其网络抖动导致timeout超时的场景,我们一般产品就会叫我们要重试几次。很多小伙伴的实现方式是写个循环调用for(inti=1;i<=3;...

Kafka消息可靠传输之幂等、事务机制

一般而言,消息中间件的消息传输保障有3个层级,分别如下。atmostonce:至多一次。消息可能会丢失,但绝对不会重复传输。atleastonce:最少一次。消息绝不会丢失,但可能会重复传输。...

Seata源码—9.Seata XA模式的事务处理

大纲1.SeataXA分布式事务案例及AT与XA的区别2.SeataXA分布式事务案例的各模块运行流程3.Seata使用SpringBoot自动装配简化复杂配置4.全局事务注解扫描组件的自动装配...

Disruptor—3.核心源码实现分析一

大纲1.Disruptor的生产者源码分析2.Disruptor的消费者源码分析3.Disruptor的WaitStrategy等待策略分析4.Disruptor的高性能原因5.Disruptor高性...

Spring Boot 进阶-详解SpringBoot中条件注解使用

作为使用SpringBoot框架的开发者来讲,如果你连如下的这些注解你都没有听说过,没有用过,那我劝你还是放弃吧?在SpringBoot中我们最常见到的注解应该是条件注解了吧!也就是@Condit...

如何自定义编解码器(如何自定义编解码器的程序)

1.前言上一节我们一节了解了什么是编码解码、序列化和反序列化了,并且留有一道思考题,本节内容主要是深入解析该思考题。思考题:能否把我们的编码和解码封装成独立的Handler呢?那么应该如何去封装...

Disruptor—3.核心源码实现分析二

大纲1.Disruptor的生产者源码分析2.Disruptor的消费者源码分析3.Disruptor的WaitStrategy等待策略分析4.Disruptor的高性能原因5.Disruptor高性...

线程的状态有哪些?它是如何工作的?

线程的状态有哪些?它是如何工作的?线程(Thread)是并发编程的基础,也是程序执行的最小单元,它依托进程而存在。一个进程中可以包含多个线程,多线程可以共享一块内存空间和一组系统资源,因此线程之间的切...

有图解有案例,我终于把Condition的原理讲透彻了

平时加解锁都是直接使用Synchronized关键字来实现的,简单好用,为啥还要引用ReentrantLock呢?为了解决小伙伴的疑问,我们来对两者做个简单的比较吧:相同点两者都是“可重入锁”,即当前...

白话DUBBO原理,通俗易记,再也不怕面试时讲不清楚了

现在的各种面试免不了要问些中间件,尤其是互联网公司,更注重获选人对中间件的掌握情况。在中间件中,有一大类是关于RPC框架的,Dubbo即是阿里出品的一款很著名的RPC中间件,很多互联网公司都在用,面试...

Java 最细的集合类总结(java常用的集合类有哪些)

数据结构作为每一个开发者不可回避的问题,而Java对于不同的数据结构提供了非常成熟的实现,这一个又一个实现既是面试中的难点,也是工作中必不可少的工具,在此,笔者经历漫长的剖析,将其抽丝剥茧的呈现出...

详解Java异常(Exception)处理及常见异常

很多事件并非总是按照人们自己设计意愿顺利发展的,经常出现这样那样的异常情况。例如:你计划周末郊游,计划从家里出发→到达目的→游泳→烧烤→回家。但天有不测风云,当你准备烧烤时候突然天降大雨,只能终止郊...

为什么阿里强制要求不要在foreach循环里进行元素remove和add操作

在阅读《阿里巴巴Java开发手册》时,发现有一条关于在foreach循环里进行元素的remove/add操作的规约,具体内容如下:错误演示我们首先在IDEA中编写一个在foreach循...

SpringBoot条件化配置(@Conditional)全面解析与实战指南

一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...