Disruptor中怎么实现一个高性能队列,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
Disruptor 例子
import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy}
/** Minimal demo: one producer publishes 100 integer ids through a Disruptor with a
  * two-stage handler chain (TestHandler first, then ThenHandler).
  */
object DisruptorTest {

  /** The Disruptor under test: ring buffer of 4 slots, single producer, blocking wait strategy. */
  val disruptor: Disruptor[Event] = {
    // Pre-populates every ring-buffer slot with a placeholder event (id = -1).
    val eventFactory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }
    // One plain thread per runnable handed over by the Disruptor.
    val consumerThreads = new ThreadFactory() {
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    val instance = new Disruptor[Event](
      eventFactory,
      4,
      consumerThreads,
      ProducerType.SINGLE,
      new BlockingWaitStrategy()
    )
    // ThenHandler is chained after TestHandler.
    instance.handleEventsWith(TestHandler).`then`(ThenHandler)
    instance
  }

  /** Copies the published argument into the pre-allocated event and logs the write. */
  val translator: EventTranslatorOneArg[Event, Int] = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  /** Starts the consumers, publishes ids 0 to 99 in order, then shuts the Disruptor down. */
  def main(args: Array[String]): Unit = {
    disruptor.start()
    for (id <- 0 until 100) {
      disruptor.publishEvent(translator, id)
    }
    disruptor.shutdown()
  }
}
/** Mutable event carried in the ring buffer.
  *
  * `id` is a `var` because the slot object is created once and overwritten on each publish.
  */
case class Event(var id: Int) {

  /** Renders the event as `event: <id>`. */
  override def toString: String = {
    val current = id
    s"event: ${current}"
  }
}
/** First-stage consumer: prints its runtime class's simple name, a nanosecond timestamp
  * and the event.
  */
object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    val handlerName = this.getClass.getSimpleName
    val timestamp = System.nanoTime()
    println(s"${handlerName} ${timestamp} ${event}")
  }
}
/** Second-stage consumer, registered via `then` after TestHandler: prints its runtime
  * class's simple name, a nanosecond timestamp and the event.
  */
object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    val handlerName = this.getClass.getSimpleName
    val timestamp = System.nanoTime()
    println(s"${handlerName} ${timestamp} ${event}")
  }
}
源码阅读
Disruptor 初始化
先看 Disruptor 构造方法
/**
 * Public constructor: builds the ring buffer with RingBuffer.create(...) and wraps the
 * thread factory in a BasicExecutor, then delegates to another constructor.
 */
public Disruptor(final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy) {
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
再看 RingBuffer.create:最终通过 fill 方法,将 eventFactory.newInstance() 创建的对象作为默认值,预先填充到 ringBuffer 的每个槽位里面
/**
 * Factory method that dispatches on the producer type: SINGLE and MULTI each go to their
 * own creation method; any other value is rejected with IllegalStateException.
 */
public static <E> RingBuffer<E> create(ProducerType producerType,
EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
switch (producerType) {
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
/**
 * Creates a ring buffer backed by a SingleProducerSequencer that is configured with the
 * given buffer size and wait strategy.
 */
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize,
WaitStrategy waitStrategy) {
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
/**
 * Validates the buffer size taken from the sequencer, allocates the backing array and
 * pre-fills it with events from the factory.
 *
 * Throws IllegalArgumentException if the size is below 1 or not a power of two.
 */
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
// Exactly one set bit means the size is a power of two.
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
// NOTE(review): presumably used to map a sequence to a slot with a bitwise AND; the usage is not shown here.
this.indexMask = bufferSize - 1;
// The array has BUFFER_PAD extra slots on each side of the bufferSize usable slots.
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
/**
 * Stores one factory-created event in each of the bufferSize usable slots, starting at
 * offset BUFFER_PAD so the leading pad slots stay empty.
 */
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
消费事件消息
首先看 disruptor.start(): 消费事件消息入口
// Registry of the consumers that start() iterates over.
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
/**
 * Starts every registered consumer on the executor and returns the ring buffer.
 * checkOnlyStartedOnce() runs first; judging by its name it rejects a second start (body not shown here).
 */
public RingBuffer<T> start() {
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository) {
consumerInfo.start(executor);
}
return ringBuffer;
}
consumerRepository 中的消费者信息由 disruptor.handleEventsWith(TestHandler) 添加,同时构造出事件消息处理链
/**
 * Registers the given handlers with an empty barrier-sequence array, i.e. without
 * passing any upstream processor sequences.
 */
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return createEventProcessors(new Sequence[0], handlers);
}
/**
 * Wraps each handler in a BatchEventProcessor, registers it in the consumer repository,
 * and returns a group holding the new processors' sequences.
 *
 * All handlers passed in one call share a single barrier built from barrierSequences.
 */
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
checkNotStarted();
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// One barrier for the whole call, shared by every processor created below.
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null) {
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
// Collected so the returned group can hand these sequences to later handlers.
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
回头看 disruptor.start() 中的 consumerInfo.start(executor):这里的 executor 就是 new BasicExecutor(threadFactory)。BasicExecutor 在每次 execute 任务时,都会 new 一个 thread;但是 consumerRepository 中消费者的数量是有限的,所以 new thread 也没啥问题
/**
 * Public constructor (same as quoted earlier): creates the ring buffer and wraps the
 * thread factory in a BasicExecutor, which becomes the executor used by start().
 */
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy) {
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
/** Private constructor that simply stores the ring buffer and the executor. */
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
this.ringBuffer = ringBuffer;
this.executor = executor;
}
/**
 * Consumer-info start (simplified excerpt): submits the event processor to the given executor.
 */
