Sunday, 21 September 2014

Writing a non-blocking Executor

One of the blogs I follow is Rudiger's blog - in particular this article with a light benchmark I find interesting  as it shows the clear performance advantage of the Disruptor, and also just how slow the out of the box services can be for particular use cases.

In the article, Rudiger tests out a number of different implementations of a pi calculator, and finds quickly that 1million small tasks is a nice way to put enough pressure on the concurrency frameworks to split out a winner. The out of the box java Executor service is actually relatively slow, coming in last position.

One of the items I have on my to-do list is actually to build a fast ExecutorService implementation, and fortunately after a busy few months I've had a chance to sit down and write it. For those new to concurrency on Java, the ExecutorService interface and the Executors factory methods provide an easy way to get into launching threads and handling off composable tasks to other threads for asynchronous work.

The work below borrows heavily from Rudiger's work but also the work at 1024cores.net, and of course I can't neglect to mention Nitsan Wakart's blog and Martin Thompson's numerous talks on the topic.

The target: Get a working ExecutorService that can be used in the Pi calculator and get a result that is better than out-of-the-box.

Getting a baseline

Of course, since my target is to beat the out-of-the-box frameworks, I spent some time downloading the gists and hooking up the libs in maven. Only thing is I didn't hook up the Abstraktor tests as I am not too familiar with the project (happy to take a pull request).

Interestingly, the out of the box services perform pretty much as described in Rudiger's post, with the caveat that disruptor services have a much sharper degradation in performance on this run. Worth mentioning is that this is only a 2-socket 8-core E5410's (the old Harpertown series), so not nearly as new as Rudiger's kit.

First cut

I've heard good things about the LinkedTransferQueue but not had a need to test it out. As a first cut, I decided to write up a simple Executor implementation that as part of the submit() call.

    private static class LtqExecutor<T> implements ExecutorService {

        private volatile boolean running;
        private volatile boolean stopped;
        private final int threadCount;
        private final BlockingQueue<Runnable> queue;
        private final Thread[] threads;

        public LtqExecutor(int threadCount) {
            this.threadCount = threadCount;
            this.queue = new LinkedTransferQueue();
            running = true;
            stopped = false;
            threads = new Thread[threadCount];
            for(int i = 0; i < threadCount; i++) {
                threads[i] = new Thread(new Worker());
                threads[i].start();
            }
        }

        public void execute(Runnable runnable) {
            try {
                queue.put(runnable);
            } catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
        }

        private class Worker implements Runnable {
            public void run() {
                while(running) {
                    Runnable runnable = null;
                    try {
                        runnable = queue.take();
                    } catch (InterruptedException ie) {
                        // was interrupted - just go round the loop again,
                        // if running = false will cause it to exit
                    }
                    try {
                        if (runnable != null) { 
                            runnable.run();
                        }
                    } catch (Exception e) {
                        System.out.println("failed because of: " + e.toString());
                        e.printStackTrace();
                    }
                }
            }
        }

I also removed the test for getQueue.size(). Actually, performance of this was rather impressive on its own, achieving results in 1/3rd the time of the ootb services, while of course it comes nowhere near Disruptor.

threads disruptor2 threadpi custom / LTQ
1 954 1455 1111
2 498 1473 1318
3 327 2108 802
4 251 2135 762
5 203 2195 643
6 170 2167 581
7 7502 2259 653
8 9469 2444 5238

Trying some things

ArrayBlockingQueue and SynchronousQueue

I also tried substituting ABQ and SQ instead of LinkedTransferQueue, since my understanding is LTQ has some additional unnecessary overheads. Perhaps ABQ and SQ would be a little more cache friendly since ABQ is possibly better laid out in memory, and SQ as I've used it implementations before.

threads disruptor2 threadpi custom / LTQ custom / SQ custom / ABQ 1000
1 954 1455 1111 3738 2570
2 498 1473 1318 2638 2570
3 327 2108 802 3925 1875
4 251 2135 762 4035 1792
5 203 2195 643 4740 1692
6 170 2167 581 4635 1684
7 7502 2259 653 4362 1668
8 9469 2444 5238 4566 1677

Ouch. No need to mention these again.

Striping the LTQ

Clearly Disruptor's approach of limiting contended writes has a benefit. Would the LinkedTransferQueue get a little faster by striping the input queues - one per thread?

In this model, instead of having one queue into the Executor, each thread has its own thread, and the put() method round-robins across the threads.

Actually, this was a bit better for some workloads but with much faster degradation.

threads disruptor2 threadpi custom / LTQ Custom / striped LTQ
1 954 1455 1111 1237
2 498 1473 1318 1127
3 327 2108 802 713
4 251 2135 762 570
5 203 2195 643 498
6 170 2167 581 386
7 7502 2259 653 5264
8 9469 2444 5238 5262

Customising a queue

A ringbuffer based queue

With a baseline set, I knew from reading Nitsan's many posts and the 1024cores.net site, I new there were some optimisations I could get from the problem space. What about using a queue with a ringbuffer?

By introducing a custom lock-free queue and substituting it in my Executor logic, using a put().

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    long toWrite = nextSlotToWrite.get();
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite % capacity)] = r;
                        nextSlotToWrite.incrementAndGet();
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

