I was recently introduced to the scalaz-stream library by the excellent blog post Actors are Overly Nondeterministic. I decided to take a look at them, and I realize they have a lot to offer.

One of the cools things I noticed could be built with them is Clojure-style Agents. But Agents implemented using scalaz-stream are actually more general than clojure agents - asynchronous multithreaded processing is not required. If you want to run agents in a separate thread using a message queue for communication, you can. But you can also run them in a single thread if that's more efficient. Using scalaz-stream, we can actually generalize over the specific dispatch method.

In this post I'll both explain a little bit about scalaz-stream and also describe the scalaz agent library I built on top of it.

Please note that the agent library is in the very preliminary stages and is probably under 100 lines of code. You can't do a lot with it yet.

scalaz.concurrent.Process

A Process[F[_], T] is a stream of values of type T, but which has the ability to make external requests of type F[_]. So for example, a Process might be a stream of Int objects, but while processing the Int objects it could also make requests return an Option[X] to assist in the computation.

I'm going to avoid more abstract nonsense and give an example. My working example will be building a conversion rate tracker, so I'll define the basic relevant data types:

sealed trait Event
case object Display extends Event
case object Conversion extends Event

The Display object represents the display of an ad to a user, while Conversion represents the user clicking on the ad.

An event stream will be an object of type Process[F[_], Event]. The simplest way to build one is as follows:

val testSeq: Process[Nothing,Event] = Process.emitSeq(Seq[Event](Display,Conversion,Display,Display))

This is little more than a wrapper around an underlying Seq. In fact, it literally evaluates to exactly that:

scala> testSeq
res2: scalaz.stream.Process[Nothing,ScalazStreaming.Event] =
    Emit(List(Display, Conversion, Display, Display),Halt(scalaz.stream.Process$End$))

The underlying monad here is Nothing - i.e., there is no monadic effect in play here.

An infinite stream

We might also want to play with an infinite stream. Defining this takes a bit more work:

import scalaz.stream._
import scalaz.stream.io._
import scalaz.concurrent.Task

val rnd = new Random
val conversionRate = 0.10

def simulatedEvent: Process[Task,Event] = Process.await(Task.delay({ () }))(recv = _ => {
  if (rnd.nextDouble < conversionRate) {
    Process.emitSeq( Seq(Display,Conversion) )
  } else {
    Process.emit(Display)
  }
})

The simulatedEvent function represents a single lazily-evaluated event where we displayed an ad to a user, following which the user may or may not have converted. 90% of the time, we will see the following:

scala> simulatedEvent.toSeq
Seq(Display)

10% of the time we get the following:

scala> simulatedEvent.toSeq
Seq(Display, Conversion)

Now to create an infinite stream of events, we simply use the repeat method:

def eventStream = simulatedEvent.repeat

Agents

An agent is merely a piece of data (the state) together with a function that takes a message and returns a new agent with altered state.

trait Agent[T, A <: Agent[T,A]] {
  def receive(d: T): A
}

The type signature is fairly complicated, but an example will clear things up. We'll build a StatisticsCalculator, which is a class that receives an event (either a Display or a Conversion) and updates it's statistics:

case class StatisticsCalculator(displays: Long, conversions: Long) extends Agent[Event, StatisticsCalculator] {
  def receive(e: Event): StatisticsCalculator = e match {
    case Display => this.copy(displays=displays+1)
    case Conversion => this.copy(conversions=conversions+1)
  }
}

This is more or less how we would build a mutable statistics counter - the only difference is that this version is purely functional. The complicated type signature is necessary to ensure that send returns a StatisticsCalculator rather than an Agent[Event,StatisticsCalculator].

For those familiar with akka actors, the receive method should be considered as an immutable version of Akka's receive.

scan - a streaming version of cumulative sum

The Process type provides a method called scan:

def scan[B](b: B)(f: (B,O) => B): Process[F,B]

The scan method is sort of a cumulative fold. Rather than simply providing the result of folding the entire stream, it provides the intermediate results along the way as well. I.e., suppose ones were a stream repeating an infinite sequence of 1. Then:

ones.scan(0)( (x, y) => x+y )

would return the stream 0,1,2,3,....

