Monday, 29 September 2014

JcTools based queues

Improving further

Following on my previous post in which I investigated the performance benefits in writing a custom executor as compared to OOTB -- comments from both Nitsan and Rudiger suggest I should try the JCTools implementation for SPMC queues (Single producer multiple consumer).

As simple as importing the jctools JARs from Maven, and a couple of lines to change the Queue implementation and away we go ....

    private final Queue<Runnable> queue = new SpmcArrayQueue((int) Math.pow(2, 18));

Without much further ado, here are the results. I've re-executed all of the numbers so they may be slightly different.



(Updated with correct 256K elements - most notable difference is that JCTools does not suffer as much from contention at ~8 threads).

LinkedTransferQueue is still an excellent candidate

I did notice that LinkedTransferQueue is still looking very respectable, so as an OOTB JVM queue it's definitely worth considering. Based on observing the system during the run, it does have a relatively high garbage cost. But, if you have memory to burn it is a very good candidate to test. In fact, it seems like the LTQ would perform much better if it didn't cause so much GC work. This is impressive given it is MPMC capable.

Actually, what I do notice when running on the LTQ is that the GC pause times recorded by the JVM are significantly longer (up to 1 sec). This is interesting and I suspect it's a result of the unbounded queue. A few combinations of capped queue lengths showed significantly reduced GC times based on the logs (<10millisec compared to 1sec), however no overall difference in runtime. I suspect there is more lurking in the dark here, for another day.

And custom is still better

The jctools SPMC queue does in fact perform significantly quicker (4-16%) than the other queues tested. As contention increased, it actually performed much better. Looks like Nitsan's work has paid off. See his video here (though I'm yet to get a chance to see it).

Perhaps it is a little like cheating, and we should pass Runnables to the Disruptor instead, but the Disruptor as written is still ~2x as fast as pushing this through than using an Executor.


Sunday, 21 September 2014

Writing a non-blocking Executor

One of the blogs I follow is Rudiger's blog - in particular this article with a light benchmark I find interesting  as it shows the clear performance advantage of the Disruptor, and also just how slow the out of the box services can be for particular use cases.

In the article, Rudiger tests out a number of different implementations of a pi calculator, and finds quickly that 1million small tasks is a nice way to put enough pressure on the concurrency frameworks to split out a winner. The out of the box java Executor service is actually relatively slow, coming in last position.

One of the items I have on my to-do list is actually to build a fast ExecutorService implementation, and fortunately after a busy few months I've had a chance to sit down and write it. For those new to concurrency on Java, the ExecutorService interface and the Executors factory methods provide an easy way to get into launching threads and handling off composable tasks to other threads for asynchronous work.

The work below borrows heavily from Rudiger's work but also the work at 1024cores.net, and of course I can't neglect to mention Nitsan Wakart's blog and Martin Thompson's numerous talks on the topic.

The target: Get a working ExecutorService that can be used in the Pi calculator and get a result that is better than out-of-the-box.

Getting a baseline

Of course, since my target is to beat the out-of-the-box frameworks, I spent some time downloading the gists and hooking up the libs in maven. Only thing is I didn't hook up the Abstraktor tests as I am not too familiar with the project (happy to take a pull request).

Interestingly, the out of the box services perform pretty much as described in Rudiger's post, with the caveat that disruptor services have a much sharper degradation in performance on this run. Worth mentioning is that this is only a 2-socket 8-core E5410's (the old Harpertown series), so not nearly as new as Rudiger's kit.

First cut

