Saturday, 6 April 2013

Disruptor example - UDP "echo" service, with capitalisation

A fully functioning Disruptor 3.x example

The Disruptor, helpfully open sourced by LMAX, has been a real eye-opener for me in terms of the performance and efficiency a multithreaded application can achieve. By using a ring buffer and carefully applying the single writer principle, LMAX have built some extremely low-latency, high-throughput applications.
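
To make the two ideas concrete, here is a minimal sketch of a single-producer, single-consumer ring buffer in which each counter is only ever written by one thread. This is not the Disruptor's implementation (which preallocates events and does a great deal more); the class SpscRing and its method names are made up purely for illustration.

```scala
import java.util.concurrent.atomic.AtomicLong

/** Illustrative single-producer / single-consumer ring (hypothetical, not Disruptor code). */
final class SpscRing[T <: AnyRef](capacity: Int) {
  require(capacity > 0 && (capacity & (capacity - 1)) == 0, "capacity must be a power of two")
  private val mask = capacity - 1
  private val slots = new Array[AnyRef](capacity)
  // Single writer principle: each counter has exactly one writing thread.
  private val published = new AtomicLong(0L) // written by the producer only
  private val consumed = new AtomicLong(0L)  // written by the consumer only

  /** Producer side: returns false when the ring is full. */
  def offer(value: T): Boolean = {
    val next = published.get()
    if (next - consumed.get() >= capacity) false
    else {
      slots((next & mask).toInt) = value
      published.lazySet(next + 1) // ordered store: the slot is visible before the counter
      true
    }
  }

  /** Consumer side: returns None when the ring is empty. */
  def poll(): Option[T] = {
    val next = consumed.get()
    if (next >= published.get()) None
    else {
      val value = slots((next & mask).toInt).asInstanceOf[T]
      consumed.lazySet(next + 1) // frees the slot for the producer
      Some(value)
    }
  }
}
```

Because neither counter is contended for writes, no locks or compare-and-swap loops are needed on the hot path.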

One of the things missing from the Disruptor package is a set of useful working examples. Unfortunately, the examples posted on the Disruptor Wiki target the 2.x series. The API changed in a few places in the 3.x series, although nothing has fundamentally changed.

This is an example of a service which accepts packet requests over UDP, and sends a UDP packet back to the source with the text capitalised using the Java String.toUpperCase method.

Credit to Peter Lawrey, whose technique for computing latency percentiles I've copied:
https://code.google.com/p/core-java-performance-examples/source/browse/trunk/src/test/java/com/google/code/java/core/socket/PingTest.java
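
For readers who don't want to follow the link, the general idea can be sketched as below: record every round-trip time, sort the samples, and index into the sorted array. This is my own minimal sketch under that assumption, not a copy of the linked code; the object LatencyPercentiles and its methods are hypothetical names.

```scala
object LatencyPercentiles {
  /** Picks the sample at the given percentile from an already sorted array. */
  def percentile(sortedNanos: Array[Long], pct: Double): Long = {
    require(sortedNanos.nonEmpty, "need at least one sample")
    // Clamp so that pct = 100 maps to the last (largest) sample.
    val idx = math.min(sortedNanos.length - 1, (sortedNanos.length * pct / 100.0).toInt)
    sortedNanos(idx)
  }

  /** Returns (50pc, 99pc) for a set of unsorted latency samples. */
  def summarise(samplesNanos: Array[Long]): (Long, Long) = {
    val sorted = samplesNanos.sorted
    (percentile(sorted, 50), percentile(sorted, 99))
  }
}
```

Sorting the full sample set is fine for a benchmark run; a long-running service would more likely use a histogram.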

Design

An annotated picture shows the event flow.

The only part specific to the problem of upper-casing the input is the BusinessLogicHandler. You could replace it with anything.

This example is enough to get started. Of course, in a production system you would need to do a lot more around exception handling, thread pinning, service request logging, etc.



The only item not shown here is the DatagramEvent - this is the value event to store entries in the ring buffer. In other words, each slot in the ring buffer is one DatagramEvent.

Results

Tested on a 2x Xeon E5410 (8 cores), with no process-to-core pinning. The query client runs on the same machine and runs N threads, each using Java NIO in blocking mode to send a request and wait for the response.
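
A single client round trip of this kind might look like the following minimal sketch. It is not the actual test client: UdpRoundTrip, request and startOneShotUpperCaseServer are hypothetical names, and the one-shot loopback server is only a stand-in so the snippet runs on its own. Since UDP is unreliable, a real client would also want a timeout rather than blocking forever.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale

object UdpRoundTrip {
  /** Sends one request and blocks for the reply; returns (reply text, elapsed nanos). */
  def request(channel: DatagramChannel, server: InetSocketAddress, text: String): (String, Long) = {
    val out = ByteBuffer.wrap(text.getBytes(UTF_8))
    val in = ByteBuffer.allocate(1024)
    val start = System.nanoTime()
    channel.send(out, server)
    channel.receive(in) // channel is in blocking mode, so this waits for the response
    val elapsed = System.nanoTime() - start
    in.flip()
    (new String(in.array(), 0, in.remaining(), UTF_8), elapsed)
  }

  /** Hypothetical stand-in for the service: answers exactly one packet in upper case. */
  def startOneShotUpperCaseServer(): InetSocketAddress = {
    val server = DatagramChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0)) // port 0 = any free port
    val address = server.getLocalAddress.asInstanceOf[InetSocketAddress]
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        val buf = ByteBuffer.allocate(1024)
        val from = server.receive(buf)
        buf.flip()
        val reply = new String(buf.array(), 0, buf.remaining(), UTF_8).toUpperCase(Locale.ROOT)
        server.send(ByteBuffer.wrap(reply.getBytes(UTF_8)), from)
        server.close()
      }
    })
    worker.setDaemon(true)
    worker.start()
    address
  }
}
```

Each client thread would own its own DatagramChannel and call request in a loop, collecting the elapsed times for the percentile calculation.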

I've compared the results with a standard blocking IO server. The blocking IO server runs with one "acceptor" thread and three "handler" threads. I tried various combinations of Old IO & Blocking NIO and this was the best I could get. Measurements include the client time to send and receive.

The Disruptor was configured to use a SleepingWaitStrategy, which uses 4-6% CPU per core but still shows excellent latency.

A couple of observations:

  • There is hardly any business logic here (upper-casing a string), so the difference in performance comes down to thread hand-off efficiency. The difference is very noticeable.
  • The 99th percentile (99pc) and median (50pc) are very consistent for the Disruptor, which implies low variation in response times (i.e. consistently low latency).


Also, given this is an 8-core server, running more than a few client threads is going to result in some contention. Above 4 threads, you can see the effect of queueing in Disruptor (with 99pc increasing significantly).

Other observations / future work


  • java.net.DatagramSocket is synchronised, so if you are using "old" Java IO, it is best to create a socket per thread.
  • Given that the threads are able to work totally independently, the threaded approach should theoretically have better latency (since it can handle concurrency more easily). It would be interesting to include some access to a shared resource, such as incrementing a shared counter, to compare a more realistic workload.
  • I'd also like to test this across multiple nodes over a network to push the system a little harder.
  • Interestingly, in this case the SleepingWaitStrategy consistently showed better latency than BusySpin. I suspect this is due to thread migrations by the Linux scheduler, but I have not investigated.
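
As a rough illustration of the socket-per-thread point above, one option is a ThreadLocal so that each handler thread lazily gets its own java.net.DatagramSocket for outbound replies. This is only a sketch; PerThreadSocket is a hypothetical helper, not part of the benchmark code.

```scala
import java.net.DatagramSocket

object PerThreadSocket {
  // Each thread lazily creates its own socket, bound to any free local port,
  // so threads never contend on a shared socket's internal locks.
  private val local = new ThreadLocal[DatagramSocket] {
    override protected def initialValue(): DatagramSocket = new DatagramSocket()
  }

  /** The calling thread's private socket for outbound sends. */
  def socket: DatagramSocket = local.get()
}
```

Replies then leave from a different port than the one the request arrived on, which is fine for clients that accept a response from any source, as an unconnected UDP client does.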

UDP Echo Service code

Github link

Quick note on Scala: this is not functional-style Scala; I'm using Scala here as a scriptable Java. Feel free to give feedback, though, if you think it could make the demos clearer.

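// Imports used by the snippets below (Disruptor 3.x package names)
import java.net.{InetSocketAddress, SocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
import java.util.concurrent.Executors
import com.lmax.disruptor.{EventFactory, EventHandler, EventTranslator, SleepingWaitStrategy}
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}

// DatagramEvent & DatagramFactory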

/**
 * Datagram value event
 * This is the value event we will use to pass data between threads
 */
class DatagramEvent(var buffer:Array[Byte], var address:SocketAddress, 
  var length:Int)

/**
 * Creates instances of DatagramEvent for the ring buffer
 */
object DatagramFactory extends EventFactory[DatagramEvent] {
  def newInstance = new DatagramEvent(new Array[Byte](BYTE_ARRAY_SIZE), null, 0)
}


// Main channel receive loop (includes initialisation code)

/**
 * Server thread - event loop
 *
 * We took a shortcut in the EventTranslator, since it does a blocking read, then
 * all the server thread needs to do is request a new publish. The disruptor will
 * request the translator to fill next event, and translator can write direct to
 * the buffer. This would not work if we had multiple publishers, since the
 * translator is effectively blocking the sequence from progressing while it 
 * is blocked in translateTo().
 *
 * The "in" disruptor is responsible for reading from UDP, doing the upper
 * case transform (the business logic), and then copying the packet to
 * the "out" disruptor
 */
val RING_SIZE:Int = 1*1024
val BYTE_ARRAY_SIZE:Int = 1*1024

{
  // must have at least as many server threads available for each
  // disruptor as there are handlers
  val inExecutor = Executors.newFixedThreadPool(1)
  val outExecutor = Executors.newFixedThreadPool(1)
  // each disruptor is basically the same
  val disruptorIn = new Disruptor[DatagramEvent](
    DatagramFactory, RING_SIZE, inExecutor, ProducerType.SINGLE,
    new SleepingWaitStrategy)
  val disruptorOut = new Disruptor[DatagramEvent](
    DatagramFactory, RING_SIZE, outExecutor, ProducerType.SINGLE,
    new SleepingWaitStrategy)
  // disruptor handle the events with the provided handlers
  disruptorOut.handleEventsWith(new DatagramSendHandler)
  disruptorOut.start
  disruptorIn.handleEventsWith(new BusinessLogicHandler(disruptorOut))
  disruptorIn.start

  // socket receive resources
  val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)
  val channel = DatagramChannel.open
  channel.socket.bind(new InetSocketAddress(9999))
  channel.configureBlocking(true)

  // core event loop
  while(true) {
    // reset position/limit so the next datagram gets the whole buffer;
    // without this the buffer has no room left after the first packet
    buffer.clear
    // receive blocks until a packet arrives and returns the sender's address
    val socket = channel.receive(buffer)
    buffer.flip

    // use an anonymous inner class to implement EventTranslator
    // otherwise we need to do a double copy; once out of the DBB to
    // a byte[], then from a byte[] to a byte[]
    disruptorIn.publishEvent(new EventTranslator[DatagramEvent] {
      def translateTo(event:DatagramEvent,sequence:Long): Unit = {
        event.length = buffer.remaining
        buffer.get(event.buffer, 0, event.length)
        event.address = socket
      }
    })

  }
}

// Business Logic Handler [this is what you replace with your own logic]

/**
 * Business logic goes here
 */
class BusinessLogicHandler(val output:Disruptor[DatagramEvent])
  extends EventHandler[DatagramEvent] {
  def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean): Unit = {
    if (event.address != null) {
      // do the upper case work
      val toSendBytes = new String(
        event.buffer, 0, event.length).toUpperCase.getBytes
      // then publish
      output.publishEvent(new ByteToDatagramEventTranslator(
        toSendBytes, event.address))
    }
  }
}

// ByteToDatagramEventTranslator

/**
 * Pushes an output byte[] and address onto the given DatagramEvent
 */
class ByteToDatagramEventTranslator(val bytes:Array[Byte],
  val address:SocketAddress) extends EventTranslator[DatagramEvent] {
  def translateTo(event:DatagramEvent, sequence:Long): Unit = {
    event.address = this.address
    event.length = bytes.length
    System.arraycopy(bytes, 0, event.buffer, 0, bytes.length)
  }
}


// DatagramSendHandler

/**
 * Sends the datagrams to the endpoints
 *
 * Don't bother with endOfBatch, because it's assumed each udp
 * packet goes to a different address
 */
class DatagramSendHandler() extends EventHandler[DatagramEvent] {
  // private channel for the outbound packets
  val channel = DatagramChannel.open
  // this buffer is reused to send each event
  val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)
  def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean): Unit = {
    if (event.address != null) {
      // copy from the byte[] into the buffer
      buffer.put(event.buffer, 0, event.length)
      buffer.flip
      channel.send(buffer, event.address)
      // assume the buffer isn't needed again, so clear it
      buffer.clear
    }
  }
}




5 comments:

  1. You mentioned "...Disruptor was configured to use a SleepingWaitStrategy, which uses 4-6% per core but still shows excellent latency."

    How did you measure the per core percentage ?

    This is a very nice hello world for Disruptor 3.0 indeed.

  2. Hi Nitin - start the Disruptor on an otherwise idle system and monitor process/CPU utilisation. Not very formal method. System is a pretty standard Debian Xeon Dell 1950.

  3. Have you compared the results with the traditional way of using ThreadPoolExecutor? Did you see noticeable difference?

  4. Hi Ram - short answer is no, I haven't, that wasn't really the purpose here. I might spin it up if I get a chance this week, although I would need to rerun it against different OS combo.

  5. Hi Ram
    I ran a quick shot with executor and posted here:
    http://fasterjava.blogspot.com.au/2014/02/updating-disruptor-sample.html

    What use case were you comparing TPE with Disruptor for?

    I think the key point is that Disruptor and TPE are really for different uses. Disruptor is almost more like a SEDA middleware for some cases with fixed flow graphs which gets great performance due to batching, TPE is a nice general abstraction over units of work.