Using scan, we can define a runAgent function:

def runAgent[F[_], T, A <: Agent[T,A]](process: Process[F,T], initialState: =>A): Process[F,A]
    = process.scan(initialState)( (a, t) => a.receive(t) )

That's a complicated type signature, but only due to the signature of the Agent.

Computing the history of the agent

If we now apply the runAgent function, we get a stream of agent values:

scala> val testSeq: Process[Nothing,Event] = Process.emitSeq(Seq[Event](Display,Conversion,Display,Display))

scala> runAgent(testSeq, StatisticsCalculator(0,0)).toSeq
Seq[com.chrisstucchio.spire_examples.ScalazStreaming.StatisticsCalculator] =
    Vector(StatisticsCalculator(0,0), StatisticsCalculator(1,0), StatisticsCalculator(1,1),
           StatisticsCalculator(2,1), StatisticsCalculator(3,1))

So using the Agent, we've synchronously run a calculation on a sequence of events. At the start, the Agent has the state StatisticsCalculator(0,0). After seeing the first event (a Display), it's state is StatisticsCalculator(1,0) representing that it saw a single Display and 0 Conversions. Schematically, we've done the following:

Input data: ()   , Display, Conversion, Display, Display
State:      (0,0), (1,0),   (1,1),      (2,1),   (3,1)

So far this is nothing special - it's just a fancy way of simulating stateful code in a purely functional system.

Changing the execution model

In the previous section, we merely used agents to run a purely functional computation. The function Process.emitSeq did little more than put a wrapper around a standard scala Seq. The runAgent function merely ran a fold-like operation on the immutable StatisticsCalculator objects.

However, we can also run an infinite calculation using the exact same code. Recall the eventStream:

def eventStream: Process[Task,Event] = simulatedEvent.repeat

Note that the type of this is now Process[Task,Event] instead of Process[Nothing,Event]. I.e., there is an effectful monad underlying this stream.

scala> runAgent(eventStream, StatisticsCalculator(0,0))
res3: scalaz.stream.Process[scalaz.concurrent.Task,ScalazStreaming.StatisticsCalculator] =
    Emit(List(StatisticsCalculator(0,0)),Await(scalaz.concurrent.Task@5fd243ea,<function1>,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$)))

This looks like a little bit more than a wrapper around a Seq. The result of runAgent on the eventStream is now the first object, together with a closure representing how to continue the computation. If we want to actually run the computation, we need to call the .run method. But that will be an infinite computation with no side effects, so lets do something more interesting. Lets produce some side effects:

scala> val s = runAgent(eventStream, StatisticsCalculator(0,0)).map(x => println(x))
s: scalaz.stream.Process[scalaz.concurrent.Task,Unit] =
      Emit(List(()),Await(scalaz.concurrent.Task@7150c54a,<function1>,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$)))

We've produced a value of type Process[Task,Unit] as expected, but we've also generated a side effect. The following was printed to stdout:

StatisticsCalculator(0,0)

To run the stream, we use the .run method, which returns a Task[Unit]. For those unfamiliar with scalaz.concurrent.Task, it's the scalaz alternative to scala.concurrent.Future. So running the Process[Task,Unit] is basically a way of sequencing a bunch of Future-like objects together.

The Future (oops Task) can then be run with it's own .run method (equivalent to Await.ready(future, duration) from Akka):

scala> s.run.run
StatisticsCalculator(0,0)
StatisticsCalculator(1,0)
StatisticsCalculator(2,0)
StatisticsCalculator(2,1)
StatisticsCalculator(3,1)
...etc...

