In the article, Rudiger tests a number of different implementations of a pi calculator, and quickly finds that 1 million small tasks is a nice way to put enough pressure on the concurrency frameworks to separate out a winner. The out-of-the-box Java ExecutorService is actually relatively slow, coming in last.
One of the items on my to-do list is to build a fast ExecutorService implementation, and fortunately, after a busy few months, I've had a chance to sit down and write it. For those new to concurrency on Java, the ExecutorService interface and the Executors factory methods provide an easy way to get into launching threads and handing off composable tasks to other threads for asynchronous work.
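For readers who haven't used the API, here is a minimal sketch of handing many small tasks to a stock ExecutorService. It assumes the benchmark workload looks roughly like summing slices of the Leibniz series for pi; the class and method names (PiWithExecutor, slice, calculatePi) are made up for illustration and are not taken from the benchmark code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PiWithExecutor {
    // Sums one slice of the Leibniz series: terms [slice * sliceSize, (slice + 1) * sliceSize).
    static double slice(int slice, int sliceSize) {
        double acc = 0.0;
        for (int i = slice * sliceSize; i < (slice + 1) * sliceSize; i++) {
            acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
        }
        return acc;
    }

    // Submits one small task per slice and adds up the partial results.
    static double calculatePi(int threads, int slices, int sliceSize) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Double>> results = new ArrayList<>();
            for (int s = 0; s < slices; s++) {
                final int current = s;
                Callable<Double> task = () -> slice(current, sliceSize);
                results.add(pool.submit(task));
            }
            double pi = 0.0;
            for (Future<Double> result : results) {
                pi += result.get(); // blocks until that slice is done
            }
            return pi;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(calculatePi(4, 10_000, 100));
    }
}
```

The per-task cost here is tiny, so most of the time goes into queueing and handing tasks between threads, which is the part a faster executor would need to improve.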
The work below borrows heavily from Rudiger's work but also the work at 1024cores.net, and of course I can't neglect to mention Nitsan Wakart's blog and Martin Thompson's numerous talks on the topic.
The target: Get a working ExecutorService that can be used in the Pi calculator and get a result that is better than out-of-the-box.
Getting a baseline
Of course, since my target is to beat the out-of-the-box frameworks, I spent some time downloading the gists and hooking up the libs in Maven. The only thing I didn't hook up is the Abstraktor tests, as I am not too familiar with the project (happy to take a pull request). Interestingly, the out-of-the-box services perform pretty much as described in Rudiger's post, with the caveat that the Disruptor services degrade much more sharply on this run. Worth mentioning is that this is only a 2-socket, 8-core E5410 box (the old Harpertown series), so not nearly as new as Rudiger's kit.
First cut
I've heard good things about the LinkedTransferQueue but have not had a need to test it out. As a first cut, I decided to write up a simple Executor implementation that puts each task onto a LinkedTransferQueue as part of the submit() call.
        private static class LtqExecutor<T> implements ExecutorService {
            private volatile boolean running;
            private volatile boolean stopped;
            private final int threadCount;
            private final BlockingQueue<Runnable> queue;
            private final Thread[] threads;

            public LtqExecutor(int threadCount) {
                this.threadCount = threadCount;
                this.queue = new LinkedTransferQueue<>();
                running = true;
                stopped = false;
                threads = new Thread[threadCount];
                for (int i = 0; i < threadCount; i++) {
                    threads[i] = new Thread(new Worker());
                    threads[i].start();
                }
            }

            public void execute(Runnable runnable) {
                try {
                    queue.put(runnable);
                } catch (InterruptedException ie) {
                    throw new RuntimeException(ie);
                }
            }

            private class Worker implements Runnable {
                public void run() {
                    while (running) {
                        Runnable runnable = null;
                        try {
                            runnable = queue.take();
                        } catch (InterruptedException ie) {
                            // was interrupted - just go round the loop again,
                            // if running = false will cause it to exit
                        }
                        try {
                            if (runnable != null) {
                                runnable.run();
                            }
                        } catch (Exception e) {
                            System.out.println("failed because of: " + e.toString());
                            e.printStackTrace();
                        }
                    }
                }
            }
            // ... remaining ExecutorService methods not shown
        }
| threads | disruptor2 | threadpi | custom / LTQ | 
| 1 | 954 | 1455 | 1111 | 
| 2 | 498 | 1473 | 1318 | 
| 3 | 327 | 2108 | 802 | 
| 4 | 251 | 2135 | 762 | 
| 5 | 203 | 2195 | 643 | 
| 6 | 170 | 2167 | 581 | 
| 7 | 7502 | 2259 | 653 | 
| 8 | 9469 | 2444 | 5238 | 
Trying some things
ArrayBlockingQueue and SynchronousQueue
I also tried substituting ABQ and SQ for LinkedTransferQueue, since my understanding is that LTQ has some additional unnecessary overheads. Perhaps they would be a little more cache friendly: ABQ is possibly better laid out in memory, and SQ I've used in implementations before.
| threads | disruptor2 | threadpi | custom / LTQ | custom / SQ | custom / ABQ 1000 | 
| 1 | 954 | 1455 | 1111 | 3738 | 2570 | 
| 2 | 498 | 1473 | 1318 | 2638 | 2570 | 
| 3 | 327 | 2108 | 802 | 3925 | 1875 | 
| 4 | 251 | 2135 | 762 | 4035 | 1792 | 
| 5 | 203 | 2195 | 643 | 4740 | 1692 | 
| 6 | 170 | 2167 | 581 | 4635 | 1684 | 
| 7 | 7502 | 2259 | 653 | 4362 | 1668 | 
| 8 | 9469 | 2444 | 5238 | 4566 | 1677 | 
Ouch. No need to mention these again.
Striping the LTQ
Clearly Disruptor's approach of limiting contended writes has a benefit. Would the LinkedTransferQueue get a little faster by striping the input queues - one per thread?
In this model, instead of having one queue into the Executor, each thread has its own queue, and the put() method round-robins across the queues.
Actually, this was a bit better from 2 to 6 threads, but it degraded sooner: the striped version falls off a cliff at 7 threads, while the single LTQ holds up until 8.
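A minimal sketch of the striping idea, assuming a single submitting thread (as in the benchmark) and one LinkedTransferQueue per worker. The class name StripedLtqSketch and the runTasks helper are hypothetical and this is not the code from the repository.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

final class StripedLtqSketch {
    private final BlockingQueue<Runnable>[] queues;
    private final Thread[] threads;
    private volatile boolean running = true;
    private int next = 0; // assumption: only one thread ever calls execute()

    @SuppressWarnings("unchecked")
    StripedLtqSketch(int threadCount) {
        queues = new BlockingQueue[threadCount];
        threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final BlockingQueue<Runnable> q = new LinkedTransferQueue<>();
            queues[i] = q;
            threads[i] = new Thread(() -> {
                while (running) {
                    try {
                        q.take().run(); // each worker only ever reads its own queue
                    } catch (InterruptedException ie) {
                        // interrupted: the loop re-checks running and exits after shutdown()
                    }
                }
            });
            threads[i].setDaemon(true);
            threads[i].start();
        }
    }

    // Round-robin: task n goes to queue n % threadCount.
    void execute(Runnable r) {
        queues[next].add(r); // LinkedTransferQueue is unbounded, so add() does not block
        next = (next + 1) % queues.length;
    }

    void shutdown() {
        running = false;
        for (Thread t : threads) {
            t.interrupt();
        }
    }

    // Hypothetical helper: runs taskCount trivial tasks and reports whether all completed.
    static boolean runTasks(int threadCount, int taskCount) {
        StripedLtqSketch executor = new StripedLtqSketch(threadCount);
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(done::countDown);
        }
        try {
            return done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
```

Note the trade-off in this layout: a task is pinned to whichever worker the round-robin picked, so one slow task delays everything queued behind it on that stripe.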
 
| threads | disruptor2 | threadpi | custom / LTQ | Custom / striped LTQ | 
| 1 | 954 | 1455 | 1111 | 1237 | 
| 2 | 498 | 1473 | 1318 | 1127 | 
| 3 | 327 | 2108 | 802 | 713 | 
| 4 | 251 | 2135 | 762 | 570 | 
| 5 | 203 | 2195 | 643 | 498 | 
| 6 | 170 | 2167 | 581 | 386 | 
| 7 | 7502 | 2259 | 653 | 5264 | 
| 8 | 9469 | 2444 | 5238 | 5262 | 
Customising a queue
A ringbuffer based queue
With a baseline set, I knew from reading Nitsan's many posts and the 1024cores.net site that there were some optimisations I could get from the problem space. What about using a queue with a ringbuffer?
I introduced a custom lock-free queue and substituted it into my Executor logic. The put() looks like this.
            public void put(Runnable r) throws InterruptedException {
                while (true) {
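                    long toWrite = nextSlotToWrite.get();
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite % capacity)] = r;
                        // the get() above and this increment are not one atomic step,
                        // so this is only safe with a single producer thread
                        nextSlotToWrite.incrementAndGet();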
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }
While the take() looks like this.
            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) % capacity);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                            // cas failed: another consumer claimed the slot, retry without sleeping
                        }
                    } else {
                        // nothing available to take yet, spin and re-check
                    }
                }
                throw new InterruptedException();
            }
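The put() and take() above rely on fields that aren't shown. Here is a self-contained sketch with one plausible set of declarations, assuming nextSlotToWrite starts at 0, lastConsumed starts at -1, and there is exactly one producer. The class name, the runSmokeTest helper and the brief park when the queue is empty are my additions for the sketch, not part of the original.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

final class RingBufferQueueSketch {
    private final int capacity;
    private final Runnable[] buffer;
    // sequence number of the next slot the single producer will fill
    private final AtomicLong nextSlotToWrite = new AtomicLong(0);
    // sequence number of the last slot claimed by a consumer; -1 means none yet
    private final AtomicLong lastConsumed = new AtomicLong(-1);

    RingBufferQueueSketch(int capacity) {
        if (capacity < 2) {
            throw new IllegalArgumentException("capacity must be at least 2");
        }
        this.capacity = capacity;
        this.buffer = new Runnable[capacity];
    }

    // Single producer only: the read and the increment are separate steps.
    void put(Runnable r) {
        while (true) {
            long toWrite = nextSlotToWrite.get();
            if (toWrite < lastConsumed.get() + capacity) {
                buffer[(int) (toWrite % capacity)] = r;
                nextSlotToWrite.incrementAndGet(); // publishes the slot to consumers
                return;
            }
            LockSupport.parkNanos(1); // queue is full
        }
    }

    Runnable take() throws InterruptedException {
        while (!Thread.interrupted()) {
            long lastFilled = nextSlotToWrite.get() - 1;
            long consumed = lastConsumed.get();
            if (consumed < lastFilled) {
                // read first, then claim: if the slot was overwritten, the CAS below fails
                Runnable r = buffer[(int) ((consumed + 1) % capacity)];
                if (lastConsumed.compareAndSet(consumed, consumed + 1)) {
                    return r;
                }
            } else {
                LockSupport.parkNanos(1); // sketch-only choice: back off while empty
            }
        }
        throw new InterruptedException();
    }

    // Hypothetical smoke test: one producer, several consumers, counts executed tasks.
    static long runSmokeTest(int consumers, int tasks) {
        RingBufferQueueSketch queue = new RingBufferQueueSketch(64);
        AtomicLong executed = new AtomicLong();
        CountDownLatch done = new CountDownLatch(tasks);
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run();
                    }
                } catch (InterruptedException ie) {
                    // interrupted once all tasks are done
                }
            });
            workers[i].setDaemon(true);
            workers[i].start();
        }
        for (int i = 0; i < tasks; i++) {
            queue.put(() -> { executed.incrementAndGet(); done.countDown(); });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        for (Thread w : workers) {
            w.interrupt();
        }
        return executed.get();
    }
}
```

With these starting values the producer stops one slot short of the full array, so a buffer of size N holds at most N - 1 pending tasks.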
Can we go faster by SPMC?
In particular, for our case, there is only ever a single writer thread issuing put()s to the queue. Can we take advantage of this by making the queue an SPMC queue (single producer, many consumers)?
            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite & REMAINDER_MASK)] = r;
                        toWrite += 1;
                        nextSlotToWrite.lazySet(toWrite);
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }
Note that I use a lazySet() to write, but this doesn't help. I'm speculating that it's really the .get()s in the take() below that are causing the issues.
            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                    }
                }
                throw new InterruptedException();
            }
By this point I've also started to use power-of-two sized queues (following the Disruptor guidance on fast modulo). Actually, this didn't make a lot of difference, but I've left it in.
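To make the fast modulo concrete: when the capacity is a power of two, sequence % capacity gives the same slot as sequence & (capacity - 1) for non-negative sequences. The sketch below assumes REMAINDER_MASK in the code above is defined as capacity - 1; the class and method names are hypothetical.

```java
final class PowerOfTwoIndex {
    // Rounds a requested capacity up to the next power of two (1 <= requested <= 2^30).
    static int nextPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("requested capacity out of range: " + requested);
        }
        int highest = Integer.highestOneBit(requested);
        return highest == requested ? requested : highest << 1;
    }

    // Slot lookup with a division-based remainder.
    static int slotByModulo(long sequence, int capacity) {
        return (int) (sequence % capacity);
    }

    // Same slot with a single AND; only valid when capacity is a power of two.
    static int slotByMask(long sequence, int capacity) {
        return (int) (sequence & (capacity - 1));
    }

    public static void main(String[] args) {
        int capacity = nextPowerOfTwo(1000);
        System.out.println(capacity + " " + slotByMask(1025, capacity));
    }
}
```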
Caching the get()s
How can I reduce contention when things are busy? Having a look at the take() method, we can see that there are two .get() calls to atomic fields, each of which causes a volatile read from memory. On CAS failure, it's not always the case that we need to re-read both variables. So what happens if we only re-read each one when it is needed: lastConsumed after a failed CAS, and nextSlotToWrite when the queue looks empty?
            public Runnable take() throws InterruptedException {
                long lastFilled = nextSlotToWrite.get() - 1;
                long lastConsumedCache = lastConsumed.get();
                while(!Thread.interrupted()) {
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1);
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                            // LockSupport.parkNanos(1);
                            // note that the lastFilled does not need to change just yet
                            // but probably the lastConsumed did
                            lastConsumedCache = lastConsumed.get();
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                        lastFilled = nextSlotToWrite.get() - 1;
                        // lastConsumedCache = lastConsumed.get();
                    }
                }
                throw new InterruptedException();
            }
        }
| threads | akkapi | disruptor | disruptor2 | threadpi | custom / LTQ | Custom Lockfree Pp Tp Padded | Custom with cached gets | 
| 1 | 2389 | 1300 | 954 | 1455 | 1111 | 1245 | 1257 | 
| 2 | 1477 | 713 | 498 | 1473 | 1318 | 992 | 1007 | 
| 3 | 1266 | 492 | 327 | 2108 | 802 | 676 | 695 | 
| 4 | 1629 | 423 | 251 | 2135 | 762 | 548 | 559 | 
| 5 | 1528 | 400 | 203 | 2195 | 643 | 538 | 515 | 
| 6 | 1541 | 398 | 170 | 2167 | 581 | 545 | 510 | 
| 7 | 1482 | 1085 | 7502 | 2259 | 653 | 603 | 562 | 
| 8 | 1397 | 1297 | 9469 | 2444 | 5238 | 925 | 883 | 
Not a lot of difference, but it does help our under-contention numbers, which is exactly what we'd expect.
The results
Actually, the results were good and I was pretty happy with them. They're a small fraction faster than the LinkedTransferQueue, with what seems like better worst-case performance. Nowhere near as fast as the Disruptor, but frankly that's not surprising given the energy and talent working on it.
Of course, this is not production ready, and YMMV, but I would welcome comments and questions, particularly if there are any issues with correctness.
Code samples on github.

 
Hi Jason,
Interesting results. As pointed out by M. Barker, the Disruptor test consumes an additional thread feeding the queue, so a decline at 7 threads should be expected.
Regarding abstraktor (now renamed to kontraktor): I am in the middle of a 1.x => 2.x transition, so I'll come back with a test case once I've got this polished and optimized.
I have to dig into your code deeper; a custom executor should perform better than my original abstractor tests, as it does not require reflection-based calls. From the charts it looks like it would not perform better; however, these kinds of tests depend a lot on the underlying hardware, so results might look different on other machines.
BTW: have you tried using https://github.com/JCTools/JCTools as a queue implementation? For my actor lib, this made a big difference.
Thanks for your compliments, glad my blog was helpful :-)
I would be interested in the sort of performance you get with the JCTools queues. A lock-free executor is in the JCTools future; there are some very interesting academic papers on this topic, some of which are copied under the JCTools resources folder.
Some points for consideration on the SPMC front:
1. The producer is MUCH faster than the consumers are going to be (in this use case), so the size of the queue and the behaviour when near full/full will be important. If you size your queue to the workload you can remove this issue. An implementation is available (Fast Flow like) such that the producer does not read the consumer index, thus removing a further thread harassing that particular memory location.
2. The CAS loop on the consumer side will be a cause of contention for all the consumer threads (and the main concurrency bottleneck). You can let the consumers 'chunk' work by letting them grab several slots at a time. You can stripe the queues per consumer, but then you have no work stealing, which is usually desirable. Using multiple SPMC queues and cycling between them on CAS failure can reduce the contention as well. This is close to the approach described by Dice & co as a Multi-Lane queue (others have the same idea). There's a similar MPSC in JCTools.
3. There are many layout-related optimizations to add: using inlined counters instead of referencing external counters, fully padding the counters/array elements/queue fields (you are padding the counters on one side only), extending the queue instead of referring to a queue. They should not make a massive difference but should improve the overall stability of performance.
4. Note that there are semantic differences between the Disruptor solution and the other solutions, which need to be acknowledged when not all Runnables are of the same class.
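As an illustration of the 'chunk' idea in point 2, here is a minimal sketch in which a consumer copies several slots and then claims them all with a single CAS. It is one possible reading of the suggestion, built on the same two counters as the post's queue; it is not the JCTools implementation, and the names ChunkedSpmcSketch, offer and pollBatch are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ChunkedSpmcSketch {
    private final int mask;
    private final Runnable[] buffer;
    private long toWrite = 0; // producer-private, assumes a single producer thread
    private final AtomicLong nextSlotToWrite = new AtomicLong(0);
    private final AtomicLong lastConsumed = new AtomicLong(-1);

    ChunkedSpmcSketch(int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        }
        buffer = new Runnable[capacity];
        mask = capacity - 1;
    }

    // Single producer only. Returns false instead of waiting when the queue is full.
    boolean offer(Runnable r) {
        if (toWrite >= lastConsumed.get() + buffer.length) {
            return false;
        }
        buffer[(int) (toWrite & mask)] = r;
        toWrite++;
        nextSlotToWrite.lazySet(toWrite);
        return true;
    }

    // Copies up to out.length tasks into out and claims them with one CAS.
    // Returns the number claimed; 0 means empty or lost the race, and in that
    // case the contents of out must be ignored.
    int pollBatch(Runnable[] out) {
        long lastFilled = nextSlotToWrite.get() - 1;
        long consumed = lastConsumed.get();
        int n = (int) Math.min(out.length, lastFilled - consumed);
        if (n <= 0) {
            return 0;
        }
        for (int i = 0; i < n; i++) {
            // read before claiming, as in the post's take(), to avoid a wrap race
            out[i] = buffer[(int) ((consumed + 1 + i) & mask)];
        }
        return lastConsumed.compareAndSet(consumed, consumed + n) ? n : 0;
    }
}
```

A consumer would call pollBatch() in a loop and run out[0] to out[n - 1] after each successful claim. Fewer CAS operations per task is the gain; the cost is that tasks in one batch run back to back on the same thread.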
The FJ pool discussion which followed on the MS mailing list was very interesting in the context of the original post: https://groups.google.com/forum/#!topic/mechanical-sympathy/xbAiMHS3pOk%5B1-25-false%5D
Also, you mention 1024cores (which is a great resource), but there's no link: http://www.1024cores.net/
Nice post, thanks :-)
@Rudiger - thanks for the tip, I'd seen it but thought it was a little extra work to get going. You've prompted me to have a look and in fact it was a drop-in replacement which has a significant impact especially at the higher thread counts. I would be interested to see how your abstractor compares, although I do think that my hardware is quite old, I'm due for an ebay refresh :).
@Nitsan thanks for your detailed look. I have put in a drop-in JCTools SPMC queue. In regard to your points:
1) Yes this is a good observation, I haven't given much thought yet on how to implement. Perhaps I can look at FF and try to find a port to Java, although I seem to recall you might have already done that.
2) Yes this definitely seems the case. This however is not very compatible with the idea of Runnables as independent tasks unless you implement work stealing, which significantly increases complexity. I did have a quick test striping with LinkedTransferQueue but found the benefits not significant enough to pursue in more detail, but maybe this is worth pursuing further.
3) Baby steps ..
4) This is a very very important thing to note, the tasks being submitted here are well suited to the Disruptor as they are all of a similar size and nonblocking. However, there's no way here for disruptor to handle a single slow-running task without blocking all tasks. This is a critical point for a generic executor framework. It seems an optimisation "overfit" to use the Disruptor in cases where the tasks may have significantly varying runtimes.
I'll give an abstractor version once I find time to consolidate (say 2 weeks). I created my very own jar hell as I edit all my stuff (fast serialization, kontraktor, kontraktor-netty, nett2go, reallive) inside one big project. So in order to get a valid kontraktor release I frequently have to release all subprojects required by kontraktor to maven.org, which takes some time you know :-). Also I have changed the scheduler to scale automatically core-by-core using runtime profiling. The downside is that this might impose some cost for this benchmark; I have to implement a way to force kontraktor to actually use the given number of threads instead of figuring this out automatically (else it will always start with one thread and end with 8).
4) The disruptor is a good fit if you want to speed up processing of a single item by parallelizing per-item logic.
E.g. one thread decodes, one or more threads do business logic (stepwise), one thread encodes, one thread sends out to network.
You often read "try using the disruptor for Actor frameworks", however this does not fit at all.
For its given use case, the disruptor is unbeatable (have done many non-public tests on that), but it's hardly usable as an "Executor Framework".
The idea of an Executor framework is to handle several tasks in parallel. The idea of disruptor is to parallelize the processing of a single item/event.
Especially if you need to process events in order (e.g. parallelize processing but outbound send results in the order submitted), disruptor is way faster.
Added Kontraktor 2.0 results (check for my pull request)
Compared to the very early "Abstractor" version I tested, I lost some speed. However, see how compact the code with kontraktor is, still way faster than AKKA and MT.
On Opteron 2 socket each 8 FP cores 16 Int cores:
AKKA:
average 1 threads : 1498
average 2 threads : 1069
average 3 threads : 942
average 4 threads : 760
average 5 threads : 2215
average 6 threads : 2190
average 7 threads : 2309
average 8 threads : 1791
Kontraktor:
average 1 threads : 1520
average 2 threads : 804
average 3 threads : 571
average 4 threads : 459
average 5 threads : 457
average 6 threads : 534
average 7 threads : 615
average 8 threads : 659
Disruptor:
1: 712
2: 360
3: 267
4: 197
5: 179
6: 141
7: 193
8: 310
argh .. wrong gist: here is the right one https://gist.github.com/RuedigerMoeller/4a6bf58275ec09277525
Intel Numbers:
Intel CPUs
[moelrue@otd2039 ~]$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 12
On-line CPU(s) list: 0-11
Thread(s) per core: 1
Core(s) per socket: 6
Socket(s): 2
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 44
Stepping: 2
CPU MHz: 3067.058
BogoMIPS: 6133.20
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 12288K
NUMA node0 CPU(s): 1,3,5,7,9,11
NUMA node1 CPU(s): 0,2,4,6,8,10
Akka:
average 1 threads : 1019
average 2 threads : 605
average 3 threads : 507
average 4 threads : 454
average 5 threads : 907
average 6 threads : 1517
average 7 threads : 1536
average 8 threads : 1599
Kontraktor:
average 1 threads : 979
average 2 threads : 508
average 3 threads : 359
average 4 threads : 301
average 5 threads : 256
average 6 threads : 310
average 7 threads : 325
average 8 threads : 315
Disruptor2:
1: 676
2: 339
3: 230
4: 173
5: 140
6: 125
7: 119
8: 119
CustomExecutorPI:
average 1 threads : 856
average 2 threads : 749
average 3 threads : 460
average 4 threads : 390
average 5 threads : 346
average 6 threads : 370
average 7 threads : 533
average 8 threads : 1086
CustomExecutorStripedPI:
average 1 threads : 856
average 2 threads : 596
average 3 threads : 410
average 4 threads : 326
average 5 threads : 322
average 6 threads : 344
average 7 threads : 508
average 8 threads : 856