@Override
public void start(final java.util.concurrent.Executor executor){
// EventProcessor extends Runnable, so it can be passed to execute() directly.
// In this walkthrough the executor is the BasicExecutor created in the Disruptor constructor.
executor.execute(eventprocessor);
}
/** Event processor whose run() drives the processEvents() loop (excerpt: only run() is shown). */
public final class BatchEventProcessor<T> implements EventProcessor {
/**
 * Entry point run by the executor. Moves the state from IDLE to RUNNING, processes
 * events, and resets the state to IDLE in the finally block.
 *
 * Throws IllegalStateException if the processor is already RUNNING.
 */
@Override
public void run() {
// The compare-and-set lets only one caller take the processor from IDLE to RUNNING.
if (running.compareAndSet(IDLE, RUNNING)) {
sequenceBarrier.clearAlert();
notifyStart();
try {
// Re-check the state, which may have changed since the compare-and-set.
if (running.get() == RUNNING) {
processEvents();
}
} finally {
notifyShutdown();
running.set(IDLE);
}
} else {
// The compare-and-set failed: either already running, or in some other non-IDLE state.
if (running.get() == RUNNING) {
throw new IllegalStateException("Thread is already running");
} else {
earlyExit();
}
}
}
}
/**
 * Main consume loop: waits on the barrier for the next sequence, hands every event up to
 * the returned available sequence to the handler, then publishes this processor's progress.
 *
 * The loop exits only on an AlertException received while the processor is no longer RUNNING.
 */
