The
Disruptor, helpfully open sourced by LMAX, has been a real eye opener for me in terms of what can be achieved in performance and efficiency from a multi threaded application. By using a ring buffer and careful use of the single writer principle, they create some extremely low latency and high throughput applications.
One of the things missing from the Disruptor package is a set of useful working examples. Unfortunately the
examples posted on Disruptor Wiki are targeted for the 2.x series. The API has had a few changes in the 3.x series, although things have not fundamentally changed.
Design
An annotated picture shows the event flow.
The only thing specific to the problem of uppercase-ing the input is the BusinessLogicHandler. You could replace this with anything.
This example is enough to get started. Of course, in a Production system, you need to do a lot more around exception handling, pinning threads, service request logging, etc.
The only item not shown here is the DatagramEvent - this is the value event to store entries in the ring buffer. In other words, each slot in the ring buffer is one DatagramEvent.
Results
Tested on a 2x Xeon E5410 (8 cores). No process-core pinning etc.
Query client runs on same machine, runs N threads, each using Java NIO in blocking mode to send a request and wait for a response.
I've compared the results with a standard blocking IO server. The blocking IO server runs with one "acceptor" thread and three "handler" threads. I tried various combinations of Old IO & Blocking NIO and this was the best I could get. Measurements
include the client time to send and receive.
Disruptor was configured to use a SleepingWaitStrategy, which uses 4-6% per core but still shows excellent latency.
A couple of observations:
- There is hardly any business logic here [uppercase a string], so the difference in performance is limited to thread hand-off effectiveness. The results are very noticeable.
- The 99pc and 50pc are very consistent for Disruptor, which implies low variation in response times (ie: consistently low latency).
Also, given this is an 8-core server, running more than a few client threads is going to result in some contention. Above 4 threads, you can see the effect of queueing in Disruptor (with 99pc increasing significantly).
Other observations / future work
- java.io.DatagramSocket is synchronised, so if you are using "old" java IO, best to create a socket per thread.
- Given threads are are able to work totally independently, the threaded approach should theoretically be better latency (since it can handle concurrency easier). It would be interesting to include some access to a shared resource, such as incrementing a shared counter, to compare a more realistic workload.
- I'd also like to test this across multiple nodes across network to push the system a little harder.
- Interestingly, in this case, Sleeping Wait Strategy consistently showed better latency than BusySpin, I suspect this is due to thread migrations on linux scheduler but have not investigated.
UDP Echo Service code
// DatagramEvent & DatagramFactory
/**
* Datagram value event
* This is the value event we will use to pass data between threads
*/
class DatagramEvent(var buffer:Array[Byte], var address:SocketAddress,
var length:Int)
/**
* Creates instances of DatagramEvent for the ring buffer
*/
object DatagramFactory extends EventFactory[DatagramEvent] {
def newInstance = new DatagramEvent(new Array[Byte](BYTE_ARRAY_SIZE), null, 0)
}
// Main channel receive loop (includes initialisation code)
/**
* Server thread - event loop
*
* We took a shortcut in the EventTranslator, since it does a blocking read, then
* all the server thread needs to do is request a new publish. The disruptor will
* request the translator to fill next event, and translator can write direct to
* the buffer. This would not work if we had multiple publishers, since the
* translator is effectively blocking the sequence from progressing while it
* is blocked in translateTo().
*
* The "in" disruptor is responsible for reading from UDP, doing the upper
* case transform (the business logic), and then copying the packet to
* the "out" disruptor"
*/
val RING_SIZE:Int = 1*1024
val BYTE_ARRAY_SIZE:Int = 1*1024
{
// must have at least as many server threads available for each
// disruptor as there are handlers
val inExecutor = Executors.newFixedThreadPool(1)
val outExecutor = Executors.newFixedThreadPool(1)
// each disruptor is basically the same
val disruptorIn = new Disruptor[DatagramEvent](
DatagramFactory, RING_SIZE, inExecutor, ProducerType.SINGLE,
new SleepingWaitStrategy)
val disruptorOut = new Disruptor[DatagramEvent](
DatagramFactory, RING_SIZE, outExecutor, ProducerType.SINGLE,
new SleepingWaitStrategy)
// disruptor handle the events with the provided handlers
disruptorOut.handleEventsWith(new DatagramSendHandler)
disruptorOut.start
disruptorIn.handleEventsWith(new BusinessLogicHandler(disruptorOut))
disruptorIn.start
// socket receive resources
val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)
val channel = DatagramChannel.open
channel.socket.bind(new InetSocketAddress(9999))
channel.configureBlocking(true)
// core event loop
while(true) {
val socket = channel.receive(buffer)
buffer.flip
// use an anonymous inner class to implement EventTranslator
// otherwise we need to do a double copy; once out of the DBB to
// a byte[], then from a byte[] to a byte[]
disruptorIn.publishEvent(new EventTranslator[DatagramEvent] {
def translateTo(event:DatagramEvent,sequence:Long) {
event.length = buffer.remaining
buffer.get(event.buffer, 0, buffer.remaining)
event.address = socket
}
})
}
}
// Business Logic Handler [this is what you replace with your own logic]
/**
* Business logic goes here
*/
class BusinessLogicHandler(val output:Disruptor[DatagramEvent])
extends EventHandler[DatagramEvent] {
def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean) {
if (event.address != null) {
// do the upper case work
val toSendBytes = new String(
event.buffer, 0, event.length).toUpperCase.getBytes
// then publish
output.publishEvent(new ByteToDatagramEventTranslator(
toSendBytes, event.address))
}
}
}
// ByteToDatagramEventTranslator
/**
* Pushes an output byte[] and address onto the given DatagramEvent
*/
class ByteToDatagramEventTranslator(val bytes:Array[Byte],
val address:SocketAddress) extends EventTranslator[DatagramEvent] {
def translateTo(event:DatagramEvent, sequence:Long) {
event.address = this.address
event.length = bytes.length
System.arraycopy(bytes, 0, event.buffer, 0, bytes.length)
}
}
// DatagramSendHandler
/**
* Sends the datagrams to the endpoints
*
* Don't bother with endOfBatch, because it's assumed each udp
* packet goes to a different address
*/
class DatagramSendHandler() extends EventHandler[DatagramEvent] {
// private channel for the outbound packets
val channel = DatagramChannel.open
// this buffer is reused to send each event
val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)
def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean) {
if (event.address != null) {
// copy from the byte[] into the buffer
buffer.put(event.buffer, 0, event.length)
buffer.flip
channel.send(buffer, event.address)
// assume the buffer isn't needed again, so clear it
buffer.clear
}
}
}