(There are some interesting differences between Future and Task. I'll discuss them in another post.)

Effectful Agents

One common use of Actors is to sequence work to be performed. We can use Agents for a similar purpose. For this we need to introduce the EffectfulAgent:

trait EffectfulAgent[T, A <: Agent[T,A], F[_]] extends Agent[T,A] {
  def discard: A
  def effect: F[A]
}

Here the effect method actually does work. It returns an agent wrapped inside a functor F[_] - this is interpreted as meaning the effect method does the actual work. Splitting the receive and effect methods of an agent allows us to run an agent and simulate it's state without actually running the side effects.

An EffectfulAgent should satisfy the laws:

a.discard.effect == mzero
a.discard.discard == a.discard

(The first law assumes of course that F[_] is a Monad with Zero.) This means that an agent who's effect has been discarded has no effect.

We also require that if we ignore effects:

a.effect == a.discard.point[F]

This means that calling the discard method on an EffectfulAgent changes the state of the agent in the same way as actually running it's effect.

Example - a printing statistics calculator

To provide an example of an EffectfulAgent, I'll demonstrate a StatisticsCalculator which prints it's data to stdout every 10 displays.

case class PrintingStatisticsCalculator(displays: Long, conversions: Long, toPrint: Option[String]) extends EffectfulAgent[Event,PrintingStatisticsCalculator,IO] {
  def receive(e: Event): PrintingStatisticsCalculator = e match {
    case Display => if (displays + 1 % 10 == 0) {
        this.copy(displays = displays + 1, toPrint = Some("Current state: " + this))
      } else {
        this.copy(displays=displays+1)
      }
    case Conversion => this.copy(conversions=conversions+1)
  }

  def discard: PrintingStatisticsCalculator = this.copy(toPrint = None)
  def effect: IO[PrintingStatisticsCalculator] = toPrint.map( putStrLn ).getOrElse(ioUnit).map( _ => discard)
}

The toPrint element represents the data to be printed (if any). It's pretty clear from the definition above that the laws are satisfied.

Running an effectful agent is done via the agentEffect method:

def runAgentForEffect[F[_],F2[x] >: F[x], T, A <: EffectfulAgent[T,A,F2]](process: Process[F,T], initialState: A): Process[F2,A] =
    process.scan(initialState)( (a, t) => a.discard.receive(t) )
      .evalMap(_.effect)

A complicated type signature, but the idea is simple. First we run the agent, in order to generate a Process in which the state changes. Then for each state, we run it for it's effect. Then we discard the effect and continue.

Separating state from effect

By splitting state from effect, we create the ability to reuse code by swapping out different effects. The business logic of the PrintingStatisticsCalculator does not change just because we want different effects. So if we needed multiple effect implementations, we could easily mix in different effect implementations with traits or other methods.

Testing the agent

By splitting state away from effects, testing becomes much easier. If I want to test an Agent, I simply generate input as a Seq:

Process.emitSeq(Seq[Event](Display,Conversion,Display,Display))

and then check whether the output matches the desired sequence:

assert(runAgent(testSeq, StatisticsCalculator(0,0)).toSeq === Seq(StatisticsCalculator(0,0), StatisticsCalculator(1,0), StatisticsCalculator(1,1) ... etc

This separates business logic from implementation details. This is a lot easier and faster than testing Akka code. Testing Akka actors typically involves spinning up actor systems, and code is littered with expectMsgPf(250 milliseconds) and similar things.

Note that we do not need to spin up an ActorSystem as part of the test.

Testing side effects

We can also test the business logic of EffectfulAgent objects without running their effects:

assert(runAgent(testSeq, PrintingStatisticsCalculator(0,0)).toSeq === Seq(PrintingStatisticsCalculator(0,0,None), ... etc)

I.e., we can check that at certain stages of the computation, printingStatisticsCalculator.toPrint == Some(...), but we do not need to actually run the effect as part of the test.

This is very handy in making test suites run more quickly - we can write a small number of tests which check that effects are properly performed, and write a much larger number of tests which verify the business logic of an EffectfulAgent. The tests of business logic never need to actually produce a side effect. I.e., I'll write many tests ensuring that:

streamEndState === PrintingStatisticsCalculator(20,6,Some("data to print"))

but a much smaller number of tests to determine that PrintingStatisticsCalculator(_,_,Some("data to print")).effect actually prints "data to print".

Conclusion

Akka's actors are a great system, one which I use every day. As a tool for distributed computing they are extremely helpful.

But as far as type safety, they leave a lot to be desired - a standard untyped actor is built on the type signature Any => Unit with side effects. It's my belief that we can do better for non-distributed systems, both in terms of orthogonality and type safety. After playing with scalaz-stream, I'm beginning to think that we might have other useful abstractions around which we can build systems.

Source code for the agent library is available on github.


Subscribe to the mailing list