private void processEvents() {
T event = null;
// The next sequence to consume is one past the last one this processor recorded.
long nextSequence = sequence.get() + 1L;
while (true) {
try {
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null) {
// The argument is the number of events in the batch about to be processed.
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
// The third argument (endOfBatch) is true only for the last event of the batch.
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// Progress is recorded once per batch, after every event in it has been handled.
sequence.set(availableSequence);
} catch (final TimeoutException e) {
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
// Stop only if the processor has left the RUNNING state; otherwise keep looping.
if (running.get() != RUNNING) {
break;
}
} catch (final Throwable ex) {
// Report the failure, mark the failing sequence as consumed, and move on to the next one.
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
executor.execute 也就是 BasicExecutor.execute(eventProcessor),会异步地执行 eventProcessor,也就是调用 BatchEventProcessor.run 方法
问题来了,既然是异步执行,多个 eventHandler 是怎么按照顺序去处理事件消息的?
我们看 processEvents 方法执行逻辑
先获取 BatchEventProcessor.sequence 并 +1
通过 sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 获取到可用的 availableSequence
先看下 BlockingWaitStrategy.waitFor 的实现
/**
 * Waits until the requested sequence can be consumed, in two phases: first wait on a
 * lock/condition until the cursor reaches the sequence, then loop until the dependent
 * sequence reaches it too.
 *
 * Returns the value of dependentSequence once it is at least the requested sequence.
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,
SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
// Phase 1: take the lock only if the cursor has not yet reached the requested sequence.
if (cursorSequence.get() < sequence) {
lock.lock();
try {
// Re-check in a loop under the lock before every await.
while (cursorSequence.get() < sequence) {
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally {
lock.unlock();
}
}
// Phase 2: loop without the lock until the dependent sequence catches up.
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
}
return availableSequence;
}
如果 cursorSequence(ringbuffer 的索引)< sequence(batchEventProcessor 想要消费的索引),则 batchEventProcessor 挂起等待;之后再循环等待,直到 dependentSequence >= sequence,并把 dependentSequence 的值作为 availableSequence 返回。然后 batchEventProcessor 会将 availableSequence 及其之前的数据一次性处理完,并更新自身的 sequence 索引值
dependentSequence 由 ProcessingSequenceBarrier 构造方法初始化
/** SequenceBarrier implementation (excerpt: only fields and constructor are shown). */
final class ProcessingSequenceBarrier implements SequenceBarrier {
private final WaitStrategy waitStrategy;
// Either the cursor itself or a group over the upstream sequences; chosen in the constructor.
private final Sequence dependentSequence;
private volatile boolean alerted = false;
private final Sequence cursorSequence;
private final Sequencer sequencer;
/**
 * With no dependent sequences the barrier depends directly on the cursor; otherwise the
 * dependent sequences are wrapped in a FixedSequenceGroup.
 */
ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
final Sequence cursorSequence, final Sequence[] dependentSequences) {
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length) {
dependentSequence = cursorSequence;
} else {
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
}
ProcessingSequenceBarrier 是在 Disruptor.createEventProcessors 中,通过 final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences) 初始化的
createEventProcessors 仅会被 Disruptor.handleEventsWith 和 EventHandlerGroup.handleEventsWith 调用
/** Disruptor excerpt showing the two methods that build the handler chain. */
public class Disruptor<T> {
/** Registers handlers with an empty barrier-sequence array (no upstream sequences passed). */
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return createEventProcessors(new Sequence[0], handlers);
}
/**
 * Creates one BatchEventProcessor per handler, all sharing a barrier built from
 * barrierSequences, and returns a group carrying the new processors' sequences.
 */
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers) {
checkNotStarted();
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// The barrier is built from the sequences passed in by the caller.
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null) {
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
// The returned group holds these sequences so a later stage can be created with them as its barrier sequences.
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
}
/**
 * A group of already-registered processors, identified by their sequences. Handlers added
 * through this group are created with those sequences as their barrier sequences.
 */
public class EventHandlerGroup<T> {
private final Disruptor<T> disruptor;
private final ConsumerRepository<T> consumerRepository;
private final Sequence[] sequences;
EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
final Sequence[] sequences) {
this.disruptor = disruptor;
this.consumerRepository = consumerRepository;
// Copies the array itself; the Sequence objects inside are the same instances.
this.sequences = Arrays.copyOf(sequences, sequences.length);
}
/** Registers handlers, passing this group's sequences as the barrier sequences. */
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return disruptor.createEventProcessors(sequences, handlers);
}
/** Alias for handleEventsWith. */
public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
return handleEventsWith(handlers);
}
}
EventHandlerGroup 会拷贝一份 batchEventProcessor 的 sequence 数组(数组中的 Sequence 对象本身是共享的)。
demo 例子中的 disruptor.handleEventsWith(TestHandler).then(ThenHandler),通过 then 方法将 TestHandler 的 sequence 传递给 ThenHandler,这样 ThenHandler 就依赖了 TestHandler,ThenHandler 就会在 TestHandler 之后执行
生产事件消息
接着看 disruptor.publishEvent(translator, i)
就是往 ringBuffer 里面放数据,
/**
 * Publishes one event: obtains the next sequence from the sequencer, then lets the
 * translator fill the slot and publishes it.
 */
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
final long sequence = sequencer.next();
translateAndPublish(translator, sequence, arg0);
}
/**
 * Lets the translator write into the event at the given sequence, then publishes that
 * sequence. Publishing is in a finally block, so it happens even if the translator throws.
 */
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
try {
translator.translateTo(get(sequence), sequence, arg0);
} finally {
sequencer.publish(sequence);
}
}
/** Returns the event for the given sequence by delegating to elementAt(sequence). */
public E get(long sequence) {
return elementAt(sequence);
}
get(sequence) 根据 sequence(ringbuffer 索引)获取 ringbuffer 数组里的对象;translator 对其处理、替换字段之后,ringbuffer 数组中该位置的值就是新的值;publish 会更新 cursor 索引,waitStrategy.signalAllWhenBlocking() 会通知阻塞等待的消费者继续消费消息
// Cursor sequence, created with Sequencer.INITIAL_CURSOR_VALUE; publish() sets it to each published sequence.
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
/**
 * Publishes the sequence by setting the cursor to it, then signals the wait strategy.
 */
@Override
public void publish(long sequence) {
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
总结
流程理清楚了,我们看看 知识点
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注天达云行业资讯频道,感谢您对天达云的支持。