One of the blogs I follow is Rudiger's blog. In particular, I find this article with a light benchmark interesting, as it shows the clear performance advantage of the Disruptor, and also just how slow the out-of-the-box services can be for particular use cases.
In the article, Rudiger tests a number of different implementations of a pi calculator, and quickly finds that 1 million small tasks is a nice way to put enough pressure on the concurrency frameworks to sort out a winner. The out-of-the-box Java ExecutorService is actually relatively slow, coming in last.
One of the items on my to-do list was to build a fast ExecutorService implementation, and fortunately, after a busy few months, I've had a chance to sit down and write it. For those new to concurrency in Java, the ExecutorService interface and the Executors factory methods provide an easy way to get into launching threads and handing off composable tasks to other threads for asynchronous work.
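For readers who have not used the interface before, here is a minimal sketch of handing many small tasks to an out-of-the-box ExecutorService. The class name PiSlices, the slice sizes and the use of the Leibniz series are assumptions made for illustration; they are not taken from Rudiger's benchmark code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: split a pi calculation into many small tasks.
class PiSlices {
    // Sums terms k = start .. start + count - 1 of the Leibniz series 4 * (-1)^k / (2k + 1).
    static double slice(int start, int count) {
        double acc = 0.0;
        for (int k = start; k < start + count; k++) {
            acc += 4.0 * (1 - (k % 2) * 2) / (2 * k + 1);
        }
        return acc;
    }

    static double compute(int threads, int slices, int termsPerSlice)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Double>> parts = new ArrayList<>();
            for (int i = 0; i < slices; i++) {
                final int start = i * termsPerSlice;
                Callable<Double> task = () -> slice(start, termsPerSlice);
                parts.add(pool.submit(task)); // hand the slice off to a worker thread
            }
            double pi = 0.0;
            for (Future<Double> part : parts) {
                pi += part.get(); // blocks until that slice has been computed
            }
            return pi;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute(4, 10_000, 100));
    }
}
```

The smaller each slice is, the more the cost of queueing and handing off a task dominates the arithmetic, which is what makes this kind of workload a stress test for the executor itself.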
The work below borrows heavily from Rudiger's work but also the work at 1024cores.net, and of course I can't neglect to mention Nitsan Wakart's blog and Martin Thompson's numerous talks on the topic.
The target: Get a working ExecutorService that can be used in the Pi calculator and get a result that is better than out-of-the-box.
Getting a baseline
Of course, since my target is to beat the out-of-the-box frameworks, I spent some time downloading the gists and hooking up the libs in maven. Only thing is I didn't hook up the Abstraktor tests as I am not too familiar with the project (happy to take a pull request).
Interestingly, the out-of-the-box services perform pretty much as described in Rudiger's post, with the caveat that the Disruptor services have a much sharper degradation in performance on this run. It's worth mentioning that this is only a 2-socket, 8-core machine with E5410s (the old Harpertown series), so not nearly as new as Rudiger's kit.
First cut
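I've heard good things about the LinkedTransferQueue but had not had a need to test it out. As a first cut, I decided to write up a simple Executor implementation that puts each task onto a LinkedTransferQueue as part of the execute() call, with a fixed set of worker threads taking from that queue.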
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;

private static class LtqExecutor<T> implements ExecutorService {
    private volatile boolean running;
    private volatile boolean stopped;
    private final int threadCount;
    private final BlockingQueue<Runnable> queue;
    private final Thread[] threads;

    public LtqExecutor(int threadCount) {
        this.threadCount = threadCount;
        this.queue = new LinkedTransferQueue<>();
        running = true;
        stopped = false;
        threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(new Worker());
            threads[i].start();
        }
    }

    public void execute(Runnable runnable) {
        try {
            queue.put(runnable);
        } catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
    }

    private class Worker implements Runnable {
        public void run() {
            while (running) {
                Runnable runnable = null;
                try {
                    runnable = queue.take();
                } catch (InterruptedException ie) {
                    // was interrupted - just go round the loop again,
                    // if running = false will cause it to exit
                }
                try {
                    if (runnable != null) {
                        runnable.run();
                    }
                } catch (Exception e) {
                    System.out.println("failed because of: " + e.toString());
                    e.printStackTrace();
                }
            }
        }
    }

    // the remaining ExecutorService methods are not shown here
}
I also removed the test for getQueue.size(). Actually, performance of this was rather impressive on its own, achieving results in roughly a third of the time of the out-of-the-box services at 3 to 7 threads, while of course it comes nowhere near the Disruptor.
threads | disruptor2 | threadpi | custom / LTQ
1       | 954        | 1455     | 1111
2       | 498        | 1473     | 1318
3       | 327        | 2108     | 802
4       | 251        | 2135     | 762
5       | 203        | 2195     | 643
6       | 170        | 2167     | 581
7       | 7502       | 2259     | 653
8       | 9469       | 2444     | 5238
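The first-cut executor above only shows execute() and the worker loop. One way to get a complete, compilable class with the same shape is to extend AbstractExecutorService, which supplies submit() and invokeAll() on top of execute(). The sketch below is an assumption about how the gaps could be filled, not the code that produced the numbers above: the class name SimpleLtqExecutor and the poison-pill shutdown are illustrative choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical completion of the LTQ-backed executor idea.
class SimpleLtqExecutor extends AbstractExecutorService {
    private static final Runnable POISON = () -> { }; // marker telling a worker to exit
    private final BlockingQueue<Runnable> queue = new LinkedTransferQueue<>();
    private final Thread[] threads;
    private volatile boolean shutdown;

    SimpleLtqExecutor(int threadCount) {
        threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(this::workerLoop);
            threads[i].start();
        }
    }

    private void workerLoop() {
        while (true) {
            try {
                Runnable task = queue.take();
                if (task == POISON) {
                    return;
                }
                task.run();
            } catch (InterruptedException ie) {
                // interrupted by shutdownNow(): a poison pill is already queued, keep looping
            } catch (RuntimeException e) {
                e.printStackTrace(); // a failing task must not kill the worker
            }
        }
    }

    @Override
    public void execute(Runnable task) {
        // Simplification: a task racing with shutdown() may land behind the pills and never run.
        if (shutdown) {
            throw new RejectedExecutionException("executor is shut down");
        }
        queue.add(task); // LinkedTransferQueue is unbounded, so add() never blocks
    }

    @Override
    public void shutdown() {
        shutdown = true;
        for (int i = 0; i < threads.length; i++) {
            queue.add(POISON); // FIFO order: tasks queued earlier still run first
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        List<Runnable> pending = new ArrayList<>();
        queue.drainTo(pending);
        pending.removeIf(r -> r == POISON);
        for (int i = 0; i < threads.length; i++) {
            queue.add(POISON);
        }
        for (Thread t : threads) {
            t.interrupt();
        }
        return pending;
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        if (!shutdown) {
            return false;
        }
        for (Thread t : threads) {
            if (t.isAlive()) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (Thread t : threads) {
            TimeUnit.NANOSECONDS.timedJoin(t, deadline - System.nanoTime());
        }
        return isTerminated();
    }
}
```

With this in place the executor can be dropped in wherever an ExecutorService is expected, for example `new SimpleLtqExecutor(4).submit(task)`.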
Trying some things
ArrayBlockingQueue and SynchronousQueue
I also tried substituting ABQ and SQ for the LinkedTransferQueue, since my understanding is that LTQ has some additional unnecessary overheads. Perhaps ABQ would be a little more cache friendly, since it is possibly better laid out in memory, and I've used SQ in implementations before.
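Because the executor only depends on the BlockingQueue interface, swapping the queue is a one-line change. A small sketch of the three choices follows; the QueueChoices class and its string keys are hypothetical, and the capacity of 1000 is taken from the "ABQ 1000" column heading.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical factory showing the three queue types being compared.
class QueueChoices {
    static BlockingQueue<Runnable> create(String kind) {
        switch (kind) {
            case "LTQ":
                return new LinkedTransferQueue<>();     // unbounded, put() never blocks
            case "SQ":
                return new SynchronousQueue<>();        // no storage, put() waits for a taker
            case "ABQ":
                return new ArrayBlockingQueue<>(1000);  // bounded array, put() blocks when full
            default:
                throw new IllegalArgumentException("unknown queue kind: " + kind);
        }
    }

    public static void main(String[] args) {
        for (String kind : new String[] {"LTQ", "SQ", "ABQ"}) {
            System.out.println(kind + " remaining capacity: " + create(kind).remainingCapacity());
        }
    }
}
```

The blocking behaviour of put() differs between the three, so the producer thread spends its time quite differently depending on which queue is chosen.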
threads | disruptor2 | threadpi | custom / LTQ | custom / SQ | custom / ABQ 1000
1       | 954        | 1455     | 1111         | 3738        | 2570
2       | 498        | 1473     | 1318         | 2638        | 2570
3       | 327        | 2108     | 802          | 3925        | 1875
4       | 251        | 2135     | 762          | 4035        | 1792
5       | 203        | 2195     | 643          | 4740        | 1692
6       | 170        | 2167     | 581          | 4635        | 1684
7       | 7502       | 2259     | 653          | 4362        | 1668
8       | 9469       | 2444     | 5238         | 4566        | 1677
Ouch. No need to mention these again.
Striping the LTQ
Clearly Disruptor's approach of limiting contended writes has a benefit. Would the LinkedTransferQueue get a little faster by striping the input queues - one per thread?
In this model, instead of having one queue into the Executor, each thread has its own queue, and the put() method round-robins across the queues.
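The striping idea can be sketched roughly as follows. The StripedQueues class, its method names and the plain int cursor are assumptions for illustration; the cursor is only safe because a single thread does all the submitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical sketch: one LinkedTransferQueue per worker, filled round-robin.
class StripedQueues {
    private final List<BlockingQueue<Runnable>> stripes = new ArrayList<>();
    private int next; // touched only by the single submitting thread

    StripedQueues(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            stripes.add(new LinkedTransferQueue<>());
        }
    }

    // Called by the producer: each task goes to the next worker's queue in turn.
    void put(Runnable task) throws InterruptedException {
        stripes.get(next).put(task);
        next = (next + 1) % stripes.size();
    }

    // Called by worker number workerIndex: it only ever reads its own queue.
    Runnable take(int workerIndex) throws InterruptedException {
        return stripes.get(workerIndex).take();
    }

    int size(int workerIndex) {
        return stripes.get(workerIndex).size();
    }

    public static void main(String[] args) throws InterruptedException {
        StripedQueues queues = new StripedQueues(3);
        for (int i = 0; i < 6; i++) {
            queues.put(() -> { });
        }
        for (int i = 0; i < 3; i++) {
            System.out.println("stripe " + i + " holds " + queues.size(i) + " tasks");
        }
    }
}
```

Consumers no longer contend with each other on a shared queue, but in this sketch there is no work stealing, so a task stays in its stripe until that particular worker gets to it.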
Actually, this was a bit better for some workloads but with much faster degradation.
threads | disruptor2 | threadpi | custom / LTQ | Custom / striped LTQ
1       | 954        | 1455     | 1111         | 1237
2       | 498        | 1473     | 1318         | 1127
3       | 327        | 2108     | 802          | 713
4       | 251        | 2135     | 762          | 570
5       | 203        | 2195     | 643          | 498
6       | 170        | 2167     | 581          | 386
7       | 7502       | 2259     | 653          | 5264
8       | 9469       | 2444     | 5238         | 5262
Customising a queue
A ringbuffer based queue
With a baseline set, I knew from reading Nitsan's many posts and the 1024cores.net site that there were some optimisations I could get from the problem space. What about using a queue with a ringbuffer?
I introduced a custom lock-free queue and substituted it into my Executor logic. The put() looks like this.
public void put(Runnable r) throws InterruptedException {
    while (true) {
        long toWrite = nextSlotToWrite.get();
        if (toWrite < (lastConsumed.get() + capacity)) {
            buffer[(int) (toWrite % capacity)] = r;
            nextSlotToWrite.incrementAndGet();
            break;
        } else {
            // no space in queue, sleep
            LockSupport.parkNanos(1);
        }
    }
}
The take() looks like this.
public Runnable take() throws InterruptedException {
    while (!Thread.interrupted()) {
        long lastFilled = nextSlotToWrite.get() - 1;
        long lastConsumedCache = lastConsumed.get();
        if (lastConsumedCache < lastFilled) {
            // fetch the value from buffer first, to avoid a wrap race
            int bufferslot = (int) ((lastConsumedCache + 1) % capacity);
            Runnable r = buffer[bufferslot];
            // attempt to cas
            boolean casWon = lastConsumed.compareAndSet(
                    lastConsumedCache, lastConsumedCache + 1);
            if (casWon) {
                return r;
            } else {
                // cas failed, do not sleep
            }
        } else {
            // no values available to take, go round the loop again
        }
    }
    throw new InterruptedException();
}
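The put() and take() methods above rely on fields that are not shown. A minimal sketch of a surrounding class follows, so the pair can be compiled and tried out. The field names come from the snippets, but the class name RingBufferQueue, the constructor and the initial values (nextSlotToWrite at 0, lastConsumed at -1) are assumptions. As in the snippets, this only makes sense with a single producer thread.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical wrapper class around the put()/take() pair (single producer only).
class RingBufferQueue {
    private final Runnable[] buffer;
    private final int capacity;
    private final AtomicLong nextSlotToWrite = new AtomicLong(0); // next sequence to fill
    private final AtomicLong lastConsumed = new AtomicLong(-1);   // last sequence handed out

    RingBufferQueue(int capacity) {
        this.capacity = capacity;
        this.buffer = new Runnable[capacity];
    }

    public void put(Runnable r) throws InterruptedException {
        while (true) {
            long toWrite = nextSlotToWrite.get();
            if (toWrite < (lastConsumed.get() + capacity)) {
                buffer[(int) (toWrite % capacity)] = r;
                // the atomic increment publishes the slot written just above
                nextSlotToWrite.incrementAndGet();
                break;
            } else {
                // no space in queue, sleep
                LockSupport.parkNanos(1);
            }
        }
    }

    public Runnable take() throws InterruptedException {
        while (!Thread.interrupted()) {
            long lastFilled = nextSlotToWrite.get() - 1;
            long lastConsumedCache = lastConsumed.get();
            if (lastConsumedCache < lastFilled) {
                // read the slot before the CAS so a wrapping producer cannot change it under us
                Runnable r = buffer[(int) ((lastConsumedCache + 1) % capacity)];
                if (lastConsumed.compareAndSet(lastConsumedCache, lastConsumedCache + 1)) {
                    return r;
                }
                // CAS lost: another consumer claimed this slot, retry immediately
            }
            // otherwise the queue is empty: spin round the loop again
        }
        throw new InterruptedException();
    }

    public static void main(String[] args) throws InterruptedException {
        RingBufferQueue queue = new RingBufferQueue(8);
        queue.put(() -> System.out.println("first"));
        queue.put(() -> System.out.println("second"));
        queue.take().run();
        queue.take().run();
    }
}
```

Note that with these initial values the capacity check leaves one slot unused, so a queue created with capacity 8 holds at most 7 tasks at a time.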
Can we go faster by SPMC?
In particular, for our case, there is only ever a single writer thread issuing put()s to the queue. Can we take advantage of this by making the queue an SPMC queue (single producer, many consumers)?
public void put(Runnable r) throws InterruptedException {
    // toWrite is not a local here: it is state kept between calls by the single producer
    while (true) {
        if (toWrite < (lastConsumed.get() + capacity)) {
            buffer[(int) (toWrite & REMAINDER_MASK)] = r;
            toWrite += 1;
            nextSlotToWrite.lazySet(toWrite);
            break;
        } else {
            // no space in queue, sleep
            LockSupport.parkNanos(1);
        }
    }
}
Note that I use a lazySet() to write, but this doesn't help. I'm speculating that it's really the .get()s in the take() below that are causing the issues.
public Runnable take() throws InterruptedException {
    while (!Thread.interrupted()) {
        long lastFilled = nextSlotToWrite.get() - 1;
        long lastConsumedCache = lastConsumed.get();
        if (lastConsumedCache < lastFilled) {
            // fetch the value from buffer first, to avoid a wrap race
            int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
            Runnable r = buffer[bufferslot];
            // attempt to cas
            boolean casWon = lastConsumed.compareAndSet(
                    lastConsumedCache, lastConsumedCache + 1);
            if (casWon) {
                return r;
            } else {
                // if cas failed, then we are not the owner so try again
            }
        } else {
            // no values available to take, wait a while
            LockSupport.parkNanos(1);
        }
    }
    throw new InterruptedException();
}
By this point I've also started to use power-of-two sized queues (following the Disruptor guidance on fast modulo). Actually, this didn't make a lot of difference, but I've left it in.
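The fast-modulo trick works because, for a power-of-two capacity and a non-negative sequence number, `sequence & (capacity - 1)` gives the same result as `sequence % capacity`. A small sketch follows, assuming REMAINDER_MASK in the snippets is defined as capacity - 1; the FastModulo class and the rounding helper are hypothetical.

```java
// Hypothetical helper illustrating power-of-two sizing and mask-based indexing.
class FastModulo {
    // Rounds n up to the next power of two; intended for 1 <= n <= 2^30.
    static int roundUpToPowerOfTwo(int n) {
        return n <= 1 ? 1 : Integer.highestOneBit(n - 1) << 1;
    }

    // Maps an ever-increasing sequence number to a slot in the ring buffer.
    static int slot(long sequence, int mask) {
        return (int) (sequence & mask);
    }

    public static void main(String[] args) {
        int capacity = roundUpToPowerOfTwo(1000);
        int mask = capacity - 1;
        System.out.println("capacity = " + capacity + ", mask = " + mask);
        System.out.println("slot for sequence 1025 = " + slot(1025L, mask));
    }
}
```

The mask replaces a division with a single AND, which is cheap but only valid when the capacity really is a power of two.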
Caching the get()s
How can I reduce contention when things are busy? Having a look at the take() method, we can see that there are two .get() calls on atomic fields, each of which is a volatile read. On CAS failure, it's not always the case that we need to re-read both variables. So what happens if we move those reads so they only happen when they are needed: lastConsumed after a failed CAS, and nextSlotToWrite when the queue looks empty?
    public Runnable take() throws InterruptedException {
        long lastFilled = nextSlotToWrite.get() - 1;
        long lastConsumedCache = lastConsumed.get();
        while (!Thread.interrupted()) {
            if (lastConsumedCache < lastFilled) {
                // fetch the value from buffer first, to avoid a wrap race
                int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                Runnable r = buffer[bufferslot];
                // attempt to cas
                boolean casWon = lastConsumed.compareAndSet(
                        lastConsumedCache, lastConsumedCache + 1);
                if (casWon) {
                    return r;
                } else {
                    // if cas failed, then we are not the owner so try again
                    // LockSupport.parkNanos(1);
                    // note that the lastFilled does not need to change just yet
                    // but probably the lastConsumed did
                    lastConsumedCache = lastConsumed.get();
                }
            } else {
                // no values available to take, wait a while
                LockSupport.parkNanos(1);
                lastFilled = nextSlotToWrite.get() - 1;
                // lastConsumedCache = lastConsumed.get();
            }
        }
        throw new InterruptedException();
    }
}
threads | akkapi | disruptor | disruptor2 | threadpi | custom / LTQ | Custom Lockfree Pp Tp Padded | Custom with cached gets
1       | 2389   | 1300      | 954        | 1455     | 1111         | 1245                         | 1257
2       | 1477   | 713       | 498        | 1473     | 1318         | 992                          | 1007
3       | 1266   | 492       | 327        | 2108     | 802          | 676                          | 695
4       | 1629   | 423       | 251        | 2135     | 762          | 548                          | 559
5       | 1528   | 400       | 203        | 2195     | 643          | 538                          | 515
6       | 1541   | 398       | 170        | 2167     | 581          | 545                          | 510
7       | 1482   | 1085      | 7502       | 2259     | 653          | 603                          | 562
8       | 1397   | 1297      | 9469       | 2444     | 5238         | 925                          | 883
Not a lot of difference, but it does help our under-contention numbers, which is exactly what we'd expect.
The results
Actually, the results were good, and I was pretty happy with them. They're a small fraction faster than the LinkedTransferQueue, with what seems like better worst-case performance. Nowhere near as fast as the Disruptor, but frankly that's not surprising given the energy and talent working on it.
Of course, this is not production ready and YMMV, but I would welcome comments and questions, particularly if there are any issues with correctness.
Code samples on github.