Scala: Collections of Futures

Version

  1. Scala 2.11.1

Overview

One of my favorite things about Scala is the amazing collections library. Scala’s collections library combines the best of standard functional idioms with an OOP call style. This makes for some downright beautiful code. However, after working with Scala for a bit over a year now, it has become very apparent to me that the Scala collections library was not written with asynchronous/reactive programming in mind.

Note
The Scala/Akka standard library Future is not lazy. Once a Future is constructed, it is “hot”: it starts executing immediately. There are other libraries whose Future implementation is lazy (such as Scalaz [2]). Lazy futures allow building up an execution “plan” which is eventually run by the ultimate caller. This approach has many advantages, but sadly is not the direction Scala/Akka took. This article focuses exclusively on the Scala/Akka standard library Future [1].
Note

I’ve tried to make sure all of the code examples here can be pasted directly into the Scala console. Some boilerplate is required to make these work:
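As a minimal sketch, the examples in this article need roughly the following in scope. Using the global execution context is my assumption for console experiments; a real application may well supply its own.

```scala
// Assumed boilerplate: the Future API, durations for Await, and an implicit
// ExecutionContext (the global one is an assumption made for these sketches).
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Quick sanity check that futures actually run in this console session.
val check: Int = Await.result(Future(1 + 1), 5.seconds)
```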

Make sure to paste this into the console before trying the code below.

Note
Scala has wonderful type inference which ordinarily makes it unnecessary to explicitly declare the type of vals. However, for maximum readability in my examples, I explicitly label the type of vals.

Calling a method that returns a Future N times

Once I decided to start using futures, they started to bleed into function return-type signatures everywhere. One place in particular that they started to show up was in the service layer:

Listing 1
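A minimal sketch of such a service layer. The class name SomeService and the body of doSomething are hypothetical placeholders; only the Future[Unit] return type is taken from the text.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical service layer: SomeService and the body of doSomething are
// placeholders. Only the Future[Unit] return type comes from the article.
class SomeService {
  def doSomething(i: Int): Future[Unit] = Future {
    println(s"doSomething($i)")
  }
}

val svc: SomeService = new SomeService

// A single call is easy to work with: wait on it, or map/flatMap over it.
val single: Future[Unit] = svc.doSomething(1)
Await.result(single, 5.seconds)
```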

Using a service layer like this was straightforward, until I needed to call it N times:

Listing 2
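A sketch of that first attempt, assuming a hypothetical SomeService and a list named someInts. The submitted counter is not part of any real service; I added it only to make the effect observable.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical service; `submitted` exists only to make the effect visible.
class SomeService {
  val submitted = new AtomicInteger(0)
  def doSomething(i: Int): Future[Unit] = {
    submitted.incrementAndGet() // counted when the call is made, not when it finishes
    Future { Thread.sleep(50) }
  }
}

val svc: SomeService = new SomeService
val someInts: List[Int] = (1 to 20).toList

// Anti-pattern: foreach discards each Future[Unit] that doSomething returns.
someInts.foreach { i => svc.doSomething(i) }

// All 20 futures have already been handed to the executor at this point,
// and nothing holds a reference that could be used to wait on them.
val submittedSoFar: Int = svc.submitted.get
```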

This was my first intuition about how to call the service N times. It seemed straightforward to me at the time, and it even compiles and runs! But what this actually does is immediately create 20 “hot” futures: all 20 have been submitted for execution! While this might be OK for 20 futures, it’s not OK for 1,000, 10,000 or 100,000+ futures.

Internally, the executor stores futures in a queue and executes as many futures simultaneously as it has workers. Dumping too many futures into the executor queue at once will starve other code that uses the same executor and can cause an out-of-memory error. Definitely not what I wanted.

Also, there is another problem here: each future returned by svc.doSomething is discarded by assignment to Unit. Not only am I not properly waiting on my futures to complete, but by assigning my Future to Unit, I’m throwing away any exception that might be thrown! Also not what I want.

Tip
Assigning a Future to Unit is always an error. Perhaps in the future the compiler will emit a warning about this, but for now the only way to discover it is by reviewing the code of programmers who may be new to futures. Even worse, this code may not fail at runtime. For actions returned in futures that complete quickly and are 99% exception free, this bug might go unnoticed for some time.

Never assign Future to Unit

So what can I do? I need to stop assigning Future to Unit. I can do this by using map instead of foreach. Also, I need a way to properly wait for all my futures to complete:

Listing 3
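A sketch of the corrected version, again assuming a hypothetical SomeService and someInts. The numbered comments correspond to the notes that follow; the 30 second timeout is an arbitrary choice of mine.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical service standing in for the real one.
class SomeService {
  def doSomething(i: Int): Future[Unit] = Future { Thread.sleep(10) }
}

val svc: SomeService = new SomeService
val someInts: List[Int] = (1 to 20).toList

// (1) map keeps every returned future instead of discarding it
val futures: List[Future[Unit]] = someInts.map(i => svc.doSomething(i))
// (2) invert List[Future[Unit]] into Future[List[Unit]]
val futResult: Future[List[Unit]] = Future.sequence(futures)
// (3) block until all futures are done; a failed future rethrows here
val result: List[Unit] = Await.result(futResult, 30.seconds)
```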

  1. Use map instead of foreach to ensure I don’t discard any futures
  2. Use Future.sequence to convert a List[Future[Unit]] to Future[List[Unit]]
  3. Properly wait on all my futures to complete. Also, now that I’ve waited on all my futures, I can safely discard my List[Unit] because no exceptions were thrown.

If any exceptions are thrown during svc.doSomething calls, the exception will percolate up through Await.result. But how can I stop dumping all futures into the executor at once?

Future.sequence and the functional “sequence” idiom

Most of this is standard Scala collections stuff, but what is Future.sequence? Future.sequence is a Future-specific version of the “sequence” functional idiom (not to be confused with the Scala collection type Seq). There is much more to know about “sequence”, but it is basically a way to invert the nesting of two monads, i.e. G[F[_]] to F[G[_]] (given certain properties of types G and F). Unfortunately, standard Scala requires some compiler magic [3] to generically implement sequence. Without resorting to this magic, I have to copy the sequence pattern for different monad types, such as has been done for Future.sequence.

In addition to making Future the outer monad, Future.sequence also takes care of waiting for all of our futures to complete. In Listing 3, futResult is of type Future[List[Unit]], which is completed once all of my inner futures complete.
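To illustrate what “copying the sequence pattern” for another type looks like, here is a hand-written sequence for Option. The name sequenceOption is hypothetical; it is not part of the standard library.

```scala
// Hypothetical helper: turns List[Option[A]] into Option[List[A]], the same
// inversion that Future.sequence performs for List[Future[A]].
def sequenceOption[A](in: List[Option[A]]): Option[List[A]] =
  in.foldRight(Option(List.empty[A])) { (maybeA, acc) =>
    for {
      a    <- maybeA // a single None makes the whole result None
      rest <- acc
    } yield a :: rest
  }

val allPresent: Option[List[Int]] = sequenceOption(List(Some(1), Some(2), Some(3)))
val oneMissing: Option[List[Int]] = sequenceOption(List(Some(1), None, Some(3)))
```

The analogy with Future.sequence is that a single failed future fails the resulting Future[List[A]] in the same way a single None collapses the Option result.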

Controlling the flow of Future execution

I’ve solved the problem of discarding futures, but I still need to somehow regulate how many futures go “hot” simultaneously.

Listing 4
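A sketch of the grouped approach. SomeService is hypothetical, and its in-flight bookkeeping is something I added purely so that the peak number of unfinished calls can be checked; the group size of 5 is arbitrary. The numbered comments correspond to the notes that follow.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical service that also records the peak number of unfinished calls.
class SomeService {
  private var inFlight = 0
  private var peak = 0
  def maxInFlight: Int = synchronized(peak)
  def doSomething(i: Int): Future[Unit] = {
    synchronized { inFlight += 1; peak = math.max(peak, inFlight) }
    Future { try Thread.sleep(10) finally synchronized { inFlight -= 1 } }
  }
}

val svc: SomeService = new SomeService
val someInts: List[Int] = (1 to 20).toList
val groupSize: Int = 5 // arbitrary choice for this sketch

val result: List[Unit] =
  someInts
    .grouped(groupSize)                       // (1) Iterator of groups
    .map { group =>
      val futGroup: Future[List[Unit]] =      // (2) one future per group
        Future.sequence(group.map(i => svc.doSomething(i)))
      Await.result(futGroup, 30.seconds)      // (3) wait before the next group starts
    }
    .toList                                   // force the lazy iterator
    .flatten                                  // (4) List[List[Unit]] => List[Unit]
```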

  1. Group someInts into a group size that I want to execute simultaneously
  2. For each group create a Future[List[Unit]]
  3. Use Await.result inside the map to wait for each group to complete
  4. Because I divided someInts into groups, I need to flatten the results (Note: this isn’t strictly necessary since result is Unit in this example. I’m going to discard List[Unit] anyway, but if result wasn’t Unit it would be necessary to flatten)

OK, this works. I’ve ensured that no more than N svc.doSomething calls are happening at once and exceptions are never discarded. However, this pattern has a fatal flaw: it does not pass a future back as a result. For the purposes of writing example code, this kind of thing gets the job done. However, when writing code that will live in an asynchronous ecosystem, I must make my result a Future.

Tip
When writing a method that calls other functions or methods that return Future, I need to make sure to return a Future to callers of my method. This allows callers to use the Future of my method’s return value in the same way that I did when I called other methods that returned me a Future.

Returning a Future to callers

This has gotten complicated fast! But I feel like I’m almost there, so I will keep going. I’m modifying the grouped version from Listing 4 to ensure my result is a Future:

Pattern 1.0
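A sketch of this pattern under the same assumptions as before (hypothetical SomeService, someInts and an arbitrary group size of 5). The numbered comments correspond to the notes that follow.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical service standing in for the real one.
class SomeService {
  def doSomething(i: Int): Future[Unit] = Future { Thread.sleep(10) }
}

val svc: SomeService = new SomeService
val someInts: List[Int] = (1 to 20).toList
val groupSize: Int = 5 // arbitrary choice for this sketch

// Accumulator seed: an already completed future of an empty list.
val zero: Future[List[Unit]] = Future.successful(List.empty[Unit])

val futResult: Future[List[Unit]] =
  someInts
    .grouped(groupSize)
    .foldLeft(zero) { (futAcc, group) =>              // (1)
      futAcc.flatMap { acc =>                         // (2)
        // This group's futures are only created once the previous group is done.
        Future.sequence(group.map(i => svc.doSomething(i)))
          .map(results => acc ::: results)            // (3)
      }
    }

// Only the outermost caller (here, the console) needs to block.
val result: List[Unit] = Await.result(futResult, 30.seconds)
```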

  1. I’ve replaced map with foldLeft. This will ensure that each group is processed one at a time, from left to right, and will accumulate the Future[List[Unit]] result after each group completes. The accumulator is initialized with an already completed Future of an empty List[Unit].
  2. Future.flatMap is used here instead of Future.map to flatten the inner return type of Future[List[Unit]] over the entire collection (if Future.map had been used, it would return Future[Future[List[Unit]]]).
  3. After a group completes, its result is appended to the accumulated results.

OK, this is much better. I’m ensuring that I don’t discard exceptions, I control the flow of futures AND now I return a Future to callers. But I call a Future-returning method N times in many places. This is a pretty tedious pattern to have to repeat everywhere. Scala gives me some amazing utilities for cleaning up complexity like this.

Pimp My Future: Pattern 1.1

I’m going to clean up Pattern 1.0 using a for-comprehension [7] and the Pimp-My-Library pattern [5] with the Scala value class [6]. The pimp-my-library pattern allows creating an implicit wrapper class that can “add” a method to an existing class, essentially providing an OOP-style call convention for the new method. The value class (added in Scala 2.10) makes the implicit wrapper class essentially free: in most cases the compiler optimizes away the wrapper in the emitted bytecode.

Listing 4

Pattern 1.1
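A sketch of what the wrapper classes and the resulting pattern might look like. The names FutureSugar, Future_PimpMyListOfFutures, Future_PimpMyFuture, sequence and get are all hypothetical, as are SomeService and the group size. Value classes must live at the top level or inside an object, hence the enclosing object.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureSugar {
  // Hypothetical sugar: List[Future[A]] gains a .sequence method.
  implicit class Future_PimpMyListOfFutures[A](val self: List[Future[A]]) extends AnyVal {
    def sequence(implicit ec: ExecutionContext): Future[List[A]] = Future.sequence(self)
  }
  // Hypothetical sugar: Future[A] gains a blocking .get method.
  implicit class Future_PimpMyFuture[A](val self: Future[A]) extends AnyVal {
    def get(atMost: Duration): A = Await.result(self, atMost)
  }
}

import FutureSugar._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical service standing in for the real one.
class SomeService {
  def doSomething(i: Int): Future[Unit] = Future { Thread.sleep(10) }
}

val svc: SomeService = new SomeService
val someInts: List[Int] = (1 to 20).toList
val groupSize: Int = 5 // arbitrary choice for this sketch

val futResult: Future[List[Unit]] =
  someInts
    .grouped(groupSize)
    .foldLeft(Future.successful(List.empty[Unit])) { (futAcc, group) =>
      for {                                                     // (1)
        acc     <- futAcc
        results <- group.map(i => svc.doSomething(i)).sequence  // (2)
      } yield acc ::: results
    }

val result: List[Unit] = futResult.get(30.seconds)              // (3)
```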

  1. Replaced Future.flatMap and nested Future.map with a much cleaner, more readable for-comprehension [7]
  2. Replaced Future.sequence with sugar method
  3. Replaced Await.result with sugar method

I like the OOP-style call convention, but this pattern is still tedious. Perhaps I can make this even simpler?

Pimp My Future: Pattern 1.2

I’m going to further clean up Pattern 1.1 by creating another pimp-my-library method on a new value class Future_PimpMyTraversableOnce.

Listing 5

  1. Convert to List here for efficient accumulation of results (and because TraversableOnce has no grouped method)
  2. Convert back to the desired collection type
Pattern 1.2
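A sketch of how such a value class and its use might look on Scala 2.11/2.12 (it relies on CanBuildFrom, which was removed in 2.13). Only the class name Future_PimpMyTraversableOnce comes from the text. The enclosing object, the method name mapAsync, its parameters and SomeService are my assumptions.

```scala
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.higherKinds

object TraversableFutureSugar {
  // Sketch for Scala 2.11/2.12. mapAsync and its signature are hypothetical.
  implicit class Future_PimpMyTraversableOnce[A, M[X] <: TraversableOnce[X]](val self: M[A]) extends AnyVal {
    def mapAsync[B](groupSize: Int)(f: A => Future[B])(implicit
        cbf: CanBuildFrom[Nothing, B, M[B]],
        ec: ExecutionContext): Future[M[B]] = {
      val zero: Future[List[B]] = Future.successful(List.empty[B])
      self.toList                                      // (1) List gives grouped and cheap prepends
        .grouped(groupSize)
        .foldLeft(zero) { (futAcc, group) =>
          for {
            acc     <- futAcc
            results <- Future.sequence(group.map(f))   // group starts only after the previous one
          } yield results.reverse ::: acc              // accumulate in reverse order
        }
        .map(reversed => (cbf() ++= reversed.reverse).result()) // (2) back to M[B]
    }
  }
}

import TraversableFutureSugar._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical service standing in for the real one.
class SomeService {
  def doSomething(i: Int): Future[Unit] = Future { Thread.sleep(10) }
}

val svc: SomeService = new SomeService
val someInts: List[Int] = (1 to 20).toList

// The whole pattern collapses into one call that returns a Future.
val futResult: Future[List[Unit]] = someInts.mapAsync(groupSize = 5)(i => svc.doSomething(i))
val result: List[Unit] = Await.result(futResult, 30.seconds)

// The input collection type is preserved, and so is the element order.
val tens: Vector[Int] = Await.result(Vector(1, 2, 3).mapAsync(2)(i => Future(i * 10)), 30.seconds)
```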

Much better! My code is now simple, readable, idiomatic, doesn’t discard exceptions, doesn’t flood the executor with futures and returns a future to the caller!

Further Exploration: Problems with Pattern 1

Pattern 1 solves the problem of discarding exceptions, regulating the flow of “hot” futures and returning a future to callers, but it isn’t the most efficient way of handling this problem. Because it has to wait for each group to complete, one of the svc.doSomething calls could take an extra long time. If it does, even though the other futures in its group have completed, I have to wait for that one long call to complete before moving on to the next group. Ideally, I should make it so that there are always N futures running simultaneously instead of grouping them. Work for another day!
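As a rough sketch of that idea, and not something this article has worked through, one option is to start N “lanes”, each of which pulls the next item from a shared queue as soon as its previous call completes. The helper name mapWindowed and everything inside it are hypothetical.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper: keeps up to `n` calls to `f` in flight until the
// input is exhausted, then returns the results in input order.
def mapWindowed[A, B](items: List[A], n: Int)(f: A => Future[B])(
    implicit ec: ExecutionContext): Future[List[B]] = {
  val pending = new ConcurrentLinkedQueue[(A, Int)]()
  items.zipWithIndex.foreach(pair => pending.add(pair))
  val done = new ConcurrentHashMap[Int, B]()

  // One lane: take the next item, run it, and continue once it completes.
  def lane(): Future[Unit] = Option(pending.poll()) match {
    case None           => Future.successful(())
    case Some((a, idx)) => f(a).flatMap { b => done.put(idx, b); lane() }
  }

  Future.sequence(List.fill(n)(lane()))
    .map(_ => items.indices.map(i => done.get(i)).toList)
}

import scala.concurrent.ExecutionContext.Implicits.global

val doubled: List[Int] =
  Await.result(mapWindowed((1 to 10).toList, n = 3)(i => Future(i * 2)), 30.seconds)
```

In this sketch a slow call only holds up its own lane; the other lanes keep pulling work. If one call fails, the resulting future fails, but the remaining lanes continue to drain the queue, which may or may not be what a real implementation wants.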
