百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Disruptor—3.核心源码实现分析一

liuian 2025-06-15 17:36 69 浏览

大纲

1.Disruptor的生产者源码分析

2.Disruptor的消费者源码分析

3.Disruptor的WaitStrategy等待策略分析

4.Disruptor的高性能原因

5.Disruptor高性能之数据结构(内存预加载机制)

6.Disruptor高性能之内核(使用单线程写)

7.Disruptor高性能之系统内存优化(内存屏障)

8.Disruptor高性能之系统缓存优化(消除伪共享)

9.Disruptor高性能之序号获取优化(自旋 + CAS)


1.Disruptor的生产者源码分析

(1)通过Sequence序号发布消息

(2)通过Translator事件转换器发布消息


(1)通过Sequence序号发布消息

生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。

//注意:这里使用的版本是3.4.4
//单生产者单消费者的使用示例
public class Main {
    public static void main(String[] args) {
        //参数准备
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  
        //参数一:eventFactory,消息(Event)工厂对象
        //参数二:ringBufferSize,容器的长度
        //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
        //参数四:ProducerType,单生产者还是多生产者
        //参数五:waitStrategy,等待策略
        //1.实例化Disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );
  
        //2.添加Event处理器,用于处理事件
        //也就是构建Disruptor与消费者的一个关联关系
        disruptor.handleEventsWith(new OrderEventHandler());
  
        //3.启动Disruptor
        disruptor.start();
  
        //4.获取实际存储数据的容器: RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            //向容器中投递数据
            producer.sendData(bb);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}

public class OrderEventProducer {
    private RingBuffer<OrderEvent> ringBuffer;
    
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    
    public void sendData(ByteBuffer data) {
        //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
        long sequence = ringBuffer.next();
        try {
            //2.根据这个序号, 找到具体的"OrderEvent"元素
            //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
            OrderEvent event = ringBuffer.get(sequence);
            //3.进行实际的赋值处理
            event.setValue(data.getLong(0));
        } finally {
            //4.提交发布操作
            ringBuffer.publish(sequence);
        }
    }
}

public class OrderEventHandler implements EventHandler<OrderEvent> {
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(1000);
        System.err.println("消费者: " + event.getValue());
    }
}
//多生产者多消费者的使用示例
public class Main {
    public static void main(String[] args) throws InterruptedException {
        //1.创建RingBuffer
        RingBuffer<Order> ringBuffer = RingBuffer.create(
            ProducerType.MULTI,//多生产者
            new EventFactory<Order>() {
                public Order newInstance() {
                    return new Order();
                }
            },
            1024 * 1024,
            new YieldingWaitStrategy()
        );

        //2.通过ringBuffer创建一个屏障
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        //3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口
        Consumer[] consumers = new Consumer[10];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer("C" + i);
        }

        //4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool
        WorkerPool<Order> workerPool = new WorkerPool<Order>(
            ringBuffer,
            sequenceBarrier,
            new EventExceptionHandler(),
            consumers
        );

        //5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        //6.启动workerPool
        workerPool.start(Executors.newFixedThreadPool(5));

        final CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < 100; i++) {
            final Producer producer = new Producer(ringBuffer);
            new Thread(new Runnable() {
                public void run() {
                    try {
                        latch.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    for (int j = 0; j < 100; j++) {
                        producer.sendData(UUID.randomUUID().toString());
                    }
                }
            }).start();
        }

        Thread.sleep(2000);
        System.err.println("----------线程创建完毕,开始生产数据----------");
        latch.countDown();
        Thread.sleep(10000);
        System.err.println("任务总数:" + consumers[2].getCount());
    }
}

public class Producer {
    private RingBuffer<Order> ringBuffer;
    
    public Producer(RingBuffer<Order> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(String uuid) {
        //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
        long sequence = ringBuffer.next();
        try {
            //2.根据这个序号, 找到具体的"Order"元素
            //注意:此时获取的Order对象是一个没有被赋值的"空对象"
            Order order = ringBuffer.get(sequence);
            //3.进行实际的赋值处理
            order.setId(uuid);
        } finally {
            //4.提交发布操作
            ringBuffer.publish(sequence);
        }
    }
}

public class Consumer implements WorkHandler<Order> {
    private static AtomicInteger count = new AtomicInteger(0);
    private String consumerId;
    private Random random = new Random();

    public Consumer(String consumerId) {
        this.consumerId = consumerId;
    }

    public void onEvent(Order event) throws Exception {
        Thread.sleep(1 * random.nextInt(5));
        System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());
        count.incrementAndGet();
    }