I've heard good things about the LinkedTransferQueue but not had a need to test it out. As a first cut, I decided to write up a simple Executor implementation that as part of the submit() call.

    private static class LtqExecutor<T> implements ExecutorService {

        private volatile boolean running;
        private volatile boolean stopped;
        private final int threadCount;
        private final BlockingQueue<Runnable> queue;
        private final Thread[] threads;

        public LtqExecutor(int threadCount) {
            this.threadCount = threadCount;
            this.queue = new LinkedTransferQueue();
            running = true;
            stopped = false;
            threads = new Thread[threadCount];
            for(int i = 0; i < threadCount; i++) {
                threads[i] = new Thread(new Worker());
                threads[i].start();
            }
        }

        public void execute(Runnable runnable) {
            try {
                queue.put(runnable);
            } catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
        }

        private class Worker implements Runnable {
            public void run() {
                while(running) {
                    Runnable runnable = null;
                    try {
                        runnable = queue.take();
                    } catch (InterruptedException ie) {
                        // was interrupted - just go round the loop again,
                        // if running = false will cause it to exit
                    }
                    try {
                        if (runnable != null) { 
                            runnable.run();
                        }
                    } catch (Exception e) {
                        System.out.println("failed because of: " + e.toString());
                        e.printStackTrace();
                    }
                }
            }
        }

I also removed the test for getQueue.size(). Actually, performance of this was rather impressive on its own, achieving results in 1/3rd the time of the ootb services, while of course it comes nowhere near Disruptor.

threads disruptor2 threadpi custom / LTQ
1 954 1455 1111
2 498 1473 1318
3 327 2108 802
4 251 2135 762
5 203 2195 643
6 170 2167 581
7 7502 2259 653
8 9469 2444 5238

Trying some things

ArrayBlockingQueue and SynchronousQueue

I also tried substituting ABQ and SQ instead of LinkedTransferQueue, since my understanding is LTQ has some additional unnecessary overheads. Perhaps ABQ and SQ would be a little more cache friendly since ABQ is possibly better laid out in memory, and SQ as I've used it implementations before.

threads disruptor2 threadpi custom / LTQ custom / SQ custom / ABQ 1000
1 954 1455 1111 3738 2570
2 498 1473 1318 2638 2570
3 327 2108 802 3925 1875
4 251 2135 762 4035 1792
5 203 2195 643 4740 1692
6 170 2167 581 4635 1684
7 7502 2259 653 4362 1668
8 9469 2444 5238 4566 1677

Ouch. No need to mention these again.

Striping the LTQ

Clearly Disruptor's approach of limiting contended writes has a benefit. Would the LinkedTransferQueue get a little faster by striping the input queues - one per thread?

In this model, instead of having one queue into the Executor, each thread has its own thread, and the put() method round-robins across the threads.

Actually, this was a bit better for some workloads but with much faster degradation.

threads disruptor2 threadpi custom / LTQ Custom / striped LTQ
1 954 1455 1111 1237
2 498 1473 1318 1127
3 327 2108 802 713
4 251 2135 762 570
5 203 2195 643 498
6 170 2167 581 386
7 7502 2259 653 5264
8 9469 2444 5238 5262

Customising a queue

A ringbuffer based queue

With a baseline set, I knew from reading Nitsan's many posts and the 1024cores.net site, I new there were some optimisations I could get from the problem space. What about using a queue with a ringbuffer?

By introducing a custom lock-free queue and substituting it in my Executor logic, using a put().

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    long toWrite = nextSlotToWrite.get();
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite % capacity)] = r;
                        nextSlotToWrite.incrementAndGet();
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

While the take() looks like this.

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) % capacity);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                        }
                    } else {
                        // cas failed, do not sleep
                    }
                }
                throw new InterruptedException();
            }

Can we go faster by SPMC?

In particular, for our case, there is only ever a single writer thread issuing put()s to the queue. Can we take advantage of this by tweaking making the queue an SPMC queue (single producer many consumer)?

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite & REMAINDER_MASK)] = r;
                        toWrite += 1;
                        nextSlotToWrite.lazySet(toWrite);
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

Note that I use a lazySet() to write, but this doesn't help, I'm speculating that it's really the .get()'s in the below that are causing the issues.

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                    }
                }
                throw new InterruptedException();
            }

By this point I've also started to use 2^ sized queues (following the disruptor guidance on fast modulo). Actually, this didn't make a lot of difference but I've left it in.

Caching the get()s

How can I reduce contention when things are busy? Having a look at the take() method we can see that there are two .get() calls to atomic fields, this will be causing volatile reads from memory. On CAS failure, it's not always the case that we need to re-read the variable. So what happens if we move that to only happen during the "things are too busy" loop?

            public Runnable take() throws InterruptedException {
                long lastFilled = nextSlotToWrite.get() - 1;
                long lastConsumedCache = lastConsumed.get();
                while(!Thread.interrupted()) {
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1);
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                            // LockSupport.parkNanos(1);

                            // note that the lastFilled does not need to change just yet
                            // but probably the lastConsumed did
                            lastConsumedCache = lastConsumed.get();
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                        lastFilled = nextSlotToWrite.get() - 1;
                        // lastConsumedCache = lastConsumed.get();
                    }
                }
                throw new InterruptedException();
            }
        }

threads akkapi disruptor disruptor2 threadpi custom / LTQ Custom Lockfree Pp Tp Padded Custom with cached gets
1 2389 1300 954 1455 1111 1245 1257
2 1477 713 498 1473 1318 992 1007
3 1266 492 327 2108 802 676 695
4 1629 423 251 2135 762 548 559
5 1528 400 203 2195 643 538 515
6 1541 398 170 2167 581 545 510
7 1482 1085 7502 2259 653 603 562
8 1397 1297 9469 2444 5238 925 883

Not a lot of difference, but it does help our under-contention numbers, which is exactly what we'd expect.

The results

