The Charm of Lock-Free

2017 Aug 07


A typical scenario for an in-memory queue: a large pile of tasks needs to be dispatched for processing. How do you dispatch them efficiently?

Approaches compared:

1. A single thread processing the tasks one by one
2. A thread pool whose workers take one task at a time from a shared blocking queue
3. A thread pool whose workers each process a batch of tasks
4. The LMAX Disruptor

Experiment:

Parameters: 4,100 tasks, each taking 10 ms; the thread count equals the number of CPU cores. See Experiment code 1 for the code.

Approach                                   Time (ms)
1. Single thread                           41,022
2. Multiple threads, one task at a time    74,795
3. Multiple threads, batching tasks        5,037
4. Disruptor                               3,651

Explanation:

As the table shows, approach 2 is even slower than approach 1. The reason: every task is handed off individually, so the per-task submission and scheduling overhead is paid N times (spawning a thread per task would be heavier still, since creating and destroying threads is not a cheap operation), and, more importantly, every time one of the many worker threads takes a task from the queue (take()/poll()) it has to acquire the queue lock; with many threads that contention becomes a large cost.

Approach 3 performs well, and it is simple and efficient: each thread processes a large batch of tasks, so the work is essentially lock-free (a few small Atomic counters add negligible overhead). The 5,037 ms result is close to the ideal of roughly 41 s of total work divided across the available cores. The drawback is that it is too simplistic: the tasks must be partitioned up front, so it is not very general-purpose.
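
To make the batch-splitting idea concrete, here is a minimal, self-contained sketch (a simplified stand-in, not the code that was measured; the measured version is processByMultiThreadsSimpleSplit in Experiment code 1 below). The task list is cut into fixed-size chunks and each pool thread works through a whole chunk, so there is no per-task queue locking.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchSplitSketch {

  public static void main(String[] args) throws Exception {
    int nThreads = Runtime.getRuntime().availableProcessors();
    List<Long> items = new ArrayList<>();
    for (long i = 0; i < 4100; i++) {
      items.add(i);
    }

    // cut the list into chunks of 500, the same STEP used in Experiment code 1
    int step = 500;
    List<Callable<Integer>> chunks = new ArrayList<>();
    for (int from = 0; from < items.size(); from += step) {
      List<Long> chunk = items.subList(from, Math.min(from + step, items.size()));
      chunks.add(() -> {
        for (Long item : chunk) {
          work(item); // each thread crunches its whole chunk, no queue lock per task
        }
        return chunk.size();
      });
    }

    ExecutorService es = Executors.newFixedThreadPool(nThreads);
    long start = System.currentTimeMillis();
    es.invokeAll(chunks); // blocks until every chunk has been processed
    System.out.println("batch split took " + (System.currentTimeMillis() - start) + " ms");
    es.shutdown();
  }

  // stand-in for the real task: busy-spin for ~10 ms, as spin() does in Experiment code 1
  private static void work(Long item) {
    long end = System.nanoTime() + 10_000_000L;
    while (System.nanoTime() < end) ;
  }
}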

Approach 4, the Disruptor, is built on a lock-free ring buffer. It has long been famous as a lock-free, high-performance in-memory queue; in the project's own words, "It's fast. Very fast."
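
For orientation, here is a minimal sketch of publishing and consuming events, assuming the Disruptor 3.x DSL (the class and field names are made up for illustration; the code that was actually measured is in Experiment code 1 and 2 below). The key point is that handing work to a consumer becomes a write into a pre-allocated ring-buffer slot rather than a lock-protected queue operation.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class DisruptorSketch {

  // events are pre-allocated in the ring buffer and reused, so there is no per-task allocation
  static class RequestEvent {
    long requestId;
  }

  public static void main(String[] args) {
    // the ring buffer size must be a power of two
    Disruptor<RequestEvent> disruptor = new Disruptor<>(
        RequestEvent::new, 1024, DaemonThreadFactory.INSTANCE);

    // consumer side: handlers run on their own threads, with no lock on the hot path
    disruptor.handleEventsWith((event, sequence, endOfBatch) ->
        System.out.println("handled request " + event.requestId));
    disruptor.start();

    // producer side: claim a slot, fill it, publish (instead of executor.submit(task))
    RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();
    for (long id = 0; id < 10; id++) {
      long finalId = id;
      ringBuffer.publishEvent((event, sequence) -> event.requestId = finalId);
    }

    // shutdown() waits until all published events have been processed
    disruptor.shutdown();
  }
}

In the RPC-server scenario mentioned in the next paragraph, this publishEvent call is roughly what would replace executor.execute(request) in a FixedThreadPool-based dispatcher (again, an illustration, not the actual server code).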

Our company's RPC server currently uses it to handle incoming requests, and it is more than a notch above a basic FixedThreadPool. For a demo, see Experiment code 2 below, and witness the frightening TPS.

Disruptor performance

The producers write data from several threads, publishing in batches of 16 events at a time, and a single consumer processes the events. (In the code below, the number of publisher threads is 2 × the number of CPU cores.)

Measured on a personal PC it reaches a frightening throughput of 183,486,238 events per second, an astonishing 180 million or so (the 2 billion events of the test are pushed through in roughly 11 seconds). The lock-free ring buffer is just that strong. The only pity is that it is bounded, but there is no way around that.

Experiment code 1 (depends on the LMAX Disruptor, SLF4J, and Apache Commons Lang libraries, as the imports show)

package com.cs.exercise.concurrent.benchmark;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.apache.commons.lang.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class TaskDemo {

  private final Logger log = LoggerFactory.getLogger(TaskDemo.class);
  private final List<Long> itemList = new ArrayList<>();
  private static int N_THREADS = Runtime.getRuntime().availableProcessors();
  private static int STAGE_SIZE = 100;

  protected void prepare() {
    int size = 4100;
    Random r = new Random(size);
    for (int i = 0; i < size; i++) {
      itemList.add(r.nextLong());
    }
  }

  private static void spin(int milliseconds) {
    long sleepTime = milliseconds * 1_000_000L; // convert to nanoseconds
    long startTime = System.nanoTime();
    while ((System.nanoTime() - startTime) < sleepTime) ;
  }

  protected int work(Long i) {
    spin(10);
    return i.intValue();
  }

  public void processBySingleThread() {
    long start = System.currentTimeMillis();
    int total = itemList.size();
    for (int i = 0; i < itemList.size(); i++) {
      try {
        work(itemList.get(i));
      } catch (Exception e) {
        e.printStackTrace();
      }
      if ((i + 1) % STAGE_SIZE == 0) {
        String info = String.format("single thread processed: %d/%d", i + 1, total);
        log.info(info);
      }
    }
    long cost = System.currentTimeMillis() - start;
    log.info("single thread costs: {} millis", cost);
  }

  public void processByMultiThreadsWithBlockingQueue() {
    long start = System.currentTimeMillis();
    int total = itemList.size();
    AtomicLong totalDoneCount = new AtomicLong(0L);
    BlockingQueue<Long> blockingQueue = new ArrayBlockingQueue<>(total);
    blockingQueue.addAll(itemList);
    ExecutorService es = Executors.newFixedThreadPool(N_THREADS);
    // Approach 2: each Runnable handles exactly one task; this submit loop keeps
    // spinning until the workers have drained the queue.
    do {
      es.execute(() -> {
        try {
          // poll() can return null if another worker drained the queue first
          Long itemId = blockingQueue.poll();
          if (itemId == null) {
            return;
          }
          try {
            work(itemId);
            long doneCount = totalDoneCount.addAndGet(1);
            if (doneCount % STAGE_SIZE == 0) {
              String info = String.format("processed: %d/%d, threadName:%s", totalDoneCount.get(), total, Thread.currentThread().getName());
              log.info(info);
            }
            if (doneCount == total) {
              long cost = System.currentTimeMillis() - start;
              log.info("blocking queue costs: {} millis, empty:{}", cost, blockingQueue.isEmpty());
            }
          } catch (Exception e) {
            e.printStackTrace();
            blockingQueue.put(itemId);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
    } while (!blockingQueue.isEmpty());
    es.shutdown();
    try {
      if (!es.awaitTermination(3, TimeUnit.SECONDS)) {
        es.shutdownNow();
        if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
          log.error("线程池未能正常关闭");
        }
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void processByMultiThreadsSimpleSplit() {
    long start = System.currentTimeMillis();
    int total = itemList.size();
    AtomicLong totalDoneCount = new AtomicLong(0L);
    ExecutorService es = Executors.newFixedThreadPool(N_THREADS);
    int fromIndex = 0;
    final int STEP = 500;
    while (true) {
      fromIndex = Math.min(fromIndex, total);
      int toIndex = Math.min(fromIndex + STEP, total);
      List<Long> subList = itemList.subList(fromIndex, toIndex);
      if (subList.isEmpty()) {
        break;
      }
      int finalFromIndex = fromIndex;
      es.execute(() -> {
        int count = doCustomItemSubList(subList);
        long doneCount = totalDoneCount.addAndGet(subList.size());
        String info = String.format("processed: %d/%d, index from-to: %d-%d",
            doneCount, total, finalFromIndex, toIndex);
        if (doneCount % STAGE_SIZE == 0) {
          log.info(info);
        }
        if (doneCount == total) {
          long cost = System.currentTimeMillis() - start;
          log.info("全部, simple split total time costs: " + cost);
        }
      });
      fromIndex = toIndex;
    }
    es.shutdown();
    try {
      if (!es.awaitTermination(3, TimeUnit.SECONDS)) {
        es.shutdownNow();
        if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
          log.error("线程池未能正常关闭");
        }
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private int doCustomItemSubList(List<Long> itemIdList) {
    int i = 0;
    for (Long itemId : itemIdList) {
      work(itemId);
      i++;
    }
    return i;
  }

  public void processByDisruptor() {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    int total = itemList.size();
    AtomicLong totalDoneCount = new AtomicLong(0L);
    AtomicLong totalSuccessCount = new AtomicLong(0L);
    Disruptor<ItemEvent> disruptor = new Disruptor<>(new ItemEventFactory(), 65536,
        DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy());
    ItemEventHandler[] eventHandlers = new ItemEventHandler[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) {
      eventHandlers[i] = new ItemEventHandler();
    }
    disruptor.handleEventsWithWorkerPool(eventHandlers);
    disruptor.start();
    RingBuffer<ItemEvent> ringBuffer = disruptor.getRingBuffer();
    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>(total);
    queue.addAll(itemList);
    int publishedCount = 0;
    do {
      long itemId;
      try {
        itemId = queue.take();
        long finalItemId = itemId;
        EventTranslator<ItemEvent> translator = (itemEvent, sequence) -> {
          itemEvent.itemId = finalItemId;
          itemEvent.stopWatch = stopWatch;
          itemEvent.targetTotal = total;
          itemEvent.totalDoneCount = totalDoneCount;
          itemEvent.totalSuccessCount = totalSuccessCount;
        };
        boolean published = ringBuffer.tryPublishEvent(translator);
        if (!published) {
          queue.put(itemId);
        } else {
          publishedCount++;
        }
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }

    } while (publishedCount < total);
    disruptor.shutdown();
  }

  static class ItemEvent {
    public long itemId;
    public StopWatch stopWatch;
    public int targetTotal;
    public AtomicLong totalDoneCount;
    public AtomicLong totalSuccessCount;
  }

  static class ItemEventFactory implements EventFactory<ItemEvent> {

    @Override
    public ItemEvent newInstance() {
      return new ItemEvent();
    }
  }

  class ItemEventHandler implements WorkHandler<ItemEvent> {

    @Override
    public void onEvent(ItemEvent itemEvent) {
//      log.info("@itemEvent.totalSuccessCount: {}", System.identityHashCode(itemEvent.totalSuccessCount));
      long itemId = itemEvent.itemId;
      int ret = work(itemId);
      itemEvent.totalSuccessCount.addAndGet(1);
      itemEvent.totalDoneCount.addAndGet(1);

      if (itemEvent.totalDoneCount.get() % STAGE_SIZE == 0) {
        String info = String.format("disruptor progress: %d/%d, success:%d",
            itemEvent.totalDoneCount.get(), itemEvent.targetTotal, itemEvent.totalSuccessCount.get());
        log.info(info);
      }
      if (itemEvent.totalDoneCount.get() == itemEvent.targetTotal) {
        itemEvent.stopWatch.stop();
        long cost = itemEvent.stopWatch.getTime();
        String info = String.format("disruptor done: %d/%d, success:%d, costs:%d",
            itemEvent.totalDoneCount.get(), itemEvent.targetTotal, itemEvent.totalSuccessCount.get(), cost);
        log.info(info);
      }
    }
  }

  public static void main(String[] args) {
    TaskDemo taskDemo = new TaskDemo();
    taskDemo.prepare();
    // Uncomment one scenario at a time to reproduce the measurements below:
//    taskDemo.processBySingleThread();                      //41022  millis
//    taskDemo.processByMultiThreadsWithBlockingQueue();     //74795  millis
//    taskDemo.processByMultiThreadsSimpleSplit();           // 5037  millis
//    taskDemo.processByDisruptor();                         // 3651  millis
  }
}

Experiment code 2 (a multi-producer throughput test, split across four classes)

// Main.java
package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.*;

public class Main {
  private static final int NUM_PUBLISHERS = 2 * Runtime.getRuntime().availableProcessors();
  private static final int BUFFER_SIZE = 1024 * 64;
  private static final long ITERATIONS = 10000L * 1000L * 200L;
  private final ExecutorService executor = Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
  private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
  private final RingBuffer<ValueEvent> ringBuffer = RingBuffer.createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
  private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
  private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), handler);
  private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

  {
    for (int i = 0; i < NUM_PUBLISHERS; i++) {
      valuePublishers[i] = new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 16);
    }
    ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
  }

  public long runDisruptorPass() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    long seq = batchEventProcessor.getSequence().get();
    long expectedCount = seq + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS);
    System.out.println(seq + "," + expectedCount);
    handler.reset(latch, expectedCount);
    Future<?>[] futures = new Future[NUM_PUBLISHERS];
    for (int i = 0; i < NUM_PUBLISHERS; i++) {
      futures[i] = executor.submit(valuePublishers[i]);
    }
    executor.submit(batchEventProcessor);
    long start = System.currentTimeMillis();
    cyclicBarrier.await(); //start test
    for (int i = 0; i < NUM_PUBLISHERS; i++) {
      futures[i].get();
    } //all published
    latch.await(); //all handled
    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    System.out.println("ops:" + opsPerSecond);
    batchEventProcessor.halt();
    return opsPerSecond;
  }

  public static void main(String[] args) throws Exception {
    Main m = new Main();
    System.out.println("1opsPerSecond:" + m.runDisruptorPass());
  }
}

// ValueAdditionEventHandler.java
package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.EventHandler;

import java.util.concurrent.CountDownLatch;

public final class ValueAdditionEventHandler implements EventHandler<ValueEvent> {
  private long value = 0;
  private long count;
  private CountDownLatch latch;

  public long getValue() {
    return value;
  }

  public void reset(final CountDownLatch latch, final long expectedCount) {
    value = 0;
    this.latch = latch;
    count = expectedCount;
  }

  @Override
  public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    value = event.getValue();
    if (count == sequence) {
      latch.countDown();
    }
  }
}

// ValueBatchPublisher.java
package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.CyclicBarrier;

public final class ValueBatchPublisher implements Runnable {
  private final CyclicBarrier cyclicBarrier;
  private final RingBuffer<ValueEvent> ringBuffer;
  private final long iterations;
  private final int batchSize;

  public ValueBatchPublisher(
      final CyclicBarrier cyclicBarrier,
      final RingBuffer<ValueEvent> ringBuffer,
      final long iterations,
      final int batchSize) {
    this.cyclicBarrier = cyclicBarrier;
    this.ringBuffer = ringBuffer;
    this.iterations = iterations;
    this.batchSize = batchSize;
  }

  @Override
  public void run() {
    try {
      cyclicBarrier.await();
      for (long i = 0; i < iterations; i += batchSize) {
        long hi = ringBuffer.next(batchSize);
        long lo = hi - (batchSize - 1);
        for (long l = lo; l <= hi; l++) {
          ValueEvent event = ringBuffer.get(l);
          event.setValue(l);
        }
        ringBuffer.publish(lo, hi);
      }
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }
}

// ValueEvent.java
package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.EventFactory;

import java.io.Serializable;

public final class ValueEvent implements Serializable {
  private static final long serialVersionUID = 4660928408226614035L;
  private long value;

  public long getValue() {
    return value;
  }

  public void setValue(final long value) {
    this.value = value;
  }

  public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
    public ValueEvent newInstance() {
      return new ValueEvent();
    }
  };
}