    public int getCount() {
        return count.get();
    }
}

其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法设置当前生产者的Sequence序号

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private final long indexMask;

    //环形数组存储事件消息
    private final Object[] entries;
    protected final int bufferSize;

    //RingBuffer的sequencer属性代表了当前线程对应的生产者
    protected final Sequencer sequencer;
    
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //初始化数组
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //内存预加载
        fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...
    
    //Increment and return the next sequence for the ring buffer.  
    //Calls of this method should ensure that they always publish the sequence afterward.  
    //E.g.
    //long sequence = ringBuffer.next();
    //try {
    //    Event e = ringBuffer.get(sequence);
    //    //Do some work with the event.
    //} finally {
    //    ringBuffer.publish(sequence);
    //}
    //@return The next sequence to publish to.
    //@see RingBuffer#publish(long)
    //@see RingBuffer#get(long)
    @Override
    public long next() {
        return sequencer.next();
    }
    
    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }
    
    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.  
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.  
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that 
    //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        //调用父类RingBufferFields的elementAt()方法
        return elementAt(sequence);
    }
    ...
}

RingBuffer的sequencer属性会在创建RingBuffer对象时传入,而创建RingBuffer对象的时机则是在初始化Disruptor的时候。


Disruptor的构造方法中,会调用RingBuffer的create()方法,RingBuffer的create()方法会根据不同的生产者类型来初始化sequencer属性


由生产者线程通过new创建的Sequencer接口实现类的实例就是一个生产者。单生产者的线程执行上面的main()方法时,会创建一个单生产者Sequencer实例来代表生产者。多生产者的线程执行如下的main()方法时,会创建一个多生产者Sequencer实例来代表生产者

public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...
    
    //Create a new Disruptor.
    //@param eventFactory   the factory to create events in the ring buffer.
    //@param ringBufferSize the size of the ring buffer, must be power of 2.
    //@param executor       an Executor to execute event processors.
    //@param producerType   the claim strategy to use for the ring buffer.
    //@param waitStrategy   the wait strategy to use for the ring buffer.
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }
    
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...
    
    //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
    //@param producerType producer type to use ProducerType.
    //@param factory used to create events within the ring buffer.
    //@param bufferSize number of elements to create within the ring buffer.
    //@param waitStrategy used to determine how to wait for new elements to become available.
    public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                //单生产者模式下的当前生产者是一个SingleProducerSequencer实例
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                //多生产者模式下的当前生产者是一个MultiProducerSequencer实例
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }
    
    //Create a new single producer RingBuffer with the specified wait strategy.
    //@param <E> Class of the event stored in the ring buffer.
    //@param factory      used to create the events within the ring buffer.
    //@param bufferSize   number of elements to create within the ring buffer.
    //@param waitStrategy used to determine how to wait for new elements to become available.
    //@return a constructed ring buffer.
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }
    
    //Create a new multiple producer RingBuffer with the specified wait strategy.
    //@param <E> Class of the event stored in the ring buffer.
    //@param factory      used to create the events within the ring buffer.
    //@param bufferSize   number of elements to create within the ring buffer.
    //@param waitStrategy used to determine how to wait for new elements to become available.
    //@return a constructed ring buffer.
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }
    
    //Construct a RingBuffer with the full option set.
    //@param eventFactory to newInstance entries for filling the RingBuffer
    //@param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    ...
}

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private final long indexMask;
    //环形数组存储事件消息
    private final Object[] entries;
    protected final int bufferSize;
    //RingBuffer的sequencer属性代表了当前线程对应的生产者
    protected final Sequencer sequencer;
    
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //初始化数组
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //内存预加载
        fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    ...
}

SingleProducerSequencer的publish()方法在发布事件消息时,首先会设置当前生产者的Sequence,然后会通过等待策略通知阻塞的消费者

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    ...
    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }
    ...
}

public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    //环形数组的大小
    protected final int bufferSize;
    //等待策略
    protected final WaitStrategy waitStrategy;
    //当前生产者的进度
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)
    //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,
    //由RingBuffer的addGatingSequences()方法进行添加
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...
    
    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;
    
    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    
    //表示生产者的当前序号,值为-1
    protected long nextValue = Sequence.INITIAL_VALUE;
    //表示消费者的最小序号,值为-1
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    protected long p1, p2, p3, p4, p5, p6, p7;
    
    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize   the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    @Override
    public void publish(long sequence) {
        //设置当前生产者的进度,cursor代表了当前生产者的Sequence
        cursor.set(sequence);
        //通过等待策略通知阻塞的消费者
        waitStrategy.signalAllWhenBlocking();
    }
    
    @Override
    public long next() {
        return next(1);
    }
    
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long nextValue = this.nextValue;
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            long minSequence;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                LockSupport.parkNanos(1L); 
            }
            this.cachedValue = minSequence;
        }
        this.nextValue = nextSequence;
        return nextSequence;
    }
    ...
}

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

