Reactive Streams in Scala: Akka Streams vs Monix - part 1

Jacek Kunicki

15 Jun 2017 · 4 minutes read

This is the first part of the Akka Streams vs Monix series. In this part, we're going to refresh the general concepts of stream processing and define our example use case. Links to the subsequent installments can be found below.

Background

In the land of Scala Reactive Streams implementations, there are a number of libraries available. Akka Streams is arguably the most popular one, but there are also less common alternatives like Monix or FS2 (formerly Scalaz-Stream).

In this series I'm going to compare two of them: Akka Streams and Monix. Since a full API comparison would certainly be beyond the scope of a single blog post, we're going to walk through an example use case and compare the implementations.

The use case

If you have seen my Practical Introduction to Akka Streams talk somewhere, the use case will look familiar. If not, here is a summary: we have a number of Gzipped CSV files that we want to import into Cassandra, with some on-the-fly aggregations.

The CSV files contain some kind of readings, i.e. (id, value) pairs, where every id has two associated values and the records for a given id appear on consecutive lines in the file. Either of the values may occasionally be an invalid number. Example:

93500;0.5287942176336127
93500;0.3404326895942348
961989;invalid_value
961989;0.27452559752437566
136308;0.07525660747531115
136308;0.6509485024097678

The importer we are going to implement streams the Gzipped files and decompresses them on the fly, then converts every line to a domain object representing either a valid or an invalid reading. The next step is to compute the average value of the readings for a given id, as long as at least one of them is valid. When both readings for a given id are invalid, the average is assumed to be -1. Finally, the computed averages are written to Cassandra.

Our goal when implementing the importer will be to divide the import process into small, independent steps, and then combine them into a stream processing pipeline.

Comparing the APIs

Our domain model is pretty straightforward and consists of a single ADT to represent valid and invalid readings:

sealed trait Reading {
  def id: Int
}

case class ValidReading(id: Int, value: Double) extends Reading
case class InvalidReading(id: Int) extends Reading
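To make the aggregation rule from the use case concrete, here is a rough sketch of how it could be expressed against this model - the computeAverage helper is purely illustrative, not code from the final importer:

// Averages the valid readings for a given id (assumes a non-empty input);
// falls back to -1 when every reading for that id is invalid.
// Note: computeAverage is a hypothetical helper sketched for illustration.
def computeAverage(readings: Seq[Reading]): ValidReading = {
  val validValues = readings.collect { case ValidReading(_, value) => value }
  val average =
    if (validValues.nonEmpty) validValues.sum / validValues.size
    else -1.0
  ValidReading(readings.head.id, average)
}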

For both implementations, let's assume that we have already implemented the following method:

def parseLine(line: String): Future[Reading]

which, as you have probably guessed, converts a line from a file to a domain object - either a valid or an invalid reading. Since Monix uses its own Task instead of the standard Scala Future, the signature is going to be slightly different in the Monix version:

def parseLine(line: String): Task[Reading]

Either way, the concept of asynchronously creating a Reading remains unchanged.
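Neither signature says anything about the parsing itself, so here is one possible shape of the Future-based version - a sketch that assumes an implicit ExecutionContext and the id;value format shown earlier, not the post's actual implementation:

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Splits the line on the separator and falls back to an InvalidReading
// whenever the value doesn't parse as a number.
def parseLine(line: String)(implicit ec: ExecutionContext): Future[Reading] =
  Future {
    val Array(rawId, rawValue) = line.split(";")
    val id = rawId.toInt
    Try(rawValue.toDouble)
      .map(ValidReading(id, _))
      .getOrElse(InvalidReading(id))
  }

The Monix variant could wrap the same body in Task { ... }, leaving the parsing logic untouched.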

Before we dig into the implementation of the actual building blocks, let's have a quick overview of the APIs. In general, a stream processing pipeline consists of a data producer, a consumer, and an arbitrary number of intermediate processing stages.

Let's also assume we have a couple of settings loaded from a configuration file (gathered into a single case class in the sketch after this list):

  • importDirectory - the directory where our data files live,
  • linesToSkip - the number of initial lines to skip in every file,
  • concurrentFiles - the number of files that we want to process in parallel,
  • concurrentWrites - the number of parallel writes to the database,
  • nonIOParallelism - the parallelism level for non-IO operations (like in-memory computations).
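A minimal sketch of such a settings holder - the Config name and the field types are assumptions, as the post only lists the keys:

import java.io.File

case class Config(
  importDirectory: File, // where our data files live
  linesToSkip: Int,      // initial lines to skip in every file
  concurrentFiles: Int,  // files processed in parallel
  concurrentWrites: Int, // parallel writes to the database
  nonIOParallelism: Int  // parallelism for in-memory computations
)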

Akka Streams

In Akka Streams, the processing pipeline (the graph) consists of three types of elements: a Source (the producer), a Sink (the consumer), and Flows - the processing stages.

Using those components, you define your graph, which is nothing more than a recipe for processing your data - it doesn't perform any computations yet. To actually execute the pipeline, you need to materialize the graph, i.e. convert it to a runnable form. This requires a so-called materializer, which optimizes the graph definition and actually runs it. The definition of the graph is therefore completely decoupled from the way it is run, which, in theory, lets you use any materializer to execute the pipeline. In practice, however, the built-in ActorMaterializer is the status quo, so chances are you won't be using any other implementation.

When you look carefully at the type parameters of the components, you will notice that each of them, apart from the respective input/output types, has a mysterious Mat type. It refers to the so-called materialized value - a value that is accessible from outside the graph (as opposed to the input/output types, which are internal to the communication between the graph stages). Since we're not going to use materialized values in this series, the Mat parameter will always be NotUsed - a unified representation of Scala's Unit and Java's Void.
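To see these pieces together, here is a minimal, self-contained sketch with toy stages (unrelated to the importer) that shows where the definition ends and the materialization begins:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Pure definitions - just recipes, nothing is executed yet.
val numbers: Source[Int, NotUsed] = Source(1 to 10)
val doubled: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

// Materialization - the graph is turned into a runnable form and executed.
numbers.via(doubled).runWith(printSink)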

Monix

In Monix, the API is defined in terms of an Observable (the producer), a Consumer, and Transformers - the processing stages - where a Transformer is just a type alias for a function that transforms an Observable:

type Transformer[-A, +B] = Observable[A] => Observable[B]

In contrast to Akka Streams, the pipeline definition in Monix is not clearly decoupled from the runtime environment, i.e. there is no way to provide something like the Akka Streams materializer to influence the way the processing gets executed. Moreover, Monix has no counterpart of the Akka Streams materialized value - which, overall, makes the API a bit more straightforward.
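An equally minimal sketch on the Monix side (using the Monix 2.x-era API, where the imported Scheduler plays the execution-context role):

import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}

// A toy Transformer - just a plain function between Observables.
val doubled: Observable[Int] => Observable[Int] = _.map(_ * 2)

// The definition is lazy: consumeWith returns a Task that does nothing
// until it is actually run.
val pipeline =
  Observable.fromIterable(1 to 10)
    .transform(doubled)
    .consumeWith(Consumer.foreach[Int](println))

pipeline.runAsync // execution starts here, on the imported Scheduler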

Summary

In this introductory post you have learned the general concepts of defining stream processing pipelines in Akka Streams and Monix, and familiarized yourself with the example use case.

In the next part of the series, we are going to dive deeper into the implementation of the respective building blocks of the pipeline to learn the nuts and bolts of both libraries.
