Monday, 8 April 2013

Oracle JDBC row prefetch driver tuning

defaultRowPrefetch - almost an easy win

If you work in a large enterprise context, sooner or later you will connect to an Oracle database.

I wanted to share a tip on tuning the Oracle row prefetch settings. The default prefetch value is 10, which means the JDBC client will do one round-trip to the database to fetch 10 rows, rather than round-trip to the DB for every row fetch. To reduce the round-trips over the network, you can increase the row prefetch, which is especially effective if you are extracting large row volumes.

This is a common trick provided to increase JDBC performance especially where there will be a large ResultSet returned. The row prefetch can be set in two ways -
  • On the Statement object, call the setFetchSize before executing a query. This works if you are using JDBC directly.
  • Alternatively, you can set a default fetch size when the setFetchSize method cannot be called. On the connection properties object, use the defaultRowPrefetch value and set it to an int.

So an easy way to automatically improve your application performance is to increase the fetch size. Great.

Except for the caveat


Oracle JDBC client seems to pre-initialise some memory structures to hold the full prefetch size. So, if you set a prefetch size of 500, you allocate 50x as much memory than if you had prefetch size = 10. This is a huge extra demand on GC especially if you are not actually reading those rows. To think, you might be running a GC 50x more often than needed if you normally only fetch a few rows; this will be a big impact on your application responsiveness.

Recommendation


If possible, I recommend using the setFetchSize on a per-query basis. For example, if you know a particular query will only ever return a few rows, then set the fetch size to say 5. If you know a query will return 1000 rows, use a fetch size of 100.

As a heuristic, there are limited benefits from going over 50-100.

Saturday, 6 April 2013

Disruptor example - UDP "echo" service, with capitalisation

A fully functioning Disruptor 3.x example

The Disruptor, helpfully open sourced by LMAX, has been a real eye opener for me in terms of what can be achieved in performance and efficiency from a multi threaded application. By using a ring buffer and careful use of the single writer principle, they create some extremely low latency and high throughput applications.

One of the things missing from the Disruptor package is a set of useful working examples. Unfortunately the examples posted on Disruptor Wiki are targeted for the 2.x series. The API has had a few changes in the 3.x series, although things have not fundamentally changed.

This is an example of a service which accepts packet requests over UDP, and sends a UDP packet back to the source with the text capitalised using the Java String.toUpperCase method.

Credits to Peter Lawrey as I've copied the technique for computing metrics percentiles:
https://code.google.com/p/core-java-performance-examples/source/browse/trunk/src/test/java/com/google/code/java/core/socket/PingTest.java

Design

An annotated picture shows the event flow.

The only thing specific to the problem of uppercase-ing the input is the BusinessLogicHandler. You could replace this with anything.

This example is enough to get started. Of course, in a Production system, you need to do a lot more around exception handling, pinning threads, service request logging, etc.



The only item not shown here is the DatagramEvent - this is the value event to store entries in the ring buffer. In other words, each slot in the ring buffer is one DatagramEvent.

Results

Tested on a 2x Xeon E5410 (8 cores). No process-core pinning etc. Query client runs on same machine, runs N threads, each using Java NIO in blocking mode to send a request and wait for a response. 

I've compared the results with a standard blocking IO server. The blocking IO server runs with one "acceptor" thread and three "handler" threads. I tried various combinations of Old IO & Blocking NIO and this was the best I could get. Measurements include the client time to send and receive.

Disruptor was configured to use a SleepingWaitStrategy, which uses 4-6% per core but still shows excellent latency.

A couple of observations:

  • There is hardly any business logic here [uppercase a string], so the difference in performance is limited to thread hand-off effectiveness. The results are very noticeable.
  • The 99pc and 50pc are very consistent for Disruptor, which implies low variation in response times (ie: consistently low latency).


Also, given this is an 8-core server, running more than a few client threads is going to result in some contention. Above 4 threads, you can see the effect of queueing in Disruptor (with 99pc increasing significantly).

Other observations / future work


  • java.io.DatagramSocket is synchronised, so if you are using "old" java IO, best to create a socket per thread.
  • Given threads are are able to work totally independently, the threaded approach should theoretically be better latency (since it can handle concurrency easier). It would be interesting to include some access to a shared resource, such as incrementing a shared counter, to compare a more realistic workload.
  • I'd also like to test this across multiple nodes across network to push the system a little harder.
  • Interestingly, in this case, Sleeping Wait Strategy consistently showed better latency than BusySpin, I suspect this is due to thread migrations on linux scheduler but have not investigated.

UDP Echo Service code

Github link

Quick note on Scala - this is not functional styled Scala, I'm using Scala here as a scriptable Java. Feel free to provide feedback though if you think it can make the demos a bit more clear.