Actually, the results were good, I was pretty happy with them. They're a small fraction faster than the LinkedTransferQueue with what seems like better worst case performance. Nowhere near as fast as the Disruptor but frankly that's not surprising, given the energy and talent working on it.


Of course, this is not Production ready, and YMMV but I would welcome comments and questions. Particularly if there are any issues with correctness.

Code samples on github.

Thursday, 6 March 2014

Chasing pointers with scissors .. or something

Nitsan W has a great article on the dangers of using on heap pointers within the JVM. In fact Gil has a classically clear post on the same topic.

Try as I might though, I couldn't find someone who had posted a concrete example. Especially since rumours seem to abound that it's a possibility, it seemed a little bizarre that there were plenty of examples of "how to access unsafe" but none on "what not to do".

So I thought, it might be fun to put together a demo to show just how quickly this can cause problems. It's a contrived example, of course, so in the real world I'm sure it will take longer to show up, and will probably be more random.

The code is pretty simple, and self explanatory (I guess). Review the code and then run the example.

Just so:

[jason@freebsd01 ~/code/examples/pointers]$ mvn clean compile exec:exec
<< the usual maven build things >>
[INFO][INFO] --- exec-maven-plugin:1.1:exec (default-cli) @ pointers ---
[INFO] [GC 2621K->1388K(502464K), 0.0042790 secs]
<< lots of GC while the vm starts up >>
<< and now we get down to business >>
[INFO] initialising array
[INFO] address: 34913266488
[INFO] native address: 34913266488
[INFO] checking array:
[INFO] seems ok
[INFO] seems ok
<< carries on for a while >>
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] making garbage
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] [GC 132255K->129303K(502464K), 0.3024900 secs]
[INFO] seems ok
[INFO] oops! expected: 0; but found: 109260096
[INFO] inserted to map -> 1013558; map size ended at -> 950748
[INFO] garbage generator finished
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.938s
[INFO] Finished at: Thu Mar 06 22:29:41 EST 2014
[INFO] Final Memory: 10M/151M
[INFO] ------------------------------------------------------------------------


Build success!

Tuesday, 25 February 2014

Updating the Disruptor sample

Quick update to Disruptor sample

As a comment was posted on the previous blog asking about comparisons to the juc Executor implementations, I thought it was time to refresh the examples a little and update that with some additional examples and in particular a comparison to a thread pooled implementation (since that was the question).

The code simply does 10000 UDP "echo" packets, the service is expected to uppercase the characters. What I want to look at here is latency while bringing in an additional test to the mix.

This test is a pretty straightforward test - just looking to identify 99th percentile latency across a couple of scenarios. Since these are simple applications I've included a baseline blocking IO implementation to get a comparison.

Total runtime tests

The initial test is a total runtime test. This is a naive indicative number of relative performance. Test runs a number of 10K UDP request-response loops.



A single thread shows that the blocking IO send/receive loop is quicker than the Disruptor implementation, which is quicker than the threadpool. But - one issue; after more than one thread, the results are almost identical? What's going on here?

My initial assumption is that the log to file was causing issues as a cap on throughput, so let's remove it.

Latency with logging disabled


As you can see, similar results apply. Disruptor is only slightly slower than a blocking loop in the sleeping view. The threadpool is a bit further away but still close behind. Disruptor with busy spin waiting strategy - and the non blocking IO in a tight loop - results in the best latency. I think this requires a bit more investigation than I have time to understand but I would welcome comments.

Key observations

Suffice to say, similar key points come up as normally do:
  • A single thread processing is often the quickest approach. Avoid threads until there is a clear reason. Not only that, but the code is significantly shorter and simpler.
  • Disruptor is very quick for handoff between threads. In fact, the message passes between three threads nearly as quick as the single threaded code.
  • Disruptor response times are very sensitive to the choice of blocking strategy. By choosing the wrong strategy, you will burn 100pc CPU across multiple cores while getting worse latency.
  • Threadpool is "quick enough" for many cases. Unless your architecture and design is already very efficient, you're unlikely to notice magical performance gains without significant tuning to other areas of the codebase. It's probable that other areas of the codebase need optimisations first.

The test code and bench

I've tried to write the code in a fairly "natural" style for each approach, so the comparisons are not strictly apples to apples - I think they are more indicative of real-world usage. For example, the creation of a dedicated outbound socket for the threadpool approach is used, where the single threaded and disruptor code can quite easily create a single outbound UDP socket to transmit all responses. I'd be happy to take pull requests.

Beware when you are running the samples that you will need a quiet network to avoid any dropped UDP packets.

Running the client -> mvn clean package ; java -jar target/disruptor-1.0-SNAPSHOT.jar <<serveripaddress>> <<numclients>>
Running the server -> mvn exec:java -Dbio (or -Dthreadpool or -Ddisruptor), will open UDP port 9999.

The code.