//Concurrent sequence class used for tracking the progress of the ring buffer and event processors.  
//Support a number of concurrent operations including CAS and order writes.
//Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        UNSAFE = Util.getUnsafe();
        VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
    }

    //Create a sequence initialised to -1.
    public Sequence() {
        this(INITIAL_VALUE);
    }

    //Create a sequence with a specified initial value.
    //@param initialValue The initial value for this sequence.
    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    //Perform a volatile read of this sequence's value.
    //@return The current value of the sequence.
    public long get() {
        return value;
    }

    //Perform an ordered write of this sequence.  
    //The intent is a Store/Store barrier between this write and any previous store.
    //@param value The new value for the sequence.
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    //Performs a volatile write of this sequence.  
    //The intent is a Store/Store barrier between this write and 
    //any previous write and a Store/Load barrier between this write and 
    //any subsequent volatile read.
    //@param value The new value for the sequence.
    public void setVolatile(final long value) {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    //Perform a compare and set operation on the sequence.
    //@param expectedValue The expected current value.
    //@param newValue The value to update to.
    //@return true if the operation succeeds, false otherwise.
    public boolean compareAndSet(final long expectedValue, final long newValue) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

    //Atomically increment the sequence by one.
    //@return The value after the increment
    public long incrementAndGet() {
        return addAndGet(1L);
    }

    //Atomically add the supplied value.
    //@param increment The value to add to the sequence.
    //@return The value after the increment.
    public long addAndGet(final long increment) {
        long currentValue;
        long newValue;
        do {
            currentValue = get();
            newValue = currentValue + increment;
        } while (!compareAndSet(currentValue, newValue));
        return newValue;
    }

    @Override
    public String toString() {
        return Long.toString(get());
    }
}

MultiProducerSequencer的publish()方法在发布事件消息时,则会通过UnSafe设置sequence在int数组中对应元素的值

public final class MultiProducerSequencer extends AbstractSequencer {
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
    private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
    private final int[] availableBuffer;
    private final int indexMask;
    private final int indexShift;
    
    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize   the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
        availableBuffer = new int[bufferSize];
        indexMask = bufferSize - 1;
        indexShift = Util.log2(bufferSize);
        initialiseAvailableBuffer();
    }
    
    private void initialiseAvailableBuffer() {
        for (int i = availableBuffer.length - 1; i != 0; i--) {
            setAvailableBufferValue(i, -1);
        }
        setAvailableBufferValue(0, -1);
    }
    
    private void setAvailableBufferValue(int index, int flag) {
        long bufferAddress = (index * SCALE) + BASE;
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }

    @Override
    public void publish(final long sequence) {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }
    
    //The below methods work on the availableBuffer flag.
    //The prime reason is to avoid a shared sequence object between publisher threads.
    //(Keeping single pointers tracking start and end would require coordination between the threads).
    //--  Firstly we have the constraint that the delta between the cursor and minimum gating sequence 
    //will never be larger than the buffer size (the code in next/tryNext in the Sequence takes care of that).
    //-- Given that; take the sequence value and mask off the lower portion of the sequence 
    //as the index into the buffer (indexMask). (aka modulo operator)
    //-- The upper portion of the sequence becomes the value to check for availability.
    //ie: it tells us how many times around the ring buffer we've been (aka division)
    //-- Because we can't wrap without the gating sequences moving forward 
    //(i.e. the minimum gating sequence is effectively our last available position in the buffer), 
    //when we have new data and successfully claimed a slot we can simply write over the top.
    private void setAvailable(final long sequence) {
        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }

    private int calculateIndex(final long sequence) {
        return ((int) sequence) & indexMask;
    }
    
    private int calculateAvailabilityFlag(final long sequence) {
        return (int) (sequence >>> indexShift);
    }
    
    @Override
    public long next() {
        return next(1);
    }
    
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
         
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();
  
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                if (wrapPoint > gatingSequence) {
                    LockSupport.parkNanos(1); 
                    continue;
                }
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                break;
            }
        } while (true);
        return next;
    }
    ...
}

(2)通过Translator事件转换器发布消息

