Leather Hose

breadcrumbz by @zvozin

Time-and-Space Bounded Buffer Actor With Scalaz


You meet this pattern a lot when you do network programming - a buffer that flushes upon reaching either a space or a time limit, whichever comes first. Limits of 2-3 secs and/or 100-200 events are often used as a basis for further tuning.

I’ve needed this construct a lot lately and, having built and re-built it a couple of times, arrived at the Scalaz Actor-based version below. If you don’t need proof with your theorems, the complete Gist can be had here. If you do, follow me as we build it from scratch.

Buffering

The core idea is obviously simple - accept messages, put them in a private buffer, and flush once either the size or the time limit has been reached. We’ll start with the size bound - it’s easier. We’ll use - horror of horrors - a mutable ArrayBuffer too - it’s a lot more performant than an immutable List.

Since we’re using Scalaz’s Actor, which is a final case class, we can’t extend it (really, for the better), so we’ll wrap around it instead like so:

Space-bounded Buffer Actor
import scalaz.concurrent.Actor
import collection.mutable.ArrayBuffer

case class BufferActor[A](onFlush: List[A] => Unit,
                          onError: Throwable => Unit = throw (_),
                          bufferSize: Int = 100) {

  private val buffer = new ArrayBuffer[A](bufferSize)
  private lazy val actor = Actor[A](onMessage, onError)

  private val onMessage: A => Unit = a => {
    buffer append a
    if (buffer.size >= bufferSize) flush()
  }

  private def flush() {
    if (!buffer.isEmpty) {
      onFlush(buffer.toList)
      buffer.clear()
    }
  }

  def !(a: A) {
    actor ! a
  }
}

Flushing on time

The above buffer works for size, but threatens to become a morass if the flow over it is slow, backing the world up until it collects its bufferSize messages. The common solution is to force a flush at timed intervals regardless of the current buffer size.

Since the only way to talk to a Scalaz Actor is to send messages to it (a good thing), and since it’s strongly-typed (also a good thing), we’ll need an Algebraic Data Type that can represent both our payload messages, and a flush timer, like so:

ADT for payload and flush timer
sealed trait Message[+A]
case class MessageWrapper[A](a: A) extends Message[A]
case object Flush extends Message[Nothing]

We’ll also need a timer to send those Flush objects at regular intervals, so we whip up a quick convenience wrapper around java.util.Timer:

Convenience for java.util.Timer
import java.util.TimerTask

case class Timer() {

  private val javaTimer = new java.util.Timer()

  def runIn(msFromNow: Long)(f: => Unit) {
    javaTimer schedule(new TimerTask {def run() { f }}, msFromNow)
  }

  def runEvery(ms: Long)(f: => Unit) {
    javaTimer schedule(new TimerTask {def run() { f }}, ms, ms)
  }
}

Putting it all together

All we have left is to start the timer, which we can simply do in the body of BufferActor since in Scala class bodies are essentially constructor code. Behold:

Fully-cooked BufferActor
case class BufferActor[A](onFlush: List[A] => Unit,
                          onError: Throwable => Unit = throw (_),
                          bufferSize: Int = 100,
                          flushIntervalMs: Long = 2000L) {

  private sealed trait Message
  private case object Flush extends Message
  private case class MessageWrapper(a: A) extends Message

  private val buffer = new ArrayBuffer[A](bufferSize)
  private val flushTimer = new Timer()
  private lazy val actor = Actor[Message](onMessage, onError)

  /**
   * Get the flush ticker going ...
   */
  flushTimer.runEvery(flushIntervalMs)(actor ! Flush)

  private val onMessage: Message => Unit = {
    case MessageWrapper(a) =>
      buffer append a
      if (buffer.size >= bufferSize) flush()
    case Flush => flush()
  }

  private def flush() {
    if (!buffer.isEmpty) {
      onFlush(buffer.toList)
      buffer.clear()
    }
  }

  def !(a: A) {
    actor ! MessageWrapper(a)
  }
}

Note that the actor is lazy. This avoids a race condition between the initialization of the class and the first firing of the timer: the timer starts in the constructor body, and without lazy it could deliver a Flush to an actor field that hasn’t been initialized yet.

It’s highly advised that you remove the lazy modifier and see what happens.
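To see that race for yourself without any scalaz machinery, here is a self-contained sketch (all names made up for illustration) of what happens when a timer started in a constructor body touches a field that is initialized later in that same body:

```scala
import java.util.{Timer, TimerTask}

object LazyRace {

  @volatile var observed: AnyRef = "unset"

  class Racy {
    // The timer starts here, in the constructor body, *before* `handler`
    // below has been assigned -- exactly the situation in BufferActor.
    new Timer(true).schedule(new TimerTask {
      def run(): Unit = { observed = handler } // reads the field too early
    }, 0)
    Thread.sleep(200) // let the timer fire mid-construction
    val handler: String => Unit = println(_)  // initialized only now
  }

  def main(args: Array[String]): Unit = {
    new Racy
    // The timer thread saw the uninitialized (null) field; a `lazy val`
    // would instead have forced initialization on first access.
    println(observed == null)
  }
}
```

With `val actor`, the timer can observe exactly this kind of null; with `lazy val`, the first `actor ! Flush` forces initialization and the race disappears.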

As mentioned before, the complete Gist can be had here. Enjoy!

Cloud Servers as Office Supplies


Today, a job as a server-side software developer ought to come with a playground account at the company’s cloud provider of choice (Amazon Web Services and Rackspace Cloud seem to be preferred by most). Here’s why.

Developers as a class are generally blithely unaware of the joys and sorrows of deploying and running software they write. Abstraction, a core principle of dealing with complex systems (software or otherwise), is taken to a rather harsh extreme by today’s software tools. We try to abstract away the fact that code runs on physical boxes that require configuration, monitoring and maintenance, have finite RAM, finite processing capacity, a finite number of file handles, and which, most importantly, unexpectedly fail.

As recently as five years ago this Craft-of-Boxes(tm), otherwise known as system administration, still required significant specific expertise and time. Most developers preferred, to their detriment, not to develop the expertise. Today, with clouds and system administration scripting tools (Puppet, Chef), this is no longer excusable.

Software code isn’t a poem, it’s a recipe for a complex system that operates on a lot of assumptions, many of them outside of the code’s control. You can’t be any good at designing complex software systems without intimate familiarity with these assumptions, and their consequences.

One best understands the need for detailed logging after a sleepless night of live-debugging a production issue in a system with poor logs. One learns the value of detailed metrics after manually invoking jstat or the like enough times to cause Carpal Tunnel Syndrome.

So make it a part of your process to have developers write Puppet or Chef scripts and build their own test environments in the cloud. Have your sysadmin help a lot, but make sure that every feature walks every step of the way, and every assumption about environment is checked off by having the developer script, configure and create it.

A medium Amazon EC2 instance costs 1/150th of a good developer’s salary - within the margin of error. There’s no reason not to just give your developers access and let them have at it.

Do it. Then see the number of production issues drop, and the quality of your organization’s life improve.

Non-blocking Actor Pipelines With Scalaz


Scala has a selection of actor libraries: there’s the standard library Actor, deprecated in Scala 2.10 in favor of Akka Actor. There’s also Lift’s Actor, and the Scalaz Actor, which I usually prefer.

For those unfamiliar with actors and too lazy to read through the Wikipedia article, the tl;dr: actors are basically an improved version of something all who’ve written networked Java apps have created at one time or another - an asynchronous message-processing queue.

Our naïve implementations of such queues usually used an inefficient sub-species of java.util.Collection, and ran in a dedicated thread, idling a lot. Actors use hyper-specialized FIFO structures and get scheduled across a thread pool as messages arrive, thus ending up ultra-light and blazingly fast, with Scalaz currently the fastest.
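For concreteness, that hand-rolled queue-plus-dedicated-thread looks something like the sketch below (names are made up for illustration):

```scala
import java.util.concurrent.LinkedBlockingQueue

// The classic hand-rolled asynchronous message-processing queue:
// one dedicated thread blocks on a queue and applies a handler to
// each message -- and idles whenever the queue is empty.
class NaiveQueue[A](handler: A => Unit) {

  private val queue = new LinkedBlockingQueue[A]()

  private val worker = new Thread(new Runnable {
    def run(): Unit = while (true) handler(queue.take())
  })
  worker.setDaemon(true)
  worker.start()

  def !(a: A): Unit = queue.put(a)
}
```

An actor gives you the same `!`-and-handler contract, minus the permanently parked thread.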

Scalaz Actor

Scalaz actors differ from the rest of the crowd in several other beneficial ways. A typical actor processes messages through a PartialFunction[Any, Unit], essentially disconnecting you from the type system. It is also usually represented as a trait, making composition tedious.

In contrast, Scalaz Actor is defined as

Scalaz Actor
final case class Actor[A](handler: A => Unit, onError: Throwable => Unit = throw(_))
                         (implicit val strategy: Strategy)

Message processing goes through a total function of type A => Unit, making the actor typed, helping avoid the annoyingly common error of sending the wrong thing to the wrong actor. The need for error handling is made explicit with onError, reminding one that an exception in the message processing loop will cause an actor to terminate, and needs to be handled.

Additionally, actor scheduling is made explicit as a concept by requiring an implicit (pardon) instance of scalaz.concurrent.Strategy. The default strategy schedules actors on a thread pool (scalaz.concurrent.Strategys.DefaultStrategy), but the library also provides for a naïve thread-per-actor strategy (scalaz.concurrent.StrategysLow.Naive), and the fully-synchronous scalaz.concurrent.StrategysLow.Sequential, which causes everything to run in the current thread, and is really useful in testing.
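Conceptually, a strategy just answers “where does this unit of work run?”. A stripped-down sketch of the idea - not scalaz’s actual definition, just its shape - might look like:

```scala
import java.util.concurrent.ExecutorService

// A conceptual sketch of a scheduling strategy -- NOT the actual
// scalaz code, whose apply is generic; just the shape of the idea.
trait RunStrategy {
  def apply(work: => Unit): Unit
}

// Everything runs on the calling thread: deterministic, great for tests.
object SequentialSketch extends RunStrategy {
  def apply(work: => Unit): Unit = work
}

// Work is handed off to a thread pool, as the default strategy does.
class PooledSketch(pool: ExecutorService) extends RunStrategy {
  def apply(work: => Unit): Unit =
    pool.execute(new Runnable { def run(): Unit = work })
}
```

Swapping in a sequential strategy in tests turns an asynchronous actor pipeline into plain, deterministic function calls.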

An improbably useful Scalaz actor that tries to parse a string into an Int and print it, or fails loudly, would be created thusly (add "org.scalaz" %% "scalaz-concurrent" % "7.0.0" to your SBT project; if you’re using another build tool, scalaz7 is built for Scalas 2.9.2 and 2.10):

Scalaz Actor
import scalaz.concurrent._
import Strategy._

val usefulnessIncarnate = Actor[String](
                              handler = (s: String) => println(s.toInt),
                              onError = e => println("Ouch: " + e.getMessage))

Contramap

Scalaz Actor’s best kept secret, though, is contramap. It is what it sounds like - a map in reverse. Consider the following chain of calls on an Option[String]:

Chain of Option maps
Some("1") map (_.toInt) map (_ * 2.0) foreach (println(_))

It takes you from Option[String] to Option[Int] to Option[Double], and terminates in a side-effect - a print-out - all happening on the same thread. Now imagine, if you will, that the terminating print-out happens in an actor message loop (assume scalaz-concurrent is imported):

The terminal print-out as an actor
val printer = Actor[Double](println(_))

Contramap, then, works as follows: given an actor z = Actor[Z] and a function f: Y => Z, z contramap f will return an actor y = Actor[Y] that applies f to the messages it receives, and !-s the result on to z.
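To see the mechanics with the concurrency stripped away, here is a toy, fully synchronous stand-in with the same contramap shape (an illustration, not the scalaz implementation):

```scala
// A toy synchronous stand-in for an actor, just to show contramap's shape.
class Sink[A](handler: A => Unit) {

  def !(a: A): Unit = handler(a)

  // Build a Sink[B] that applies f and forwards the result to this sink.
  def contramap[B](f: B => A): Sink[B] = new Sink[B](b => this ! f(b))
}
```

Note the reversal: mapping over a producer transforms its output, while contramapping over a consumer transforms its input.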

It’s easy to see how it will allow us to build the same chain of transformations as above, but with each step performed asynchronously: we’ll simply have to build the chain backwards. Witness:

Contramap pipeline
val pipeline = Actor[Double](println(_)) contramap ((i: Int) => i * 2.0) contramap ((s: String) => s.toInt)
pipeline ! "1"

That’s it. There you have it - a fully asynchronous processing pipeline assembled by contramap‘ing simple functions.

For a real-world use, consider a simple API that receives a JSON message, de-serializes it into a case class (you’re probably going to want to do that synchronously), transforms it into a DBObject, and saves to Mongo. All you now need to do is provide a function of type YourCaseClass => DBObject, and the persistence side-effect DBObject => Unit - and voila!, a high-performance non-blocking pipeline in one elegant line.
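As a sketch of those two stages with made-up stand-ins (a hypothetical case class, and a plain Map standing in for the Mongo driver’s DBObject):

```scala
object PipelineSketch {

  // Hypothetical stand-ins: in real code the case class comes from your
  // JSON layer, and DBObject from the Mongo driver.
  case class YourCaseClass(name: String, count: Int)
  type DBObject = Map[String, Any]

  // Stage one: the pure transformation.
  val toDbObject: YourCaseClass => DBObject =
    c => Map("name" -> c.name, "count" -> c.count)

  // Stage two: the persistence side-effect (println standing in for the write).
  val save: DBObject => Unit =
    obj => println("saving " + obj)

  // With scalaz in scope, the non-blocking pipeline is then one line:
  //   val pipeline = Actor[DBObject](save) contramap toDbObject
  //   pipeline ! YourCaseClass("clicks", 42)
}
```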

Enjoy!

Streaming APIs With Jetty WebSockets


Firstly: if you’re looking for ways to develop a Comet app that would stream data to the browser, stop and go no further. You don’t want to use raw WebSockets, you want to use a framework. If you’re on the JVM, there’s Atmosphere, Direct Web Remoting, CometD, Play, and last but not least, Lift, which I would recommend as still having best-in-class™ Comet support. On Node.JS there’s Socket.IO.

Sometimes, though, you want to publish a streaming API, and that’s where raw WebSockets are very helpful. We’ll assume you’re working on the JVM, and will use the ones provided by Jetty; they are somewhat of a reference implementation. We’ll code in Scala, but will keep it sufficiently simple that a back-port to Java would be trivial if required. Include jetty-websocket in your Java web-app project, and let’s go.

WebSocketServlet

There are two ways to expose WebSockets from Jetty: raw, and through the Servlet API. We’ll use the latter - it’s more robust. You can use the initial servlet request to authenticate and pass parameters to configure the socket, as opposed to having to do it all yourself in-stream. You will need to implement org.eclipse.jetty.websocket.WebSocketServlet:

Your implementation of WebSocketServlet
import javax.servlet.{ServletRequest, ServletResponse}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.websocket.WebSocketServlet

class YourWebSocketServlet extends WebSocketServlet {

  /**
   * Do your authentication and parameter setting here
   */
  override def service(req: ServletRequest, res: ServletResponse) {
    if (youThinkItsOkForThisConnectionToGoAhead) super.service(req, res)
    else res.asInstanceOf[HttpServletResponse] setStatus 401
  }

  /**
   * Return your in-bound message handler here
   */
  def doWebSocketConnect(request: HttpServletRequest, protocol: String) =
    YourWebSocket(msg => println("Received: " + msg))
}

WebSocket

YourWebSocket, returned from doWebSocketConnect above, should be an implementation of one or more sub-interfaces of the confusingly named org.eclipse.jetty.websocket.WebSocket. It’s not really a socket, it’s a message handler and a connection end-point; the WebSocket hydraulics are taken care of under the hood.

Assuming you’re exposing a text-based API (e.g., a JSON stream), you’ll want to implement org.eclipse.jetty.websocket.WebSocket.OnTextMessage:

Your implementation of WebSocket
import org.eclipse.jetty.websocket.WebSocket
import org.eclipse.jetty.websocket.WebSocket.Connection

case class YourWebSocket(handler: String => Unit) extends WebSocket.OnTextMessage {

  private var connection: Option[Connection] = None

  def onMessage(data: String) {
    handler(data)
  }

  def onOpen(conn: Connection) {
    connection = Some(conn)
  }

  def onClose(closeCode: Int, message: String) {
    connection = None
  }

  def send(msg: String) {
    connection map (_ sendMessage msg) getOrElse {
      throw new IllegalStateException("Socket is not connected")
    }
  }
}

The underlying org.eclipse.jetty.websocket.WebSocketFactory will call onOpen() and pass in the connection, which you can use to send messages down the socket if you want to. All that’s left is to hook your servlet up in web.xml:

Bootstrapping WebSocketServlet
<web-app>
  <servlet>
      <servlet-name>WebSocketServlet</servlet-name>
      <servlet-class>YourWebSocketServlet</servlet-class>
  </servlet>

  <servlet-mapping>
      <servlet-name>WebSocketServlet</servlet-name>
      <url-pattern>/streaming-api/*</url-pattern>
  </servlet-mapping>
</web-app>

You should be all set. The WebSocket URLs start with the protocol identifier ws, so you’ll call your newly minted WebSockets API like so (assuming the default Java web app port of 8080): ws://localhost:8080/streaming-api

WebSocketClient

To test your API, use org.eclipse.jetty.websocket.WebSocketClient:

WebSocketClient
import java.net.URI
import java.util.concurrent.TimeUnit
import org.eclipse.jetty.websocket.{WebSocket, WebSocketClientFactory}
import org.eclipse.jetty.websocket.WebSocket.Connection

object TestClient {

  def main(args: Array[String]) {
    val factory = new WebSocketClientFactory()
    factory.start()

    val connection = factory.newWebSocketClient().open(
      new URI("ws://localhost:8080/streaming-api"),
      new WebSocket.OnTextMessage() {
        def onOpen(connection: Connection) {
        }

        def onClose(closeCode: Int, message: String) {
        }

        def onMessage(data: String) {
          println("Received %s from server" format data)
        }
      }).get(5, TimeUnit.SECONDS)

    connection sendMessage "Wassup?"
  }
}

Happy streaming!