While the take() looks like this.

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) % capacity);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                        }
                    } else {
                        // cas failed, do not sleep
                    }
                }
                throw new InterruptedException();
            }

Can we go faster by SPMC?

In particular, for our case, there is only ever a single writer thread issuing put()s to the queue. Can we take advantage of this by tweaking making the queue an SPMC queue (single producer many consumer)?

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite & REMAINDER_MASK)] = r;
                        toWrite += 1;
                        nextSlotToWrite.lazySet(toWrite);
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

Note that I use a lazySet() to write, but this doesn't help, I'm speculating that it's really the .get()'s in the below that are causing the issues.

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                    }
                }
                throw new InterruptedException();
            }

By this point I've also started to use 2^ sized queues (following the disruptor guidance on fast modulo). Actually, this didn't make a lot of difference but I've left it in.

Caching the get()s

How can I reduce contention when things are busy? Having a look at the take() method we can see that there are two .get() calls to atomic fields, this will be causing volatile reads from memory. On CAS failure, it's not always the case that we need to re-read the variable. So what happens if we move that to only happen during the "things are too busy" loop?

            public Runnable take() throws InterruptedException {
                long lastFilled = nextSlotToWrite.get() - 1;
                long lastConsumedCache = lastConsumed.get();
                while(!Thread.interrupted()) {
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1);
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                            // LockSupport.parkNanos(1);

                            // note that the lastFilled does not need to change just yet
                            // but probably the lastConsumed did
                            lastConsumedCache = lastConsumed.get();
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                        lastFilled = nextSlotToWrite.get() - 1;
                        // lastConsumedCache = lastConsumed.get();
                    }
                }
                throw new InterruptedException();
            }
        }

threads akkapi disruptor disruptor2 threadpi custom / LTQ Custom Lockfree Pp Tp Padded Custom with cached gets
1 2389 1300 954 1455 1111 1245 1257
2 1477 713 498 1473 1318 992 1007
3 1266 492 327 2108 802 676 695
4 1629 423 251 2135 762 548 559
5 1528 400 203 2195 643 538 515
6 1541 398 170 2167 581 545 510
7 1482 1085 7502 2259 653 603 562
8 1397 1297 9469 2444 5238 925 883

Not a lot of difference, but it does help our under-contention numbers, which is exactly what we'd expect.

The results

Actually, the results were good, I was pretty happy with them. They're a small fraction faster than the LinkedTransferQueue with what seems like better worst case performance. Nowhere near as fast as the Disruptor but frankly that's not surprising, given the energy and talent working on it.


Of course, this is not Production ready, and YMMV but I would welcome comments and questions. Particularly if there are any issues with correctness.

Code samples on github.

