Disruptor—3.核心源码实现分析一
liuian 2025-06-15 17:36 5 浏览
大纲
1.Disruptor的生产者源码分析
2.Disruptor的消费者源码分析
3.Disruptor的WaitStrategy等待策略分析
4.Disruptor的高性能原因
5.Disruptor高性能之数据结构(内存预加载机制)
6.Disruptor高性能之内核(使用单线程写)
7.Disruptor高性能之系统内存优化(内存屏障)
8.Disruptor高性能之系统缓存优化(消除伪共享)
9.Disruptor高性能之序号获取优化(自旋 + CAS)
1.Disruptor的生产者源码分析
(1)通过Sequence序号发布消息
(2)通过Translator事件转换器发布消息
(1)通过Sequence序号发布消息
生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。
//注意:这里使用的版本是3.4.4
//单生产者单消费者的使用示例
public class Main {
public static void main(String[] args) {
//参数准备
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 4;
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//参数一:eventFactory,消息(Event)工厂对象
//参数二:ringBufferSize,容器的长度
//参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
//参数四:ProducerType,单生产者还是多生产者
//参数五:waitStrategy,等待策略
//1.实例化Disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
orderEventFactory,
ringBufferSize,
executor,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
//2.添加Event处理器,用于处理事件
//也就是构建Disruptor与消费者的一个关联关系
disruptor.handleEventsWith(new OrderEventHandler());
//3.启动Disruptor
disruptor.start();
//4.获取实际存储数据的容器: RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long i = 0; i < 5; i++) {
bb.putLong(0, i);
//向容器中投递数据
producer.sendData(bb);
}
disruptor.shutdown();
executor.shutdown();
}
}
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
long sequence = ringBuffer.next();
try {
//2.根据这个序号, 找到具体的"OrderEvent"元素
//注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
OrderEvent event = ringBuffer.get(sequence);
//3.进行实际的赋值处理
event.setValue(data.getLong(0));
} finally {
//4.提交发布操作
ringBuffer.publish(sequence);
}
}
}
public class OrderEventHandler implements EventHandler<OrderEvent> {
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(1000);
System.err.println("消费者: " + event.getValue());
}
}
//多生产者多消费者的使用示例
public class Main {
public static void main(String[] args) throws InterruptedException {
//1.创建RingBuffer
RingBuffer<Order> ringBuffer = RingBuffer.create(
ProducerType.MULTI,//多生产者
new EventFactory<Order>() {
public Order newInstance() {
return new Order();
}
},
1024 * 1024,
new YieldingWaitStrategy()
);
//2.通过ringBuffer创建一个屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口
Consumer[] consumers = new Consumer[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer("C" + i);
}
//4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool
WorkerPool<Order> workerPool = new WorkerPool<Order>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(),
consumers
);
//5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
//6.启动workerPool
workerPool.start(Executors.newFixedThreadPool(5));
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 100; i++) {
final Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
public void run() {
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
producer.sendData(UUID.randomUUID().toString());
}
}
}).start();
}
Thread.sleep(2000);
System.err.println("----------线程创建完毕,开始生产数据----------");
latch.countDown();
Thread.sleep(10000);
System.err.println("任务总数:" + consumers[2].getCount());
}
}
public class Producer {
private RingBuffer<Order> ringBuffer;
public Producer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(String uuid) {
//1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
long sequence = ringBuffer.next();
try {
//2.根据这个序号, 找到具体的"Order"元素
//注意:此时获取的Order对象是一个没有被赋值的"空对象"
Order order = ringBuffer.get(sequence);
//3.进行实际的赋值处理
order.setId(uuid);
} finally {
//4.提交发布操作
ringBuffer.publish(sequence);
}
}
}
public class Consumer implements WorkHandler<Order> {
private static AtomicInteger count = new AtomicInteger(0);
private String consumerId;
private Random random = new Random();
public Consumer(String consumerId) {
this.consumerId = consumerId;
}
public void onEvent(Order event) throws Exception {
Thread.sleep(1 * random.nextInt(5));
System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。
abstract class RingBufferPad {
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad {
...
private static final Unsafe UNSAFE = Util.getUnsafe();
private final long indexMask;
//环形数组存储事件消息
private final Object[] entries;
protected final int bufferSize;
//RingBuffer的sequencer属性代表了当前线程对应的生产者
protected final Sequencer sequencer;
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
//初始化数组
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
//内存预加载
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
protected final E elementAt(long sequence) {
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
...
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
protected long p1, p2, p3, p4, p5, p6, p7;
...
//Increment and return the next sequence for the ring buffer.
//Calls of this method should ensure that they always publish the sequence afterward.
//E.g.
//long sequence = ringBuffer.next();
//try {
// Event e = ringBuffer.get(sequence);
// //Do some work with the event.
//} finally {
// ringBuffer.publish(sequence);
//}
//@return The next sequence to publish to.
//@see RingBuffer#publish(long)
//@see RingBuffer#get(long)
@Override
public long next() {
return sequencer.next();
}
//Publish the specified sequence.
//This action marks this particular message as being available to be read.
//@param sequence the sequence to publish.
@Override
public void publish(long sequence) {
sequencer.publish(sequence);
}
//Get the event for a given sequence in the RingBuffer.
//This call has 2 uses.
//Firstly use this call when publishing to a ring buffer.
//After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
//Secondly use this call when consuming data from the ring buffer.
//After calling SequenceBarrier#waitFor(long) call this method with any value greater than that
//your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
//@param sequence for the event
//@return the event for the given sequence
@Override
public E get(long sequence) {
//调用父类RingBufferFields的elementAt()方法
return elementAt(sequence);
}
...
}
RingBuffer的sequencer属性会在创建RingBuffer对象时传入,而创建RingBuffer对象的时机则是在初始化Disruptor的时候。
在Disruptor的构造方法中,会调用RingBuffer的create()方法,RingBuffer的create()方法会根据不同的生产者类型来初始化sequencer属性。
由生产者线程通过new创建的Sequencer接口实现类的实例就是一个生产者。单生产者的线程执行上面的main()方法时,会创建一个单生产者Sequencer实例来代表生产者。多生产者的线程执行如下的main()方法时,会创建一个多生产者Sequencer实例来代表生产者。
public class Disruptor<T> {
private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler;
...
//Create a new Disruptor.
//@param eventFactory the factory to create events in the ring buffer.
//@param ringBufferSize the size of the ring buffer, must be power of 2.
//@param executor an Executor to execute event processors.
//@param producerType the claim strategy to use for the ring buffer.
//@param waitStrategy the wait strategy to use for the ring buffer.
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
this.ringBuffer = ringBuffer;
this.executor = executor;
}
...
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
protected long p1, p2, p3, p4, p5, p6, p7;
...
//Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
//@param producerType producer type to use ProducerType.
//@param factory used to create events within the ring buffer.
//@param bufferSize number of elements to create within the ring buffer.
//@param waitStrategy used to determine how to wait for new elements to become available.
public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
switch (producerType) {
case SINGLE:
//单生产者模式下的当前生产者是一个SingleProducerSequencer实例
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
//多生产者模式下的当前生产者是一个MultiProducerSequencer实例
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
//Create a new single producer RingBuffer with the specified wait strategy.
//@param <E> Class of the event stored in the ring buffer.
//@param factory used to create the events within the ring buffer.
//@param bufferSize number of elements to create within the ring buffer.
//@param waitStrategy used to determine how to wait for new elements to become available.
//@return a constructed ring buffer.
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
//Create a new multiple producer RingBuffer with the specified wait strategy.
//@param <E> Class of the event stored in the ring buffer.
//@param factory used to create the events within the ring buffer.
//@param bufferSize number of elements to create within the ring buffer.
//@param waitStrategy used to determine how to wait for new elements to become available.
//@return a constructed ring buffer.
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
//Construct a RingBuffer with the full option set.
//@param eventFactory to newInstance entries for filling the RingBuffer
//@param sequencer sequencer to handle the ordering of events moving through the RingBuffer.
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
super(eventFactory, sequencer);
}
...
}
abstract class RingBufferPad {
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad {
...
private final long indexMask;
//环形数组存储事件消息
private final Object[] entries;
protected final int bufferSize;
//RingBuffer的sequencer属性代表了当前线程对应的生产者
protected final Sequencer sequencer;
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
//初始化数组
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
//内存预加载
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
...
}
SingleProducerSequencer的publish()方法在发布事件消息时,首先会设置当前生产者的Sequence,然后会通过等待策略通知阻塞的消费者。
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
...
//Publish the specified sequence.
//This action marks this particular message as being available to be read.
//@param sequence the sequence to publish.
@Override
public void publish(long sequence) {
sequencer.publish(sequence);
}
...
}
public abstract class AbstractSequencer implements Sequencer {
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
//环形数组的大小
protected final int bufferSize;
//等待策略
protected final WaitStrategy waitStrategy;
//当前生产者的进度
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
//每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)
//这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,
//由RingBuffer的addGatingSequences()方法进行添加
protected volatile Sequence[] gatingSequences = new Sequence[0];
...
//Create with the specified buffer size and wait strategy.
//@param bufferSize The total number of entries, must be a positive power of 2.
//@param waitStrategy
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
...
}
abstract class SingleProducerSequencerPad extends AbstractSequencer {
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
}
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
}
//表示生产者的当前序号,值为-1
protected long nextValue = Sequence.INITIAL_VALUE;
//表示消费者的最小序号,值为-1
protected long cachedValue = Sequence.INITIAL_VALUE;
}
public final class SingleProducerSequencer extends SingleProducerSequencerFields {
protected long p1, p2, p3, p4, p5, p6, p7;
//Construct a Sequencer with the selected wait strategy and buffer size.
//@param bufferSize the size of the buffer that this will sequence over.
//@param waitStrategy for those waiting on sequences.
public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
}
@Override
public void publish(long sequence) {
//设置当前生产者的进度,cursor代表了当前生产者的Sequence
cursor.set(sequence);
//通过等待策略通知阻塞的消费者
waitStrategy.signalAllWhenBlocking();
}
@Override
public long next() {
return next(1);
}
@Override
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
LockSupport.parkNanos(1L);
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
...
}
class LhsPadding {
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
protected volatile long value;
}
class RhsPadding extends Value {
protected long p9, p10, p11, p12, p13, p14, p15;
}
//Concurrent sequence class used for tracking the progress of the ring buffer and event processors.
//Support a number of concurrent operations including CAS and order writes.
//Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
public class Sequence extends RhsPadding {
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static {
UNSAFE = Util.getUnsafe();
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
//Create a sequence initialised to -1.
public Sequence() {
this(INITIAL_VALUE);
}
//Create a sequence with a specified initial value.
//@param initialValue The initial value for this sequence.
public Sequence(final long initialValue) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
//Perform a volatile read of this sequence's value.
//@return The current value of the sequence.
public long get() {
return value;
}
//Perform an ordered write of this sequence.
//The intent is a Store/Store barrier between this write and any previous store.
//@param value The new value for the sequence.
public void set(final long value) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
//Performs a volatile write of this sequence.
//The intent is a Store/Store barrier between this write and
//any previous write and a Store/Load barrier between this write and
//any subsequent volatile read.
//@param value The new value for the sequence.
public void setVolatile(final long value) {
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
//Perform a compare and set operation on the sequence.
//@param expectedValue The expected current value.
//@param newValue The value to update to.
//@return true if the operation succeeds, false otherwise.
public boolean compareAndSet(final long expectedValue, final long newValue) {
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
//Atomically increment the sequence by one.
//@return The value after the increment
public long incrementAndGet() {
return addAndGet(1L);
}
//Atomically add the supplied value.
//@param increment The value to add to the sequence.
//@return The value after the increment.
public long addAndGet(final long increment) {
long currentValue;
long newValue;
do {
currentValue = get();
newValue = currentValue + increment;
} while (!compareAndSet(currentValue, newValue));
return newValue;
}
@Override
public String toString() {
return Long.toString(get());
}
}
MultiProducerSequencer的publish()方法在发布事件消息时,则会通过UnSafe设置sequence在int数组中对应元素的值。
public final class MultiProducerSequencer extends AbstractSequencer {
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;
//Construct a Sequencer with the selected wait strategy and buffer size.
//@param bufferSize the size of the buffer that this will sequence over.
//@param waitStrategy for those waiting on sequences.
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
private void initialiseAvailableBuffer() {
for (int i = availableBuffer.length - 1; i != 0; i--) {
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
private void setAvailableBufferValue(int index, int flag) {
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
@Override
public void publish(final long sequence) {
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
//The below methods work on the availableBuffer flag.
//The prime reason is to avoid a shared sequence object between publisher threads.
//(Keeping single pointers tracking start and end would require coordination between the threads).
//-- Firstly we have the constraint that the delta between the cursor and minimum gating sequence
//will never be larger than the buffer size (the code in next/tryNext in the Sequence takes care of that).
//-- Given that; take the sequence value and mask off the lower portion of the sequence
//as the index into the buffer (indexMask). (aka modulo operator)
//-- The upper portion of the sequence becomes the value to check for availability.
//ie: it tells us how many times around the ring buffer we've been (aka division)
//-- Because we can't wrap without the gating sequences moving forward
//(i.e. the minimum gating sequence is effectively our last available position in the buffer),
//when we have new data and successfully claimed a slot we can simply write over the top.
private void setAvailable(final long sequence) {
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private int calculateIndex(final long sequence) {
return ((int) sequence) & indexMask;
}
private int calculateAvailabilityFlag(final long sequence) {
return (int) (sequence >>> indexShift);
}
@Override
public long next() {
return next(1);
}
@Override
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do {
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence) {
LockSupport.parkNanos(1);
continue;
}
gatingSequenceCache.set(gatingSequence);
} else if (cursor.compareAndSet(current, next)) {
break;
}
} while (true);
return next;
}
...
}
(2)通过Translator事件转换器发布消息
生产者还可以直接调用RingBuffer的tryPublishEvent()方法来完成发布事件消息到RingBuffer。该方法首先会调用Sequencer接口的tryNext()方法获取sequence序号,然后根据该sequence序号从RingBuffer的环形数组中获取对应的元素,接着再调用RingBuffer的translateAndPublish()方法将事件消息赋值替换到该元素中,最后调用Sequencer接口的publish()方法设置当前生产者的sequence序号来完成事件消息的发布。
abstract class RingBufferPad {
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad {
...
private static final Unsafe UNSAFE = Util.getUnsafe();
private final long indexMask;
//环形数组存储事件消息
private final Object[] entries;
protected final int bufferSize;
//RingBuffer的sequencer属性代表了当前线程对应的生产者
protected final Sequencer sequencer;
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
//初始化数组
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
//内存预加载
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
protected final E elementAt(long sequence) {
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
...
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
//值为-1
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
//Construct a RingBuffer with the full option set.
//@param eventFactory to newInstance entries for filling the RingBuffer
//@param sequencer sequencer to handle the ordering of events moving through the RingBuffer.
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
super(eventFactory, sequencer);
}
@Override
public boolean tryPublishEvent(EventTranslator<E> translator) {
try {
final long sequence = sequencer.tryNext();
translateAndPublish(translator, sequence);
return true;
} catch (InsufficientCapacityException e) {
return false;
}
}
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
try {
translator.translateTo(get(sequence), sequence);
} finally {
sequencer.publish(sequence);
}
}
//Get the event for a given sequence in the RingBuffer.
//This call has 2 uses.
//Firstly use this call when publishing to a ring buffer.
//After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
//Secondly use this call when consuming data from the ring buffer.
//After calling SequenceBarrier#waitFor(long) call this method with any value greater than that
//your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
//@param sequence for the event
//@return the event for the given sequence
@Override
public E get(long sequence) {
//调用父类RingBufferFields的elementAt()方法
return elementAt(sequence);
}
...
}
public abstract class AbstractSequencer implements Sequencer {
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
//环形数组的大小
protected final int bufferSize;
//等待策略
protected final WaitStrategy waitStrategy;
//当前生产者的进度
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
//每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)
//这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,
//由RingBuffer的addGatingSequences()方法进行添加
protected volatile Sequence[] gatingSequences = new Sequence[0];
...
//Create with the specified buffer size and wait strategy.
//@param bufferSize The total number of entries, must be a positive power of 2.
//@param waitStrategy
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
...
}
public final class SingleProducerSequencer extends SingleProducerSequencerFields {
protected long p1, p2, p3, p4, p5, p6, p7;
//Construct a Sequencer with the selected wait strategy and buffer size.
//@param bufferSize the size of the buffer that this will sequence over.
//@param waitStrategy for those waiting on sequences.
public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
}
...
@Override
public long tryNext() throws InsufficientCapacityException {
return tryNext(1);
}
@Override
public long tryNext(int n) throws InsufficientCapacityException {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
if (!hasAvailableCapacity(n, true)) {
throw InsufficientCapacityException.INSTANCE;
}
long nextSequence = this.nextValue += n;
return nextSequence;
}
private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) {
long nextValue = this.nextValue;
long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
if (doStore) {
cursor.setVolatile(nextValue);//StoreLoad fence
}
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
this.cachedValue = minSequence;
if (wrapPoint > minSequence) {
return false;
}
}
return true;
}
@Override
public void publish(long sequence) {
//设置当前生产者的sequence
cursor.set(sequence);
//通过等待策略通知阻塞的消费者
waitStrategy.signalAllWhenBlocking();
}
...
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
}
//表示生产者的当前序号,值为-1
protected long nextValue = Sequence.INITIAL_VALUE;
//表示消费者的最小序号,值为-1
protected long cachedValue = Sequence.INITIAL_VALUE;
}
abstract class SingleProducerSequencerPad extends AbstractSequencer {
protected long p1, p2, p3, p4, p5, p6, p7;
SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
}
}
public final class MultiProducerSequencer extends AbstractSequencer {
...
@Override
public long tryNext() throws InsufficientCapacityException {
return tryNext(1);
}
@Override
public long tryNext(int n) throws InsufficientCapacityException {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do {
current = cursor.get();
next = current + n;
if (!hasAvailableCapacity(gatingSequences, n, current)) {
throw InsufficientCapacityException.INSTANCE;
}
} while (!cursor.compareAndSet(current, next));
return next;
}
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {
long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
gatingSequenceCache.set(minSequence);
if (wrapPoint > minSequence) {
return false;
}
}
return true;
}
@Override
public void publish(final long sequence) {
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
...
}
//Implementations translate (write) data representations into events claimed from the RingBuffer.
//When publishing to the RingBuffer, provide an EventTranslator.
//The RingBuffer will select the next available event by sequence and provide it to the EventTranslator (which should update the event),
//before publishing the sequence update.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public interface EventTranslator<T> {
//Translate a data representation into fields set in given event
//@param event into which the data should be translated.
//@param sequence that is assigned to event.
void translateTo(T event, long sequence);
}
相关推荐
- 面试怕被问Hashmap,多看看这个文章
-
o数据结构otable数组长度永远为2的幂次方o那么为什么要把数组长度设计为2的幂次方呢?o扩容o链表树化o红黑树拆分o查找o插入o删除o遍历oequasl和hashcode总结HashMap是面试中...
- 非常简洁地重试Retry组件,使用起来杠杠的
-
前言小伙伴是不是经常遇到接口调用异常,超时的场景?尤其网络抖动导致timeout超时的场景,我们一般产品就会叫我们要重试几次。很多小伙伴的实现方式是写个循环调用for(inti=1;i<=3;...
- Kafka消息可靠传输之幂等、事务机制
-
一般而言,消息中间件的消息传输保障有3个层级,分别如下。atmostonce:至多一次。消息可能会丢失,但绝对不会重复传输。atleastonce:最少一次。消息绝不会丢失,但可能会重复传输。...
- Seata源码—9.Seata XA模式的事务处理
-
大纲1.SeataXA分布式事务案例及AT与XA的区别2.SeataXA分布式事务案例的各模块运行流程3.Seata使用SpringBoot自动装配简化复杂配置4.全局事务注解扫描组件的自动装配...
- Disruptor—3.核心源码实现分析一
-
大纲1.Disruptor的生产者源码分析2.Disruptor的消费者源码分析3.Disruptor的WaitStrategy等待策略分析4.Disruptor的高性能原因5.Disruptor高性...
- Spring Boot 进阶-详解SpringBoot中条件注解使用
-
作为使用SpringBoot框架的开发者来讲,如果你连如下的这些注解你都没有听说过,没有用过,那我劝你还是放弃吧?在SpringBoot中我们最常见到的注解应该是条件注解了吧!也就是@Condit...
- 如何自定义编解码器(如何自定义编解码器的程序)
-
1.前言上一节我们一节了解了什么是编码解码、序列化和反序列化了,并且留有一道思考题,本节内容主要是深入解析该思考题。思考题:能否把我们的编码和解码封装成独立的Handler呢?那么应该如何去封装...
- Disruptor—3.核心源码实现分析二
-
大纲1.Disruptor的生产者源码分析2.Disruptor的消费者源码分析3.Disruptor的WaitStrategy等待策略分析4.Disruptor的高性能原因5.Disruptor高性...
- 线程的状态有哪些?它是如何工作的?
-
线程的状态有哪些?它是如何工作的?线程(Thread)是并发编程的基础,也是程序执行的最小单元,它依托进程而存在。一个进程中可以包含多个线程,多线程可以共享一块内存空间和一组系统资源,因此线程之间的切...
- 有图解有案例,我终于把Condition的原理讲透彻了
-
平时加解锁都是直接使用Synchronized关键字来实现的,简单好用,为啥还要引用ReentrantLock呢?为了解决小伙伴的疑问,我们来对两者做个简单的比较吧:相同点两者都是“可重入锁”,即当前...
- 白话DUBBO原理,通俗易记,再也不怕面试时讲不清楚了
-
现在的各种面试免不了要问些中间件,尤其是互联网公司,更注重获选人对中间件的掌握情况。在中间件中,有一大类是关于RPC框架的,Dubbo即是阿里出品的一款很著名的RPC中间件,很多互联网公司都在用,面试...
- Java 最细的集合类总结(java常用的集合类有哪些)
-
数据结构作为每一个开发者不可回避的问题,而Java对于不同的数据结构提供了非常成熟的实现,这一个又一个实现既是面试中的难点,也是工作中必不可少的工具,在此,笔者经历漫长的剖析,将其抽丝剥茧的呈现出...
- 详解Java异常(Exception)处理及常见异常
-
很多事件并非总是按照人们自己设计意愿顺利发展的,经常出现这样那样的异常情况。例如:你计划周末郊游,计划从家里出发→到达目的→游泳→烧烤→回家。但天有不测风云,当你准备烧烤时候突然天降大雨,只能终止郊...
- 为什么阿里强制要求不要在foreach循环里进行元素remove和add操作
-
在阅读《阿里巴巴Java开发手册》时,发现有一条关于在foreach循环里进行元素的remove/add操作的规约,具体内容如下:错误演示我们首先在IDEA中编写一个在foreach循...
- SpringBoot条件化配置(@Conditional)全面解析与实战指南
-
一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...
- 一周热门
-
-
Python实现人事自动打卡,再也不会被批评
-
Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
-
【验证码逆向专栏】vaptcha 手势验证码逆向分析
-
一个解决支持HTML/CSS/JS网页转PDF(高质量)的终极解决方案
-
再见Swagger UI 国人开源了一款超好用的 API 文档生成框架,真香
-
网页转成pdf文件的经验分享 网页转成pdf文件的经验分享怎么弄
-
C++ std::vector 简介
-
python使用fitz模块提取pdf中的图片
-
《人人译客》如何规划你的移动电商网站(2)
-
Jupyterhub安装教程 jupyter怎么安装包
-
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- table.render (33)
- uniapp textarea (33)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)