The Charm of Lock-Free
2017 Aug 07
A typical scenario for using an in-memory queue: a big pile of tasks needs to be dispatched and processed. How do you dispatch them efficiently?
Candidate approaches:
- 1. Single thread: process everything directly on one thread.
- 2. Multiple threads with a fixed-size pool and the tasks held in an in-memory queue (FixedThreadPool + BlockingQueue); each thread takes one task from the queue at a time (deliberately designed this way for comparison, to see just how slow it can get).
- 3. Multiple threads, but the tasks are split evenly according to the thread count, and each thread processes its batch of tasks.
- 4. Disruptor: let the ringBuffer handle task dispatch.
Experiment:
Parameters: 4,100 tasks, each taking 10 ms; the thread count equals the number of CPU cores. See Experiment Code 1 for the code.

| Approach | Time (ms) |
| --- | --- |
| 1. Single thread | 41,022 |
| 2. Multiple threads, one task at a time | 74,795 |
| 3. Multiple threads, batched tasks | 5,037 |
| 4. Disruptor | 3,651 |

For reference, the theoretical single-thread floor is 4,100 × 10 ms = 41,000 ms, which the single-thread run matches almost exactly.
Explanation:
As you can see, approach 2 is even slower than approach 1. The reason: for N tasks, N separate Runnables have to be submitted and scheduled through the pool, which is not cheap by itself (creating and destroying actual threads per task would be heavier still); more importantly, every worker has to lock the queue each time it takes a task with take()/poll(), and with many threads that lock contention becomes a major cost.
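Where does the lock come from? java.util.concurrent.ArrayBlockingQueue guards both ends of the queue with a single ReentrantLock, so every worker serializes on that one lock for every single item it takes. Below is a minimal sketch of that single-lock design; it is illustrative only, not the actual JDK code.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of a bounded blocking queue in the style of
// ArrayBlockingQueue: ONE lock protects both put() and take(), so
// every consumer thread contends on the same lock for every item.
public class SingleLockQueue<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public SingleLockQueue(int capacity) {
        items = new Object[capacity];
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();              // producer takes the global lock
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();              // every consumer serializes here
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }
}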
Approach 3 performs well: simple and efficient. The reason is that each thread batch-processes a large number of tasks and is essentially lock-free (the few small Atomic counters cost very little). The downside is that it is rather crude: the tasks have to be partitioned up front, so it is not very general.
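The core idea, as a standalone sketch: give each thread one contiguous chunk of the list, so the only shared state is a counter. The partition helper below is hypothetical glue for illustration and is not part of Experiment Code 1, which instead splits by a fixed step of 500.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SplitDemo {
    // Split a list into contiguous chunks of (almost) equal size, one per thread.
    static <T> List<List<T>> partition(List<T> items, int nThreads) {
        List<List<T>> chunks = new ArrayList<>();
        int size = items.size();
        int chunk = (size + nThreads - 1) / nThreads;   // ceiling division
        for (int from = 0; from < size; from += chunk) {
            chunks.add(items.subList(from, Math.min(from + chunk, size)));
        }
        return chunks;
    }

    public static void main(String[] args) throws InterruptedException {
        int nThreads = Runtime.getRuntime().availableProcessors();
        List<Long> tasks = new ArrayList<>();
        for (long i = 0; i < 4100; i++) {
            tasks.add(i);
        }
        AtomicLong done = new AtomicLong();             // the only shared state
        ExecutorService es = Executors.newFixedThreadPool(nThreads);
        for (List<Long> chunk : partition(tasks, nThreads)) {
            es.execute(() -> {
                for (Long task : chunk) {
                    // process(task) would run here; no lock is taken per task
                }
                done.addAndGet(chunk.size());
            });
        }
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("done: " + done.get());
    }
}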
Approach 4, Disruptor, uses a lock-free ringBuffer. It has long been famous as a lock-free, high-performance in-memory queue; in the project's own words, "It's fast. Very fast."
Our company's RPC server currently uses it to handle requests, and it is more than a notch above a plain FixedThreadPool. Here is a demo (see Experiment Code 2) to get a feel for the frightening TPS it can reach.
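To make "lock-free ring buffer" concrete, here is a deliberately minimal single-producer/single-consumer sketch of the idea. It is not Disruptor's implementation (which adds multi-producer claiming, cache-line padding, batching, and pluggable wait strategies); it only shows that a producer and a consumer can hand items over through a pre-allocated ring using two sequence counters and no lock.

import java.util.concurrent.atomic.AtomicLong;

// Minimal single-producer/single-consumer ring buffer: the producer and the
// consumer each own one sequence and only read the other one, so no lock is
// ever taken; visibility comes from the AtomicLong (volatile) accesses.
public class SpscRingBuffer<E> {
    private final Object[] buffer;
    private final int mask;                                    // bufferSize must be a power of two
    private final AtomicLong producerSeq = new AtomicLong(-1); // last published slot
    private final AtomicLong consumerSeq = new AtomicLong(-1); // last consumed slot

    public SpscRingBuffer(int bufferSize) {
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        buffer = new Object[bufferSize];
        mask = bufferSize - 1;
    }

    // Returns false when the ring is full (bounded, like the Disruptor).
    public boolean offer(E e) {
        long next = producerSeq.get() + 1;
        if (next - consumerSeq.get() > buffer.length) {
            return false;                       // would overwrite an unconsumed slot
        }
        buffer[(int) (next & mask)] = e;
        producerSeq.set(next);                  // publish: makes the slot visible
        return true;
    }

    @SuppressWarnings("unchecked")
    public E poll() {
        long next = consumerSeq.get() + 1;
        if (next > producerSeq.get()) {
            return null;                        // nothing published yet
        }
        E e = (E) buffer[(int) (next & mask)];
        consumerSeq.set(next);                  // frees the slot for the producer
        return e;
    }
}

Backpressure comes for free: offer() simply returns false when the ring is full, which is also why a ring buffer is inherently bounded.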
Disruptor performance
The producers write data from three threads while a single consumer handles the events; the producers publish in batches, 16 events at a time.
Measured on an ordinary personal PC this reaches a frightening throughput of 183,486,238 events per second, an astonishing ~180 million (at that rate the demo's 2,000,000,000 events take roughly 11 seconds). The lock-free ringbuffer really is that strong! The only pity is that it is bounded, but there is no way around that...
Experiment Code 1
package com.cs.exercise.concurrent.benchmark;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.apache.commons.lang.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class TaskDemo {

    private final Logger log = LoggerFactory.getLogger(TaskDemo.class);
    private final List<Long> itemList = new ArrayList<>();
    private static int N_THREADS = Runtime.getRuntime().availableProcessors();
    private static int STAGE_SIZE = 100;

    // Generate 4100 pseudo-random "task ids".
    protected void prepare() {
        int size = 4100;
        Random r = new Random(size);
        for (int i = 0; i < size; i++) {
            itemList.add(r.nextLong());
        }
    }

    // Busy-wait for the given number of milliseconds to simulate CPU-bound work.
    private static void spin(int milliseconds) {
        long sleepTime = milliseconds * 1_000_000L; // convert to nanoseconds
        long startTime = System.nanoTime();
        while ((System.nanoTime() - startTime) < sleepTime) ;
    }

    // One "task": roughly 10 ms of simulated work.
    protected int work(Long i) {
        spin(10);
        return i.intValue();
    }

    // Approach 1: process everything on the calling thread.
    public void processBySingleThread() {
        long start = System.currentTimeMillis();
        int total = itemList.size();
        for (int i = 0; i < itemList.size(); i++) {
            try {
                work(itemList.get(i));
            } catch (Exception e) {
                e.printStackTrace();
            }
            if ((i + 1) % STAGE_SIZE == 0) {
                String info = String.format("single thread processed: %d/%d", i + 1, total);
                log.info(info);
            }
        }
        long cost = System.currentTimeMillis() - start;
        log.info("single thread costs: {} millis", cost);
    }

    // Approach 2: FixedThreadPool + BlockingQueue, one item per submitted Runnable
    // (deliberately naive, for comparison).
    public void processByMultiThreadsWithBlockingQueue() {
        long start = System.currentTimeMillis();
        int total = itemList.size();
        AtomicLong totalDoneCount = new AtomicLong(0L);
        BlockingQueue<Long> blockingQueue = new ArrayBlockingQueue<>(total);
        blockingQueue.addAll(itemList);
        ExecutorService es = Executors.newFixedThreadPool(N_THREADS);
        do {
            es.execute(() -> {
                try {
                    Long itemId = blockingQueue.poll();
                    if (itemId == null) {
                        // another worker drained the queue first
                        return;
                    }
                    try {
                        work(itemId);
                        long doneCount = totalDoneCount.addAndGet(1);
                        if (doneCount % STAGE_SIZE == 0) {
                            String info = String.format("processed: %d/%d, threadName:%s",
                                    doneCount, total, Thread.currentThread().getName());
                            log.info(info);
                        }
                        if (doneCount == total) {
                            long cost = System.currentTimeMillis() - start;
                            log.info("blocking queue costs: {} millis, empty:{}", cost, blockingQueue.isEmpty());
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        blockingQueue.put(itemId); // put the item back so it is not lost
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        } while (!blockingQueue.isEmpty());
        es.shutdown();
        try {
            if (!es.awaitTermination(3, TimeUnit.SECONDS)) {
                es.shutdownNow();
                if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
                    log.error("thread pool did not terminate cleanly");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Approach 3: split the item list into fixed-size chunks and let each Runnable
    // process a whole chunk; the only shared state is an Atomic counter.
    public void processByMultiThreadsSimpleSplit() {
        long start = System.currentTimeMillis();
        int total = itemList.size();
        AtomicLong totalDoneCount = new AtomicLong(0L);
        ExecutorService es = Executors.newFixedThreadPool(N_THREADS);
        int fromIndex = 0;
        final int STEP = 500;
        while (true) {
            fromIndex = Math.min(fromIndex, total);
            int toIndex = Math.min(fromIndex + STEP, total);
            List<Long> subList = itemList.subList(fromIndex, toIndex);
            if (subList.isEmpty()) {
                break;
            }
            int finalFromIndex = fromIndex;
            es.execute(() -> {
                int count = doCustomItemSubList(subList);
                long doneCount = totalDoneCount.addAndGet(count);
                String info = String.format("processed: %d/%d, index from-to: %d-%d",
                        doneCount, total, finalFromIndex, toIndex);
                if (doneCount % STAGE_SIZE == 0) {
                    log.info(info);
                }
                if (doneCount == total) {
                    long cost = System.currentTimeMillis() - start;
                    log.info("all done, simple split total time costs: " + cost);
                }
            });
            fromIndex = toIndex;
        }
        es.shutdown();
        try {
            // Wait long enough for every queued chunk to finish; with a 3-second
            // timeout, shutdownNow() could drop chunks that have not started yet.
            if (!es.awaitTermination(60, TimeUnit.SECONDS)) {
                es.shutdownNow();
                if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
                    log.error("thread pool did not terminate cleanly");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private int doCustomItemSubList(List<Long> itemIdList) {
        int i = 0;
        for (Long itemId : itemIdList) {
            work(itemId);
            i++;
        }
        return i;
    }

    // Approach 4: publish the items into a Disruptor ring buffer and let a pool
    // of WorkHandlers consume them.
    public void processByDisruptor() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        int total = itemList.size();
        AtomicLong totalDoneCount = new AtomicLong(0L);
        AtomicLong totalSuccessCount = new AtomicLong(0L);
        Disruptor<ItemEvent> disruptor = new Disruptor<>(new ItemEventFactory(), 65536,
                DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy());
        ItemEventHandler[] eventHandlers = new ItemEventHandler[N_THREADS];
        for (int i = 0; i < N_THREADS; i++) {
            eventHandlers[i] = new ItemEventHandler();
        }
        disruptor.handleEventsWithWorkerPool(eventHandlers);
        disruptor.start();
        RingBuffer<ItemEvent> ringBuffer = disruptor.getRingBuffer();
        ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>(total);
        queue.addAll(itemList);
        int publishedCount = 0;
        do {
            long itemId;
            try {
                itemId = queue.take();
                long finalItemId = itemId;
                EventTranslator<ItemEvent> translator = (itemEvent, sequence) -> {
                    itemEvent.itemId = finalItemId;
                    itemEvent.stopWatch = stopWatch;
                    itemEvent.targetTotal = total;
                    itemEvent.totalDoneCount = totalDoneCount;
                    itemEvent.totalSuccessCount = totalSuccessCount;
                };
                boolean published = ringBuffer.tryPublishEvent(translator);
                if (!published) {
                    queue.put(itemId); // ring buffer full: put the item back and retry later
                } else {
                    publishedCount++;
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } while (publishedCount < total);
        disruptor.shutdown(); // blocks until every published event has been handled
    }

    static class ItemEvent {
        public long itemId;
        public StopWatch stopWatch;
        public int targetTotal;
        public AtomicLong totalDoneCount;
        public AtomicLong totalSuccessCount;
    }

    static class ItemEventFactory implements EventFactory<ItemEvent> {
        @Override
        public ItemEvent newInstance() {
            return new ItemEvent();
        }
    }

    class ItemEventHandler implements WorkHandler<ItemEvent> {
        @Override
        public void onEvent(ItemEvent itemEvent) {
            long itemId = itemEvent.itemId;
            int ret = work(itemId);
            itemEvent.totalSuccessCount.addAndGet(1);
            // use the value returned by the increment so the "== targetTotal"
            // check cannot be missed when several handlers finish at once
            long doneCount = itemEvent.totalDoneCount.addAndGet(1);
            if (doneCount % STAGE_SIZE == 0) {
                String info = String.format("disruptor progress: %d/%d, success:%d",
                        doneCount, itemEvent.targetTotal, itemEvent.totalSuccessCount.get());
                log.info(info);
            }
            if (doneCount == itemEvent.targetTotal) {
                itemEvent.stopWatch.stop();
                long cost = itemEvent.stopWatch.getTime();
                String info = String.format("disruptor done: %d/%d, success:%d, costs:%d",
                        doneCount, itemEvent.targetTotal, itemEvent.totalSuccessCount.get(), cost);
                log.info(info);
            }
        }
    }

    public static void main(String[] args) {
        TaskDemo taskDemo = new TaskDemo();
        taskDemo.prepare();
        // taskDemo.processBySingleThread();                  // 41022 millis
        // taskDemo.processByMultiThreadsWithBlockingQueue(); // 74795 millis
        // taskDemo.processByMultiThreadsSimpleSplit();       // 5037 millis
        // taskDemo.processByDisruptor();                     // 3651 millis
    }
}
Experiment Code 2
package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.*;

// Multi-producer / single-consumer throughput test: NUM_PUBLISHERS threads
// publish in batches of 16 into one ring buffer, one BatchEventProcessor consumes.
public class Main {

    private static final int NUM_PUBLISHERS = 2 * Runtime.getRuntime().availableProcessors();
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 10000L * 1000L * 200L; // 2,000,000,000 events in total

    private final ExecutorService executor =
            Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
    private final RingBuffer<ValueEvent> ringBuffer =
            RingBuffer.createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor =
            new BatchEventProcessor<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), handler);
    private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

    {
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            valuePublishers[i] = new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 16);
        }
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }

    public long runDisruptorPass() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        long seq = batchEventProcessor.getSequence().get();
        long expectedCount = seq + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS);
        System.out.println(seq + "," + expectedCount);
        handler.reset(latch, expectedCount);

        Future<?>[] futures = new Future[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await(); // start the test: release all publishers at once
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            futures[i].get();
        } // all events published
        latch.await(); // all events handled

        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        System.out.println("ops:" + opsPerSecond);
        batchEventProcessor.halt();
        return opsPerSecond;
    }

    public static void main(String[] args) throws Exception {
        Main m = new Main();
        System.out.println("opsPerSecond: " + m.runDisruptorPass());
    }
}

package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.EventHandler;

import java.util.concurrent.CountDownLatch;

// Consumer: counts down the latch once the expected final sequence has been seen.
public final class ValueAdditionEventHandler implements EventHandler<ValueEvent> {

    private long value = 0;
    private long count;
    private CountDownLatch latch;

    public long getValue() {
        return value;
    }

    public void reset(final CountDownLatch latch, final long expectedCount) {
        value = 0;
        this.latch = latch;
        count = expectedCount;
    }

    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
        value = event.getValue();
        if (count == sequence) {
            latch.countDown();
        }
    }
}

package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.CyclicBarrier;

// Producer: claims batchSize slots at a time with next(n) and publishes them with publish(lo, hi).
public final class ValueBatchPublisher implements Runnable {

    private final CyclicBarrier cyclicBarrier;
    private final RingBuffer<ValueEvent> ringBuffer;
    private final long iterations;
    private final int batchSize;

    public ValueBatchPublisher(
            final CyclicBarrier cyclicBarrier,
            final RingBuffer<ValueEvent> ringBuffer,
            final long iterations,
            final int batchSize) {
        this.cyclicBarrier = cyclicBarrier;
        this.ringBuffer = ringBuffer;
        this.iterations = iterations;
        this.batchSize = batchSize;
    }

    @Override
    public void run() {
        try {
            cyclicBarrier.await();
            for (long i = 0; i < iterations; i += batchSize) {
                long hi = ringBuffer.next(batchSize);
                long lo = hi - (batchSize - 1);
                for (long l = lo; l <= hi; l++) {
                    ValueEvent event = ringBuffer.get(l);
                    event.setValue(l);
                }
                ringBuffer.publish(lo, hi);
            }
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}

package com.cs.exercise.concurrent.disruptor;

import com.lmax.disruptor.EventFactory;

import java.io.Serializable;

// The event type stored in the ring buffer: a single mutable long.
public final class ValueEvent implements Serializable {

    private static final long serialVersionUID = 4660928408226614035L;

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(final long value) {
        this.value = value;
    }

    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };
}