9 comments:

  1. Hi Jason,

    interesting results. As pointed out by M.Barker, the disruptor test consumes an additional thread feeding the queue, so a decline at 7 threads should be expected.

    Regarding abstraktor (now renamed to kontraktor ..). I am in the middle of the road of a 1.x => 2.x transition so I'll come back with a testcase once i got this polished and optimized.

    have to dig into your code deeper, a custom executor should perform better than my original abstractor tests, as it does not require reflection based calls. From the charts it looks like it would not perform better, however these kind of tests depend a lot on underlying hardware, so results might look different on other machines.

    ReplyDelete
  2. BTW: have you tried https://github.com/JCTools/JCTools using as a queue implementation ? For my actor lib, this made a big difference.

    ReplyDelete
  3. Thanks for your compliments, glad my blog was helpful :-)
    I would be interested in the sort of performance you get with the JCTools queues. A lock free executor is in the JCTools future, there's some very interesting academical papers on this topic some of which are copied under the JCTools resources folder.
    Some points for consideration on the SPMC front:
    1. The producer is MUCH faster than the consumers are going to be(in this use case), so the size of the queue and the behaviour when near full/full will be important. If you size your queue to the workload you can remove this issue. An implementation is available (Fast Flow like) such that the producer does not read the consumer index, thus removing a further thread harassing that particular memory location.
    2. The CAS loop on the consumer side will be a cause of contention for all the consumer threads (and the main concurrency bottleneck). You can let the consumers 'chunk' work by letting them grab several slots at a time. You can stripe the queues per consumer, but than you have no work stealing which is usually desirable, using multiple SPMC queues and and cycling between them on CAS failure can reduce the contention as well. This is close to the approach described by Dice & co as a Multi-Lane queue (others have the same idea). There's a similar MPSC in JCTools.
    3. There's many layout related optimizations to add: using inlined counters instead of referencing external counters, fully padding the counters/array elements/queue fields(you are padding the counters on one side only), extending the queue instead of referring to a queue. They should not make a massive difference but should improve the overall stability of performance.
    4. Note that there are semantic differences between the Disruptor solution to the other solutions which needs to be acknowledged when not all Runnables are of the same class.
    The FJ pool discussion which followed on the MS mailing list was very interesting in the context of the original post: https://groups.google.com/forum/#!topic/mechanical-sympathy/xbAiMHS3pOk%5B1-25-false%5D
    Also, you mention 1024cores (which is a great resource), but there's no link: http://www.1024cores.net/
    Nice post, thanks :-)

    ReplyDelete
  4. @Rudiger - thanks for the tip, I'd seen it but thought it was a little extra work to get going. You've prompted me to have a look and in fact it was a drop-in replacement which has a significant impact especially at the higher thread counts. I would be interested to see how your abstractor compares, although I do think that my hardware is quite old, I'm due for an ebay refresh :).

    @Nitsan thanks for your detailed look. I have put a drop-in Spmc Jctools. In regards to your points.
    1) Yes this is a good observation, I haven't given much thought yet on how to implement. Perhaps I can look at FF and try to find a port to Java, although I seem to recall you might have already done that.
    2) Yes this definitely seems the case. This however is not very compatible with the idea of Runnables as independent tasks unless you implement workstealing which significantly increases complexity. I did have a quick test striping with LinkedTransferQueue but found the benefits not signficant enough to pursue in more detail but maybe this is worth pursuing further.
    3) Baby steps ..
    4) This is a very very important thing to note, the tasks being submitted here are well suited to the Disruptor as they are all of a similar size and nonblocking. However, there's no way here for disruptor to handle a single slow-running task without blocking all tasks. This is a critical point for a generic executor framework. It seems an optimisation "overfit" to use the Disruptor in cases where the tasks may have significantly varying runtimes.

    ReplyDelete
  5. I'll give an abstractor version once I find time to consolidate (say 2 weeks). I created my very own jar hell as i edit all my stuff (fast serialization, kontraktor, kontraktor-netty, nett2go, reallive) inside one big project. So in order to get a valid kontraktor release i frequently have to release all subprojects required by kontraktor to maven.org, which takes some time you know :-). Also i have changed the scheduler to scale automatically core-by-core using runtime profiling. Downside is, that this might impose some cost for this benchmark, i have to implement a way to force kontraktor to actually use the given amount of threads instead of figuring this out automatically (else it will always start with one thread and end with 8).

    4) The disruptor is a good fit if you want to speed up processing of a single item by parallelizing per-item logic.
    E.g. one Thread decodes, one or more thread do business logic (stepwise), one thread encodes, one thread sends out to network.

    You often read on "try using the disruptor for Actor frameworks" however this does not fit at all.

    For its given use case, the disruptor is unbeatable (have done many non-public tests on that), but its hardly usable as an "Executor Framework".

    The idea of an Executor framework is to handle several tasks in parallel. The idea of disruptor is to parallelize the processing of a single item/event.
    Especially if you need to process events in order (e.g. parallelize processing but outbound send results in the order submitted), disruptor is way faster.

    ReplyDelete
  6. Added Kontrakor 2.0 results (check for my pull request)
    Compared to the very early "Abstractor" version I tested, i lost some speed. However see how compact the code with kontraktor is, still way faster than AKKA and MT.

    On Opteron 2 socket each 8 FP cores 16 Int cores:
    AKKA:
    average 1 threads : 1498
    average 2 threads : 1069
    average 3 threads : 942
    average 4 threads : 760
    average 5 threads : 2215
    average 6 threads : 2190
    average 7 threads : 2309
    average 8 threads : 1791


    Kontraktor:
    average 1 threads : 1520
    average 2 threads : 804
    average 3 threads : 571
    average 4 threads : 459
    average 5 threads : 457
    average 6 threads : 534
    average 7 threads : 615
    average 8 threads : 659

    Disruptor:
    1: 712
    2: 360
    3: 267
    4: 197
    5: 179
    6: 141
    7: 193
    8: 310


    ReplyDelete
    Replies
    1. This comment has been removed by the author.

      Delete
    2. argh .. wrong gist: here is the right one https://gist.github.com/RuedigerMoeller/4a6bf58275ec09277525

      Delete
    3. Intel Numbers:

      Intel CPU's

      moelrue@otd2039 ~]$ lscpu
      Architecture: x86_64
      CPU op-mode(s): 32-bit, 64-bit
      Byte Order: Little Endian
      CPU(s): 12
      On-line CPU(s) list: 0-11
      Thread(s) per core: 1
      Core(s) per socket: 6
      Socket(s): 2
      NUMA node(s): 2
      Vendor ID: GenuineIntel
      CPU family: 6
      Model: 44
      Stepping: 2
      CPU MHz: 3067.058
      BogoMIPS: 6133.20
      Virtualization: VT-x
      L1d cache: 32K
      L1i cache: 32K
      L2 cache: 256K
      L3 cache: 12288K
      NUMA node0 CPU(s): 1,3,5,7,9,11
      NUMA node1 CPU(s): 0,2,4,6,8,10


      Akka:
      average 1 threads : 1019
      average 2 threads : 605
      average 3 threads : 507
      average 4 threads : 454
      average 5 threads : 907
      average 6 threads : 1517
      average 7 threads : 1536
      average 8 threads : 1599

      Kontraktor:
      average 1 threads : 979
      average 2 threads : 508
      average 3 threads : 359
      average 4 threads : 301
      average 5 threads : 256
      average 6 threads : 310
      average 7 threads : 325
      average 8 threads : 315

      Disruptor2:
      1: 676
      2: 339
      3: 230
      4: 173
      5: 140
      6: 125
      7: 119
      8: 119

      CustomExecutorPI:
      average 1 threads : 856
      average 2 threads : 749
      average 3 threads : 460
      average 4 threads : 390
      average 5 threads : 346
      average 6 threads : 370
      average 7 threads : 533
      average 8 threads : 1086

      CustomExecutorStripedPI:
      average 1 threads : 856
      average 2 threads : 596
      average 3 threads : 410
      average 4 threads : 326
      average 5 threads : 322
      average 6 threads : 344
      average 7 threads : 508
      average 8 threads : 856



      Delete