生产者还可以直接调用RingBuffer的tryPublishEvent()方法来完成发布事件消息到RingBuffer。该方法首先会调用Sequencer接口的tryNext()方法获取sequence序号,然后根据该sequence序号从RingBuffer的环形数组中获取对应的元素,接着再调用RingBuffer的translateAndPublish()方法将事件消息赋值替换到该元素中,最后调用Sequencer接口的publish()方法设置当前生产者的sequence序号来完成事件消息的发布。

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private final long indexMask;
    //环形数组存储事件消息
    private final Object[] entries;
    protected final int bufferSize;
    //RingBuffer的sequencer属性代表了当前线程对应的生产者
    protected final Sequencer sequencer;
    
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //初始化数组
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //内存预加载
        fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //值为-1
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    //Construct a RingBuffer with the full option set.
    //@param eventFactory to newInstance entries for filling the RingBuffer
    //@param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    
    @Override
    public boolean tryPublishEvent(EventTranslator<E> translator) {
        try {
            final long sequence = sequencer.tryNext();
            translateAndPublish(translator, sequence);
            return true;
        } catch (InsufficientCapacityException e) {
            return false;
        }
    }
    
    private void translateAndPublish(EventTranslator<E> translator, long sequence) {
        try {
            translator.translateTo(get(sequence), sequence);
        } finally {
            sequencer.publish(sequence);
        }
    }
    
    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.  
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.  
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that 
    //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        //调用父类RingBufferFields的elementAt()方法
        return elementAt(sequence);
    }
    ...
}

public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    //环形数组的大小
    protected final int bufferSize;
    //等待策略
    protected final WaitStrategy waitStrategy;
    //当前生产者的进度
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)
    //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,
    //由RingBuffer的addGatingSequences()方法进行添加
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...
    
    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    protected long p1, p2, p3, p4, p5, p6, p7;
    
    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize   the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    ...
    
    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        if (!hasAvailableCapacity(n, true)) {
            throw InsufficientCapacityException.INSTANCE;
        }
        long nextSequence = this.nextValue += n;
        return nextSequence;
    }
    
    private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) {
        long nextValue = this.nextValue;
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        long cachedGatingSequence = this.cachedValue;
  
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            if (doStore) {
                cursor.setVolatile(nextValue);//StoreLoad fence
            }
            long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
            this.cachedValue = minSequence;
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }
    
    @Override
    public void publish(long sequence) {
        //设置当前生产者的sequence
        cursor.set(sequence);
        //通过等待策略通知阻塞的消费者
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    
    //表示生产者的当前序号,值为-1
    protected long nextValue = Sequence.INITIAL_VALUE;
    //表示消费者的最小序号,值为-1
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;
    
    SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

public final class MultiProducerSequencer extends AbstractSequencer {
    ...
    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
            if (!hasAvailableCapacity(gatingSequences, n, current)) {
                throw InsufficientCapacityException.INSTANCE;
            }
        } while (!cursor.compareAndSet(current, next));
        return next;
    }
    
    private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
        long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
  
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {
            long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
            gatingSequenceCache.set(minSequence);
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }
    
    @Override
    public void publish(final long sequence) {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

//Implementations translate (write) data representations into events claimed from the RingBuffer.
//When publishing to the RingBuffer, provide an EventTranslator. 
//The RingBuffer will select the next available event by sequence and provide it to the EventTranslator (which should update the event), 
//before publishing the sequence update.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public interface EventTranslator<T> {
    //Translate a data representation into fields set in given event
    //@param event    into which the data should be translated.
    //@param sequence that is assigned to event.
    void translateTo(T event, long sequence);
}

相关推荐

驱动网卡(怎么从新驱动网卡)
驱动网卡(怎么从新驱动网卡)

网卡一般是指为电脑主机提供有线无线网络功能的适配器。而网卡驱动指的就是电脑连接识别这些网卡型号的桥梁。网卡只有打上了网卡驱动才能正常使用。并不是说所有的网卡一插到电脑上面就能进行数据传输了,他都需要里面芯片组的驱动文件才能支持他进行数据传输...

2026-01-30 00:37 liuian

win10更新助手装系统(微软win10更新助手)

1、点击首页“系统升级”的按钮,给出弹框,告诉用户需要上传IMEI码才能使用升级服务。同时给出同意和取消按钮。华为手机助手2、点击同意,则进入到“系统升级”功能华为手机助手华为手机助手3、在检测界面,...

