Version
- Scala 2.11.1
Overview
One of my favorite things about Scala is the amazing collections library. Scala’s collections library combines the best of standard functional idioms with an OOP call style. This makes for some downright beautiful code. However, after working with Scala for a bit over a year now, it has become very apparent to me that the Scala collections library was not written with asynchronous/reactive programming in mind.
Note
The Scala/Akka standard library Future [1] is not lazy. Once a Future is constructed, it is “hot”: it is submitted for execution immediately. There are other libraries whose Future implementation is lazy (such as Scalaz [2]). Lazy futures allow building up an execution “plan” which is eventually run by the ultimate caller. This approach has many advantages, but sadly is not the direction Scala/Akka took. This article focuses exclusively on the Scala/Akka standard library Future.
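To make the “hot” behaviour concrete, here is a minimal sketch. The helper name futureRunsEagerly is hypothetical, and the global execution context is an assumption. The future is constructed but never awaited and never given a callback, yet its body still runs:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical helper: reports whether a future's body ran without anyone awaiting it.
def futureRunsEagerly(): Boolean = {
  val started = new CountDownLatch(1)
  // Constructing the future is enough to submit its body for execution.
  val hot: Future[Int] = Future { started.countDown(); 42 }
  // Only the latch is observed here; `hot` itself is never awaited or given a callback.
  started.await(5, TimeUnit.SECONDS)
}
```

With a lazy future, the latch would only be released once some caller explicitly ran the plan.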
Note
I’ve tried to make sure all of the code examples here can be pasted directly into the Scala console. Some boilerplate is required to make these work; make sure to paste it into the console before trying the code below.
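The boilerplate listing itself is not reproduced here. Judging from the names the examples use (Future, Await, Duration, ExecutionContext, CanBuildFrom), a minimal version would probably look like the following; the choice of the global execution context is an assumption:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
// Needed by the implicit-class examples later on (exists in Scala 2.10 through 2.12).
import scala.collection.generic.CanBuildFrom
// Assumption: the default global execution context is good enough for console experiments.
import scala.concurrent.ExecutionContext.Implicits.global
```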
Note
Scala has wonderful type inference which ordinarily makes it unnecessary to explicitly declare the type of vals. However, for maximum readability in my examples, I explicitly label the type of vals.
Calling a method that returns a Future N times
Once I decided to start using futures, they started to bleed into function return-type signatures everywhere. One place in particular that they started to show up was in the service layer:
    trait MyService {
      def doSomething(i: Int) : Future[Unit]
    }
    class MyServiceImpl extends MyService {
      def doSomething(i: Int) : Future[Unit] = Future { Thread.sleep(500);println(i) }
    }
Using a service layer like this was straightforward, until I needed to call it N times:
    val svc : MyService = new MyServiceImpl
    val someInts : List[Int] = (1 to 20).toList
    val result : Unit = someInts.foreach(svc.doSomething _)
This was my first intuition about how to call the service N times. It seemed straightforward to me at the time, and it even compiles and runs! But what this actually does is immediately create 20 “hot” futures: all 20 have been submitted for execution, and foreach throws each returned Future away. While this might be ok for 20 futures, it’s not ok for 1,000, 10,000 or 100,000+ futures.
Internally, the executor stores futures in a queue and executes as many futures as it has workers simultaneously. Dumping too many futures into the executor queue at once will starve other code that uses the same executor and can cause an out of memory error. Definitely not what I wanted.
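The queueing can be observed directly. The sketch below is an illustration under assumptions, not the setup used in this article: it swaps the global executor for a hypothetical dedicated java.util.concurrent.ThreadPoolExecutor so the queue can be inspected, and the helper name queuedFutures is made up.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper: submits `n` blocked futures to a pool with `workers` threads
// and returns how many of them are sitting in the executor's queue.
def queuedFutures(n: Int, workers: Int): Int = {
  val pool = new ThreadPoolExecutor(
    workers, workers, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]())
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  val gate = new CountDownLatch(1) // keeps every task busy until it is opened
  try {
    val futures: List[Future[Int]] = (1 to n).toList.map(i => Future { gate.await(); i })
    val queued: Int = pool.getQueue.size // all n futures are already submitted at this point
    gate.countDown()                     // let the workers drain the queue
    Await.result(Future.sequence(futures), 10.seconds)
    queued
  } finally {
    gate.countDown()
    pool.shutdown()
  }
}
```

Calling queuedFutures(20, 2) shows the shape of the problem: two futures occupy the workers while the rest wait in an unbounded queue, which is where the memory goes when 20 becomes 100,000.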
Also, there is another problem here: each future returned by svc.doSomething is discarded by assignment to Unit. Not only am I not properly waiting on my futures to complete, but by assigning my Future to Unit, I’m throwing away any exception that might be thrown! Also not what I want.
Tip
Assigning a Future to Unit is always an error. Perhaps in the future the compiler will emit a warning about this, but for now the main way to catch it is code review, especially of code written by programmers who may be new to futures. Even worse, this code may not fail at runtime. For actions returned in futures that complete quickly and are 99% exception free, this bug might go unnoticed for some time.
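A small sketch of how quietly this goes wrong. The function mayFail is a hypothetical stand-in for a service call that fails for one input, and the global execution context is assumed:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for a service call that fails for one input.
def mayFail(i: Int): Future[Unit] =
  Future { if (i == 2) throw new IllegalStateException(s"call $i failed") }

// foreach accepts any result type, so each Future (and its failure) is dropped.
// This returns normally even though one of the calls fails.
def discardAll(): Unit = List(1, 2, 3).foreach(mayFail)

// Keeping the futures lets the caller see which calls failed.
def failedInputs(): List[Int] =
  List(1, 2, 3).filter { i =>
    val fut: Future[Unit] = mayFail(i)
    Await.ready(fut, 5.seconds) // waits for completion without rethrowing
    fut.value.exists(_.isFailure)
  }
```

The exception is stored inside the failed Future; once nothing holds a reference to that Future, nothing can ever observe it.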
Never assign Future to Unit
So what can I do? I need to stop assigning Future to Unit. I can do this by using map instead of foreach. Also, I need a way to properly wait for all my futures to complete:
    val futResult : Future[List[Unit]] = Future.sequence {
      someInts.map(svc.doSomething _) // <1>
    } // <2>
    val result : Unit = Await.result(futResult,Duration.Inf) // <3>
- Use map instead of foreach to ensure I don’t discard any futures
- Use Future.sequence to convert a List[Future[Unit]] to Future[List[Unit]]
- Properly wait on all my futures to complete. Also, now that I’ve waited on all my futures, I can safely discard my List[Unit] because no exceptions were thrown
If any exceptions are thrown during svc.doSomething calls, the exception will percolate up through Await.result. But how can I stop dumping all futures into the executor at once?
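As an illustration of that percolation, here is a minimal sketch in which flaky is a hypothetical stand-in for svc.doSomething that fails for one input. Try is used only so the rethrown exception can be inspected:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

// Hypothetical stand-in for svc.doSomething that fails for one input.
def flaky(i: Int): Future[Unit] =
  Future { if (i == 3) throw new IllegalStateException(s"call $i failed") }

// Future.sequence fails if any element fails; Await.result rethrows that failure,
// which Try captures here so it can be inspected.
def awaitAll(): Try[List[Unit]] =
  Try(Await.result(Future.sequence((1 to 5).toList.map(flaky)), 5.seconds))
```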
Controlling the flow of Future execution
I’ve solved the problem of discarding futures, but I still need to somehow regulate how many futures go “hot” simultaneously.
    val result : Unit =
      someInts
        .grouped(3) // <1>
        .toList
        .map { group =>
          val innerFutResult : Future[List[Unit]] = // <2>
            Future.sequence {
              group.map(svc.doSomething _)
            }
          Await.result(innerFutResult, Duration.Inf) // <3>
        }
        .flatten // <4>
- Group someInts into a group size that I want to execute simultaneously
- For each group create a Future[List[Unit]]
- Use Await.result inside the map to wait for each group to complete
- Because I divided someInts into groups, I need to flatten the results (Note: this isn’t strictly necessary since result is Unit in this example. I’m going to discard List[Unit] anyway, but if result wasn’t Unit it would be necessary to flatten)
Ok this works. I’ve ensured that no more than N svc.doSomething calls are happening at once and exceptions are never discarded. However, this pattern has a fatal flaw. It does not pass a future back as a result. For the purposes of writing example code, this kind of thing gets the job done. However, when writing code that will live in an asynchronous ecosystem, I must make my result a Future.
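One way to check the “no more than N at once” claim is to instrument the calls. The sketch below is an illustration only: peakWithGrouping and tracked are hypothetical names, the 20 ms sleep stands in for real work, and the global execution context is assumed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical helper: runs 20 instrumented calls in groups of `groupSize` and
// returns the highest number of calls observed running at the same time.
def peakWithGrouping(groupSize: Int): Int = {
  val lock = new Object
  var running = 0
  var peak = 0

  def tracked(i: Int): Future[Unit] = Future {
    lock.synchronized { running += 1; peak = math.max(peak, running) }
    Thread.sleep(20) // simulate work
    lock.synchronized { running -= 1 }
  }

  // Same shape as the listing above: start one group, block until it finishes, repeat.
  (1 to 20).toList.grouped(groupSize).foreach { group =>
    Await.result(Future.sequence(group.map(tracked)), 10.seconds)
  }
  lock.synchronized(peak)
}
```

The observed peak can be lower than the group size on a machine with few cores, but it should never exceed it.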
Tip
When writing a method that calls other functions or methods that return Future, I need to make sure to return a Future to callers of my method. This allows callers to use the Future of my method’s return value in the same way that I did when I called other methods that returned me a Future.
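A small sketch of the difference, using made-up names (lookup, greetingBlocking, greeting) and assuming the global execution context:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical lower-level call, standing in for any method that returns a Future.
def lookup(id: Int): Future[String] = Future(s"user-$id")

// Blocks internally: callers get a plain value and a thread is parked while waiting.
def greetingBlocking(id: Int): String =
  "Hello, " + Await.result(lookup(id), 5.seconds)

// Composes with map and hands the Future back: callers decide when (or whether) to wait.
def greeting(id: Int): Future[String] =
  lookup(id).map(name => "Hello, " + name)
```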
Returning a Future to callers
This has gotten complicated fast! But I feel like I’m almost there, so I will keep going. I’m modifying Listing 3 to ensure my result is a Future:
    val futResult : Future[List[Unit]] =
      someInts
        .grouped(3)
        .toList
        .foldLeft(Future.successful(List[Unit]())) { (futAccumulator,group) => // <1>
          futAccumulator.flatMap { accumulator => // <2>
            val futInnerResult : Future[List[Unit]] =
              Future.sequence {
                group.map(svc.doSomething _)
              }
            futInnerResult.map(innerResult => accumulator ::: innerResult) // <3>
          }
        }
    val result : Unit = Await.result(futResult,Duration.Inf)
- I’ve replaced map with foldLeft. This will ensure that each group is processed one at a time, from left to right, and will accumulate the Future[List[Unit]] result after each group completes. The accumulator is initialized with an already completed Future of an empty List[Unit].
- Future.flatMap is used here instead of Future.map because the function passed to it already returns a Future[List[Unit]]; flatMap flattens the result to Future[List[Unit]] (if Future.map had been used, it would return Future[Future[List[Unit]]]).
- After a group completes, the result accumulates
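The map versus flatMap distinction is easiest to see in the types. A minimal sketch, where next is a hypothetical step that returns a Future and the global execution context is assumed:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical step that appends one element asynchronously.
def next(acc: List[Int]): Future[List[Int]] = Future(acc :+ (acc.size + 1))

// map wraps the inner Future instead of chaining it.
def nested(): Future[Future[List[Int]]] = Future.successful(List(1)).map(next)

// flatMap chains the two steps into a single Future.
def chained(): Future[List[Int]] = Future.successful(List(1)).flatMap(next)
```

Inside the foldLeft above, the accumulator has to stay a Future[List[Unit]] from one group to the next, which is why only the flatMap form type-checks there.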
Ok this is much better. I’m ensuring that I don’t discard exceptions, I control the flow of futures AND now I return a Future to callers. But I call a Future-returning method N times in many places, and this is a pretty tedious pattern to have to repeat everywhere. Fortunately, Scala gives me some amazing utilities for cleaning up complexity like this.
Pimp My Future: Pattern 1.1
I’m going to clean up Pattern 1.0 using a for-comprehension [7] and the Pimp-My-Library pattern [5] with a Scala value class [6]. The pimp-my-library pattern allows creating an implicit wrapper class that can “add” a method to an existing class, essentially making an OOP style call convention for the new method. The value class (added in Scala 2.10) makes the implicit wrapper class essentially free: in most cases the compiler optimizes away the wrapper in the emitted bytecode.
    implicit class Future_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
      def get : T = Await.result(self, Duration.Inf)
    }
    implicit class Future_PimpMyTraversableOnceOfFutures[A, M[AA] <: TraversableOnce[AA]](val self: M[Future[A]]) extends AnyVal {
      /** @return a Future of M[A] that completes once all futures have completed */
      def sequence(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], ec: ExecutionContext) : Future[M[A]] =
        Future.sequence(self)
    }
    val futResult : Future[List[Unit]] =
      someInts
        .grouped(3)
        .toList
        .foldLeft(Future.successful(List[Unit]())) { (futAccumulator,group) =>
          for { // <1>
            accumulator <- futAccumulator
            innerResult <- group.map(svc.doSomething _).sequence // <2>
          } yield accumulator ::: innerResult
        }
    val result : Unit = futResult.get // <3>
- Replaced Future.flatMap and the nested Future.map with a much cleaner, more readable for-comprehension [7]
- Replaced Future.sequence with the sequence sugar method
- Replaced Await.result with the get sugar method
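For readers who have not seen the translation before, here is a minimal sketch of how a two-generator for-comprehension corresponds to flatMap and map. The names step, withFor and desugared are hypothetical, and the global execution context is assumed:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical async step producing a one-element list.
def step(n: Int): Future[List[Int]] = Future(List(n))

// The for-comprehension form, mirroring the names used in the listing above.
def withFor(futAccumulator: Future[List[Int]], n: Int): Future[List[Int]] =
  for {
    accumulator <- futAccumulator
    innerResult <- step(n)
  } yield accumulator ::: innerResult

// The same logic written by hand: every generator but the last becomes flatMap,
// and the last generator together with yield becomes map.
def desugared(futAccumulator: Future[List[Int]], n: Int): Future[List[Int]] =
  futAccumulator.flatMap { accumulator =>
    step(n).map(innerResult => accumulator ::: innerResult)
  }
```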
I like the OOP style call convention, but this pattern is still tedious. Perhaps, I can make this even simpler?
Pimp My Future: Pattern 1.2
I’m going to further clean up Pattern 1.1 by creating another pimp-my-library method on a new value class Future_PimpMyTraversableOnce.
    implicit class Future_PimpMyTraversableOnce[A, M[AA] <: TraversableOnce[AA]](val self: M[A]) extends AnyVal {
      /** @return a Future of M[B] that completes once all futures have completed */
      def mapAsync[B](groupSize: Int)(f: A => Future[B])(implicit
        cbf: CanBuildFrom[M[Future[A]], A, M[A]],
        cbf2: CanBuildFrom[Nothing, B, M[B]],
        ec: ExecutionContext
      ) : Future[M[B]] = {
        self
          .toList // <1>
          .grouped(groupSize)
          .foldLeft(Future.successful(List[B]())) { (futAccumulator,group) =>
            for {
              accumulator <- futAccumulator
              innerResult <- group.map(f).sequence
            } yield accumulator ::: innerResult
          }
          .map(_.to[M]) // <2>
      }
    }
- Convert to List here for efficient accumulation of results (and no grouped method on TraversableOnce)
- Convert back to desired collection
    val futResult : Future[List[Unit]] = someInts.mapAsync(3)(svc.doSomething _)
    val result : Unit = futResult.get
Much better! My code is now simple, readable, idiomatic, doesn’t discard exceptions, doesn’t flood the executor with futures and returns a future to the caller!
Further Exploration: Problems with Pattern 1
Pattern 1 solves the problem of discarding exceptions, regulating the flow of “hot” futures and returning a future to callers, but it isn’t the most efficient way of handling this problem. Because it has to wait for each group to complete, one of the svc.doSomething calls could take an extra long time. If it does, even though the other futures in its group have completed, I have to wait for that one long call to complete before moving on to the next group. Ideally, I should make it so that there are always N futures running simultaneously instead of grouping them. Work for another day!
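One possible shape for that future work, offered only as a rough sketch and not as the approach this article settles on: start N “worker” chains that each pull the next item from a shared queue as soon as their previous call finishes. The name mapAsyncSliding and its parameters are hypothetical, the global execution context is assumed, and the sketch takes a plain List rather than a generic collection.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical helper: keeps up to `parallelism` calls of `f` in flight at once
// and returns the results in the original input order.
def mapAsyncSliding[A, B](items: List[A], parallelism: Int)(f: A => Future[B]): Future[List[B]] = {
  // Shared work queue; each item carries its index so order can be restored at the end.
  val queue = new ConcurrentLinkedQueue[(A, Int)]()
  items.zipWithIndex.foreach(pair => queue.add(pair))

  // One worker = a chain of futures: take an item, run f, then take the next item.
  def worker(acc: List[(Int, B)]): Future[List[(Int, B)]] =
    Option(queue.poll()) match {
      case None              => Future.successful(acc) // queue drained, this worker is done
      case Some((item, idx)) => f(item).flatMap(b => worker((idx, b) :: acc))
    }

  // Start the workers, then merge and re-order their partial results.
  Future
    .sequence(List.fill(parallelism)(worker(Nil)))
    .map(_.flatten.sortBy(_._1).map(_._2))
}
```

Each worker has at most one call in flight, so a slow call only holds up its own worker while the others keep draining the queue. In this sketch a failed call fails the overall Future, but the remaining workers keep going until the queue is empty; whether that is acceptable depends on the use case.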
Sources
1. http://docs.scala-lang.org/overviews/core/futures.html
2. https://github.com/scalaz/scalaz
3. http://stackoverflow.com/questions/8736164/what-are-type-lambdas-in-scala-and-what-are-their-benefits
4. http://stackoverflow.com/questions/6750609/list-of-options-equivalent-of-sequence-in-scala
5. http://www.artima.com/weblogs/viewpost.jsp?thread=179766
6. http://docs.scala-lang.org/overviews/core/value-classes.html
7. http://stackoverflow.com/questions/12792595/how-to-convert-this-map-flatmap-into-a-for-comprehension-in-scala