Composition in Apache Spark

January 6, 2015

Apache Spark is well-suited for distributed batch processing applications. The Spark API provides many standard functional constructs, but because it lacks tools for composing computations, complex Spark applications tend to favor an imperative style.

In this post, we'll develop a lightweight API around Spark's RDD (Resilient Distributed Dataset) class. With it, we'll be able to build arbitrarily complex computations out of small, manageable chunks.

An example Spark program

We'll refine an example program throughout this post. Our dataset is a simple CSV with information about people. We'll create some functions to parse the data and print some statistics.

# people.csv
# name,age,city
Mary,28,New York
Bill,17,Philadelphia
Craig,34,Philadelphia
Leah,26,Rochester

We'll start with pure Spark implementations of the processing functions (all code is Scala).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Person(name: String, age: Int, city: String)

// Parse CSV -> People
def parse(input: RDD[String]): RDD[Person] =
  input.map(line => {
    val parts = line.split(",")
    Person(parts(0), Integer.parseInt(parts(1)), parts(2))
  })

// Names of people
def names(people: RDD[Person]): RDD[String] =
  people.map(_.name)

// People who can vote
def eligibleVoters(people: RDD[Person]): RDD[Person] =
  people.filter(_.age >= 18)

// Residents of a city
def residents(city: String, people: RDD[Person]): RDD[Person] =
  people.filter(_.city == city)

printStats shows them in action.

def printStats() = {
  val sc     = new SparkContext("local", "Example", new SparkConf())
  val input  = sc.textFile("people.csv")
  val people = parse(input)

  val voters           = eligibleVoters(people)
  val phillyResidents  = residents("Philadelphia", people)
  val phillyVoterNames =
    names(residents("Philadelphia", eligibleVoters(people)))

  println("Eligible voters: " + voters.count)
  // => "Eligible voters: 3"
  println("Philly residents: " + phillyResidents.count)
  // => "Philly residents: 2"
  println("Philly voters: " + phillyVoterNames.collect.mkString(","))
  // => "Philly voters: Craig"
}

We read the file into an RDD and pass the intermediate results to each function. This is a fine start. Each step has only one responsibility.

But there's a problem: we have to juggle the input and thread it through each function. It would be cleaner to separate the processing steps from their input and supply it only at the last moment.

A concrete example would be defining phillyVoterNames purely in terms of the residents, eligibleVoters, and names functions. Something like:

// How do we define the 'andThen' function?
val phillyVoterNames =
  residents("Philadelphia") `andThen` eligibleVoters `andThen` names

println("Philly voters: " + phillyVoterNames.collect.mkString(","))

A trait for sequencing RDDs

We need something to handle sequencing the input behind the scenes. We'll create a Pipe trait with a method | (pronounced "pipe"). This method will return a composite Pipe that executes the first step and passes the output to the next one.

trait Pipe[In, Out] extends Serializable {
  def apply(rdd: RDD[In]): RDD[Out]

  def |[Final](next: Pipe[Out, Final]): Pipe[In, Final] = {
    // Close over outer object
    val self = this
    new Pipe[In, Final] {
      // Run first transform, pass results to next
      def apply(rdd: RDD[In]) = next(self(rdd))
    }
  }
}

Sequencing with | gives us a new Pipe of type In -> Final. We've created a composite object that hides the intermediate Out step.
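
In other words, | gives Pipes the andThen-style chaining we wished for earlier, specialized to RDD transforms. Purely for intuition, here is the same chaining written with plain Scala function values and Function1's andThen; this sketch is an aside, not part of the Pipe API:

// The same chaining with plain Scala functions, for intuition only:
// andThen composes RDD[Person] => RDD[Person] with
// RDD[Person] => RDD[String] into RDD[Person] => RDD[String].
val votersFn: RDD[Person] => RDD[Person] =
  people => people.filter(_.age >= 18)

val voterNamesFn: RDD[Person] => RDD[String] =
  votersFn.andThen(people => people.map(_.name))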

Let's convert our processing functions to the new style.

// Parse CSV -> People
val parse = new Pipe[String, Person] {
  def apply(input: RDD[String]): RDD[Person] =
    input.map(line => {
      val parts = line.split(",")
      Person(parts(0), Integer.parseInt(parts(1)), parts(2))
    })
}

// People's names
val names = new Pipe[Person, String] {
  def apply(people: RDD[Person]): RDD[String] =
    people.map(_.name)
}

// People who can vote
val eligibleVoters = new Pipe[Person, Person] {
  def apply(people: RDD[Person]): RDD[Person] =
    people.filter(_.age >= 18)
}

// Residents of a city
def residents(city: String) = new Pipe[Person, Person] {
  def apply(people: RDD[Person]): RDD[Person] =
    people.filter(_.city == city)
}

Not too different, right? We moved each function's body into the apply method of a new Pipe.
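
As an aside, if the anonymous-class boilerplate gets repetitive, a small helper can lift any RDD transform into a Pipe. The pipe function below is hypothetical, not part of the API in this post; it's just one way to cut down the ceremony:

// Hypothetical helper: lift an ordinary RDD transform into a Pipe.
def pipe[In, Out](f: RDD[In] => RDD[Out]): Pipe[In, Out] =
  new Pipe[In, Out] {
    def apply(rdd: RDD[In]): RDD[Out] = f(rdd)
  }

// With it, the conversions above become one-liners, for example:
val namesAlt           = pipe[Person, String](_.map(_.name))
val eligibleVotersAlt  = pipe[Person, Person](_.filter(_.age >= 18))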

Here's the new printStats function.

def printStats() = {
  val sc               = new SparkContext("local", "Example", new SparkConf())
  val phillyResidents  = residents("Philadelphia")
  val phillyVoterNames = phillyResidents | eligibleVoters | names

  val input  = sc.textFile("people.csv")
  val people = parse(input)

  println("Eligible voters: " + eligibleVoters(people).count)
  // => "Eligible voters: 3"
  println("Philly residents: " + phillyResidents(people).count)
  // => "Philly residents: 2"
  println("Philly voters: " + phillyVoterNames(people).collect.mkString(","))
  // => "Philly voters: Craig"
}

This is cleaner: input is no longer cluttering up each call. For more fun, we can include the parse step in the pipeline and pass the input directly to the composed Pipe.

val voters           = parse | eligibleVoters
val phillyResidents  = parse | residents("Philadelphia")
val phillyVoterNames =
  parse | residents("Philadelphia") | eligibleVoters | names

val input  = sc.textFile("people.csv")

println("Eligible voters: " + voters(input).count)
println("Philly residents: " + phillyResidents(input).count)
println("Philly voters: " + phillyVoterNames(input).collect.mkString(","))

Not bad! We can add as many steps as we want without shuffling input around.
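
For example, tacking one more step onto the pipeline is just another |. The shout step below is made up purely for illustration:

// A made-up extra step: upper-case the names at the end of the pipeline.
val shout = new Pipe[String, String] {
  def apply(names: RDD[String]): RDD[String] = names.map(_.toUpperCase)
}

val loudPhillyVoterNames =
  parse | residents("Philadelphia") | eligibleVoters | names | shout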

What's to gain?

This is a simple example: what advantages does this approach offer for production applications?

Pipes can be used to build other Pipes.

As long as the types match, we're free to add more steps to our process. At the end, we still have a simple In -> Out type. Since Pipe objects are first-class, we can extract intermediate steps into objects and re-use them as needed.
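
For example, the shared prefix of several pipelines can be pulled out into its own Pipe and reused; a small sketch using the pipes defined above:

// Extract the shared prefix of the pipelines into its own Pipe...
val phillyPeople: Pipe[String, Person] = parse | residents("Philadelphia")

// ...and reuse it wherever it's needed.
val phillyVoterNames: Pipe[String, String] = phillyPeople | eligibleVoters | names
val phillyNames: Pipe[String, String]      = phillyPeople | names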

Testability.

Separating the input from the computation gives us cleaner integration points for tests. We can also test Pipe functionality independent of a specific domain.
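
For instance, a Pipe can be exercised against a tiny in-memory RDD. A minimal sketch, assuming a local SparkContext and plain assertions rather than any particular test framework:

// Build a small fixture with sc.parallelize and run a single Pipe over it.
val sc = new SparkContext("local", "PipeTest", new SparkConf())

val fixture = sc.parallelize(Seq(
  Person("Mary", 28, "New York"),
  Person("Bill", 17, "Philadelphia")
))

// eligibleVoters is an ordinary value: feed it the fixture, check the output.
assert(eligibleVoters(fixture).collect.toSeq == Seq(Person("Mary", 28, "New York")))

sc.stop()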

Readability.

Piped sequences of operations read in the direction of data flow. We can visualize each computation as a functional unit.

Composition is king.

The benefits of composition are well-documented. Here at Vistar, we converted a vanilla Spark project to this style early on. It's paid dividends: the code is simple and clear, even when we revisit it much later. If you're adventurous, you can recreate this pattern in other languages as well. Try it yourself; the advantages will be obvious.

If you're interested in this sort of thing, we'd love to hear from you. We're always hiring.