// DatagramEvent & DatagramFactory

/**
 * Datagram value event
 * This is the value event we will use to pass data between threads
 */
class DatagramEvent(var buffer:Array[Byte], var address:SocketAddress, 
  var length:Int)

/**
 * Creates instances of DatagramEvent for the ring buffer
 */
object DatagramFactory extends EventFactory[DatagramEvent] {
  def newInstance = new DatagramEvent(new Array[Byte](BYTE_ARRAY_SIZE), null, 0)
}


// Main channel receive loop (includes initialisation code)

/**
 * Server thread - event loop
 *
 * We took a shortcut in the EventTranslator, since it does a blocking read, then
 * all the server thread needs to do is request a new publish. The disruptor will
 * request the translator to fill next event, and translator can write direct to
 * the buffer. This would not work if we had multiple publishers, since the
 * translator is effectively blocking the sequence from progressing while it 
 * is blocked in translateTo().
 *
 * The "in" disruptor is responsible for reading from UDP, doing the upper
 * case transform (the business logic), and then copying the packet to
 * the "out" disruptor"
 */
val RING_SIZE:Int = 1*1024
val BYTE_ARRAY_SIZE:Int = 1*1024

{
  // must have at least as many server threads available for each
  // disruptor as there are handlers
  val inExecutor = Executors.newFixedThreadPool(1)
  val outExecutor = Executors.newFixedThreadPool(1)
  // each disruptor is basically the same
  val disruptorIn = new Disruptor[DatagramEvent](
    DatagramFactory, RING_SIZE, inExecutor, ProducerType.SINGLE,
    new SleepingWaitStrategy)
  val disruptorOut = new Disruptor[DatagramEvent](
    DatagramFactory, RING_SIZE, outExecutor, ProducerType.SINGLE,
    new SleepingWaitStrategy)
  // disruptor handle the events with the provided handlers
  disruptorOut.handleEventsWith(new DatagramSendHandler)
  disruptorOut.start
  disruptorIn.handleEventsWith(new BusinessLogicHandler(disruptorOut))
  disruptorIn.start

  // socket receive resources
  val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)
  val channel = DatagramChannel.open
  channel.socket.bind(new InetSocketAddress(9999))
  channel.configureBlocking(true)

  // core event loop
  while(true) {
    val socket = channel.receive(buffer)
    buffer.flip

    // use an anonymous inner class to implement EventTranslator
    // otherwise we need to do a double copy; once out of the DBB to
    // a byte[], then from a byte[] to a byte[]
    disruptorIn.publishEvent(new EventTranslator[DatagramEvent] {
      def translateTo(event:DatagramEvent,sequence:Long) {
        event.length = buffer.remaining
        buffer.get(event.buffer, 0, buffer.remaining)
        event.address = socket
      }
    })

  }
}

// Business Logic Handler [this is what you replace with your own logic]

/**
 * Business logic goes here
 */
class BusinessLogicHandler(val output:Disruptor[DatagramEvent])
  extends EventHandler[DatagramEvent] {
  def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean) {
    if (event.address != null) {
      // do the upper case work
      val toSendBytes = new String(
        event.buffer, 0, event.length).toUpperCase.getBytes
      // then publish
      output.publishEvent(new ByteToDatagramEventTranslator(
        toSendBytes, event.address))
    }
  }
}

// ByteToDatagramEventTranslator

/**
 * Pushes an output byte[] and address onto the given DatagramEvent
 */
class ByteToDatagramEventTranslator(val bytes:Array[Byte],
  val address:SocketAddress) extends EventTranslator[DatagramEvent] {
  def translateTo(event:DatagramEvent, sequence:Long) {
    event.address = this.address
    event.length = bytes.length
    System.arraycopy(bytes, 0, event.buffer, 0, bytes.length)
  }
}


// DatagramSendHandler

/**
 * Sends the datagrams to the endpoints
 *
 * Don't bother with endOfBatch, because it's assumed each udp
 * packet goes to a different address
 */
class DatagramSendHandler() extends EventHandler[DatagramEvent] {
  // private channel for the outbound packets
  val channel = DatagramChannel.open
  // this buffer is reused to send each event
  val buffer = ByteBuffer.allocateDirect(BYTE_ARRAY_SIZE)
  def onEvent(event:DatagramEvent, sequence:Long, endOfBatch:Boolean) {
    if (event.address != null) {
      // copy from the byte[] into the buffer
      buffer.put(event.buffer, 0, event.length)
      buffer.flip
      channel.send(buffer, event.address)
      // assume the buffer isn't needed again, so clear it
      buffer.clear
    }
  }
}




Welcome

This blog will chronicle some of my learnings while experimenting with making Java a faster, yet still productive, platform.