A fully functioning Disruptor 3.x example
The Disruptor, helpfully open-sourced by LMAX, has been a real eye-opener for me in terms of the performance and efficiency that can be achieved in a multi-threaded application. By combining a ring buffer with careful use of the single writer principle, they have built some extremely low latency, high throughput applications.
This is an example of a service which accepts packet requests over UDP, and sends a UDP packet back to the source with the text capitalised using the Java String.toUpperCase method.
Credits to Peter Lawrey as I've copied the technique for computing metrics percentiles:
https://code.google.com/p/core-java-performance-examples/source/browse/trunk/src/test/java/com/google/code/java/core/socket/PingTest.java
One of the things missing from the Disruptor package is a set of useful working examples. Unfortunately, the examples posted on the Disruptor wiki target the 2.x series. The API has changed in a few places in the 3.x series, although the fundamentals are the same.
Design
An annotated picture shows the event flow.
The only thing specific to the problem of uppercase-ing the input is the BusinessLogicHandler. You could replace this with anything.
This example is enough to get started. Of course, in a Production system, you need to do a lot more around exception handling, pinning threads, service request logging, etc.
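As a taste of that hardening, the Disruptor lets you register an ExceptionHandler so a failing event doesn't silently kill the event processor. The sketch below is only illustrative - it assumes the early 3.x (non-generic) ExceptionHandler interface and the disruptorIn name used further down:
import com.lmax.disruptor.ExceptionHandler

/**
 * Minimal sketch only - logs and carries on; a real system would
 * probably also count failures and alert on them
 */
class LoggingExceptionHandler extends ExceptionHandler {
  def handleEventException(ex:Throwable, sequence:Long, event:AnyRef) {
    System.err.println("Event failed at sequence " + sequence + ": " + ex)
  }
  def handleOnStartException(ex:Throwable) {
    System.err.println("Failed to start: " + ex)
  }
  def handleOnShutdownException(ex:Throwable) {
    System.err.println("Failed to shut down: " + ex)
  }
}

// hypothetical wiring (before the handlers are registered):
// disruptorIn.handleExceptionsWith(new LoggingExceptionHandler)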
The only item not shown here is the DatagramEvent - this is the value event to store entries in the ring buffer. In other words, each slot in the ring buffer is one DatagramEvent.
Results
Tested on a 2x Xeon E5410 machine (8 cores in total), with no process/core pinning. The query client runs on the same machine with N threads, each using Java NIO in blocking mode to send a request and wait for the response.
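The benchmark client isn't listed in this post; as a rough idea, one request/response round trip in such a client could look like the sketch below (hypothetical code - it assumes the server is on localhost:9999, and the real client loops this across N threads and records each latency):
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel

object EchoClientSketch {
  def main(args:Array[String]) {
    val server = new InetSocketAddress("localhost", 9999)
    val channel = DatagramChannel.open
    channel.configureBlocking(true)
    val request = ByteBuffer.wrap("hello disruptor".getBytes)
    val response = ByteBuffer.allocate(1024)
    val start = System.nanoTime
    channel.send(request, server)   // fire the request
    channel.receive(response)       // block until the echo comes back
    val elapsedNanos = System.nanoTime - start
    response.flip
    println(new String(response.array, 0, response.limit) + " in " + elapsedNanos + "ns")
  }
}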
I've compared the results with a standard blocking IO server. The blocking IO server runs with one "acceptor" thread and three "handler" threads. I tried various combinations of old IO and blocking NIO, and this was the best I could get. Measurements include the client time to send and receive.
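The blocking IO baseline isn't listed here either. Purely to illustrate the shape of the comparison, a one-acceptor / three-handler blocking design could look roughly like this (hypothetical sketch, not the code that produced the numbers):
import java.net.{DatagramPacket, DatagramSocket}
import java.util.concurrent.{ArrayBlockingQueue, Executors}

object BlockingEchoServerSketch {
  def main(args:Array[String]) {
    val socket = new DatagramSocket(9999)
    val queue = new ArrayBlockingQueue[DatagramPacket](1024)
    val handlers = Executors.newFixedThreadPool(3)

    // three handler threads: take a packet, upper-case it, send the reply
    for (i <- 0 until 3) handlers.submit(new Runnable {
      def run() {
        val out = new DatagramSocket()  // socket per thread, DatagramSocket is synchronised
        while (true) {
          val packet = queue.take
          val upper = new String(packet.getData, 0, packet.getLength).toUpperCase.getBytes
          out.send(new DatagramPacket(upper, upper.length, packet.getSocketAddress))
        }
      }
    })

    // one "acceptor" thread: blocking receive, hand the packet off to the queue
    while (true) {
      val buf = new Array[Byte](1024)
      val packet = new DatagramPacket(buf, buf.length)
      socket.receive(packet)
      queue.put(packet)
    }
  }
}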
Disruptor was configured to use a SleepingWaitStrategy, which uses 4-6% per core but still shows excellent latency.
A couple of observations:
- There is hardly any business logic here [uppercase a string], so the difference in performance is limited to thread hand-off effectiveness. The results are very noticeable.
- The 99pc and 50pc are very consistent for Disruptor, which implies low variation in response times (ie: consistently low latency).
Also, given this is an 8-core server, running more than a few client threads is going to result in some contention. Above 4 threads, you can see the effect of queueing in Disruptor (with 99pc increasing significantly).
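For reference, the 50pc/99pc figures are essentially rank-based percentiles over the recorded latencies. The code below is only a sketch of the idea (not Peter Lawrey's exact code from the link above), and the sample values are made up purely to show usage:
object PercentileSketch {
  // sorts the samples in place and returns the value at the requested percentile
  def percentile(samples:Array[Long], pct:Double):Long = {
    java.util.Arrays.sort(samples)
    val idx = math.min(samples.length - 1, (samples.length * pct).toInt)
    samples(idx)
  }

  def main(args:Array[String]) {
    // illustrative latencies in nanoseconds only, not measured results
    val samples = Array[Long](210000, 220000, 230000, 240000, 250000, 900000)
    println("50pc: " + percentile(samples, 0.50) + "ns")
    println("99pc: " + percentile(samples, 0.99) + "ns")
  }
}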
Other observations / future work
- java.io.DatagramSocket is synchronised, so if you are using "old" Java IO, it's best to create a socket per thread (see the sketch after this list).
- Given the threads are able to work totally independently, the threaded approach should theoretically give better latency (since it handles concurrency more easily). It would be interesting to include some access to a shared resource, such as incrementing a shared counter, to compare a more realistic workload.
- I'd also like to test this across multiple nodes across network to push the system a little harder.
- Interestingly, in this case, SleepingWaitStrategy consistently showed better latency than BusySpinWaitStrategy. I suspect this is due to thread migrations by the Linux scheduler, but I have not investigated.
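For the socket-per-thread point above, one simple way to get that with old IO is a ThreadLocal. A minimal sketch (not part of this project's code):
import java.net.DatagramSocket

object PerThreadSockets {
  // each thread lazily gets its own DatagramSocket, so sends never
  // contend on the synchronised methods of a shared socket
  private val perThread = new ThreadLocal[DatagramSocket] {
    override def initialValue = new DatagramSocket()
  }
  def socket:DatagramSocket = perThread.get
}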
UDP Echo Service code
Github link
Quick note on Scala - this is not functional-style Scala; I'm using Scala here as a scriptable Java. Feel free to provide feedback if you think a different style would make the demos clearer.
// DatagramEvent & DatagramFactory
/**
* Datagram value event
* This is the value event we will use to pass data between threads
*/
class DatagramEvent(var buffer:Array[Byte], var address:SocketAddress,
  var length:Int)
/**
* Creates instances of DatagramEvent for the ring buffer
*/
object DatagramFactory extends EventFactory[DatagramEvent] {
  def newInstance = new DatagramEvent(new Array[Byte](BYTE_ARRAY_SIZE), null, 0)
}
// Main channel receive loop (includes initialisation code)
/**
* Server thread - event loop
*
* We took a shortcut in the EventTranslator, since it does a blocking read, then
* all the server thread needs to do is request a new publish. The disruptor will
* request the translator to fill the next event, and the translator can write direct to
* the buffer. This would not work if we had multiple publishers, since the
* translator is effectively blocking the sequence from progressing while it
* is blocked in translateTo().
*
* The "in" disruptor is responsible for reading from UDP, doing the upper
* case transform (the business logic), and then copying the packet to
* the "out" disruptor.
*/
val RING_SIZE:Int = 1*1024
val BYTE_ARRAY_SIZE:Int = 1*1024
{
  // must have at least as many server threads available for each
  // disruptor as there are handlers
  val inExecutor = Executors.newFixedThreadPool(1)
  val outExecutor = Executors.newFixedThreadPool(1)

  // each disruptor is basically the same
  val disruptorIn = new Disruptor[DatagramEvent](
    DatagramFactory, RING_SIZE, inExecutor, ProducerType.SINGLE,
    new SleepingWaitStrategy)
  val disruptorOut = new Disruptor[DatagramEvent](
    DatagramFactory, RING_SIZE, outExecutor, ProducerType.SINGLE,
    new SleepingWaitStrategy)

  // each disruptor handles the events with the provided handlers
  disruptorOut.handleEventsWith(new DatagramSendHandler)
  disruptorOut.start
  disruptorIn.handleEventsWith(new BusinessLogicHandler(disruptorOut))
  disruptorIn.start

  // socket receive resources
  val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)
  val channel = DatagramChannel.open
  channel.socket.bind(new InetSocketAddress(9999))
  channel.configureBlocking(true)

  // core event loop
  while(true) {
    val socket = channel.receive(buffer)
    buffer.flip

    // use an anonymous inner class to implement EventTranslator
    // otherwise we need to do a double copy; once out of the DBB to
    // a byte[], then from a byte[] to a byte[]
    disruptorIn.publishEvent(new EventTranslator[DatagramEvent] {
      def translateTo(event:DatagramEvent, sequence:Long) {
        event.length = buffer.remaining
        buffer.get(event.buffer, 0, buffer.remaining)
        event.address = socket
      }
    })

    // reset the buffer so the next receive starts from a clean slate
    buffer.clear
  }
}
// Business Logic Handler [this is what you replace with your own logic]
/**
* Business logic goes here
*/
class BusinessLogicHandler(val output:Disruptor[DatagramEvent])
    extends EventHandler[DatagramEvent] {

  def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean) {
    if (event.address != null) {
      // do the upper case work
      val toSendBytes = new String(
        event.buffer, 0, event.length).toUpperCase.getBytes
      // then publish
      output.publishEvent(new ByteToDatagramEventTranslator(
        toSendBytes, event.address))
    }
  }
}
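To illustrate the earlier point that only the BusinessLogicHandler is specific to the upper-casing problem, here is a hypothetical alternative handler that reverses the payload instead; nothing else in the pipeline changes:
/**
 * Hypothetical alternative business logic: reverse the payload
 */
class ReverseLogicHandler(val output:Disruptor[DatagramEvent])
    extends EventHandler[DatagramEvent] {

  def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean) {
    if (event.address != null) {
      val toSendBytes = new String(
        event.buffer, 0, event.length).reverse.getBytes
      output.publishEvent(new ByteToDatagramEventTranslator(
        toSendBytes, event.address))
    }
  }
}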
// ByteToDatagramEventTranslator
/**
* Pushes an output byte[] and address onto the given DatagramEvent
*/
class ByteToDatagramEventTranslator(val bytes:Array[Byte],
    val address:SocketAddress) extends EventTranslator[DatagramEvent] {

  def translateTo(event:DatagramEvent, sequence:Long) {
    event.address = this.address
    event.length = bytes.length
    System.arraycopy(bytes, 0, event.buffer, 0, bytes.length)
  }
}
// DatagramSendHandler
/**
* Sends the datagrams to the endpoints
*
* Don't bother with endOfBatch, because it's assumed each UDP
* packet goes to a different address
*/
class DatagramSendHandler() extends EventHandler[DatagramEvent] {

  // private channel for the outbound packets
  val channel = DatagramChannel.open

  // this buffer is reused to send each event
  val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)

  def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean) {
    if (event.address != null) {
      // copy from the byte[] into the buffer
      buffer.put(event.buffer, 0, event.length)
      buffer.flip
      channel.send(buffer, event.address)
      // assume the buffer isn't needed again, so clear it
      buffer.clear
    }
  }
}
You mentioned "...Disruptor was configured to use a SleepingWaitStrategy, which uses 4-6% per core but still shows excellent latency."
How did you measure the per core percentage?
This is a very nice hello world for Disruptor 3.0 indeed.
Hi Nitin - start the Disruptor on an otherwise idle system and monitor process/CPU utilisation. Not a very formal method. The system is a pretty standard Debian Xeon Dell 1950.
Have you compared the results with the traditional approach of using a ThreadPoolExecutor? Did you see a noticeable difference?
Hi Ram - short answer is no, I haven't; that wasn't really the purpose here. I might spin it up if I get a chance this week, although I would need to rerun it against a different OS combo.
Hi Ram
I ran a quick test with an executor and posted it here:
http://fasterjava.blogspot.com.au/2014/02/updating-disruptor-sample.html
What use case were you comparing TPE with Disruptor for?
I think the key point is that Disruptor and TPE are really for different uses. Disruptor is almost more like SEDA middleware for cases with fixed flow graphs, and it gets great performance from batching, while TPE is a nice general abstraction over units of work.