windows11专业版密钥最新(windows11专业版激活码永久)

 Windows11专业版的正版密钥,我们是对windows的激活所必备的工具。该密钥我们可以通过微软商城或者通过计算机的硬件供应商去购买获得。获得了windows11专业版的正版密钥后,我...

手机删过的软件恢复(手机删除过的软件怎么恢复)
手机删过的软件恢复(手机删除过的软件怎么恢复)

操作步骤:1、首先,我们需要先打开手机。然后在许多图标中找到带有[文件管理]文本的图标,然后单击“文件管理”进入页面。2、进入页面后,我们将在顶部看到一行文本:手机,最新信息,文档,视频,图片,音乐,收藏,最后是我们正在寻找的[更多],单击...

2026-01-29 23:55 liuian

一键ghost手动备份系统步骤(一键ghost 备份)

  步骤1、首先把装有一键GHOST装系统的U盘插在电脑上,然后打开电脑马上按F2或DEL键入BIOS界面,然后就选择BOOT打USDHDD模式选择好,然后按F10键保存,电脑就会马上重启。  步骤...

怎么创建局域网(怎么创建局域网打游戏)

  1、购买路由器一台。进入路由器把dhcp功能打开  2、购买一台交换机。从路由器lan端口拉出一条网线查到交换机的任意一个端口上。  3、两台以上电脑。从交换机任意端口拉出网线插到电脑上(电脑设置...

精灵驱动器官方下载(精灵驱动手机版下载)

是的。驱动精灵是一款集驱动管理和硬件检测于一体的、专业级的驱动管理和维护工具。驱动精灵为用户提供驱动备份、恢复、安装、删除、在线更新等实用功能。1、全新驱动精灵2012引擎,大幅提升硬件和驱动辨识能力...

一键还原系统步骤(一键还原系统有哪些)

1、首先需要下载安装一下Windows一键还原程序,在安装程序窗口中,点击“下一步”,弹出“用户许可协议”窗口,选择“我同意该许可协议的条款”,并点击“下一步”。  2、在弹出的“准备安装”窗口中,可...

电脑加速器哪个好(电脑加速器哪款好)

我认为pp加速器最好用,飞速土豆太懒,急速酷六根本不工作。pp加速器什么网页都加速,太任劳任怨了!以上是个人观点,具体性能请自己试。ps:我家电脑性能很好。迅游加速盒子是可以加速电脑的。因为有过之...

任何u盘都可以做启动盘吗(u盘必须做成启动盘才能装系统吗)

是的,需要注意,U盘的大小要在4G以上,最好是8G以上,因为启动盘里面需要装系统,内存小的话,不能用来安装系统。内存卡或者U盘或者移动硬盘都可以用来做启动盘安装系统。普通的U盘就可以,不过最好U盘...

u盘怎么恢复文件(u盘文件恢复的方法)

开360安全卫士,点击上面的“功能大全”。点击文件恢复然后点击“数据”下的“文件恢复”功能。选择驱动接着选择需要恢复的驱动,选择接入的U盘。点击开始扫描选好就点击中间的“开始扫描”,开始扫描U盘数据。...

系统虚拟内存太低怎么办(系统虚拟内存占用过高什么原因)

1.检查系统虚拟内存使用情况,如果发现有大量的空闲内存,可以尝试释放一些不必要的进程,以释放内存空间。2.如果系统虚拟内存使用率较高,可以尝试增加系统虚拟内存的大小,以便更多的应用程序可以使用更多...

剪贴板权限设置方法(剪贴板访问权限)
剪贴板权限设置方法(剪贴板访问权限)

1、首先打开iphone手机,触碰并按住单词或图像直到显示选择选项。2、其次,然后选取“拷贝”或“剪贴板”。3、勾选需要的“权限”,最后选择开启,即可完成苹果剪贴板权限设置。仅参考1.打开苹果手机设置按钮,点击【通用】。2.点击【键盘】,再...

2026-01-29 21:37 liuian

平板系统重装大师(平板重装win系统)

如果你的平板开不了机,但可以连接上电脑,那就能好办,楼主下载安装个平板刷机王到你的个人电脑上,然后连接你的平板,平板刷机王会自动识别你的平板,平板刷机王上有你平板的我刷机包,楼主点击下载一个,下载完成...

联想官网售后服务网点(联想官网售后服务热线)

联想3c服务中心是联想旗下的官方售后,是基于互联网O2O模式开发的全新服务平台。可以为终端用户提供多品牌手机、电脑以及其他3C类产品的维修、保养和保险服务。根据客户需求层次,联想服务针对个人及家庭客户...