Versions
- Scala 2.11.0
- s_mach.concurrent 0.1
Overview
s_mach.concurrent is an open-source Scala utility library that extends the standard scala.concurrent library. It adds new types and functions for easily controlling concurrent execution flow and helps to fix a fundamental flaw in the standard scala.concurrent.Future implementation.
Imports
All code examples assume the following imports and helper definitions:
    import scala.util._
    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import s_mach.concurrent._
    import s_mach.concurrent.util._

    case class Item(id: String, value: Int, relatedItemId: String)
    def read(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
    def readFail(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
    def longRead(id: String) : Future[Item] = Future { Thread.sleep(2000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
    def write(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); true }
    def writeFail(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
Concurrently
When first using Future with a for-comprehension, it is natural to assume the following will produce concurrent operation:
    for {
      i1 <- read("1")
      i2 <- read("2")
      i3 <- read("3")
    } yield (i1,i2,i3)
Sadly, this code will compile and run just fine, but it will not execute concurrently: each read is started only after the previous one has completed. To correctly implement concurrent operation, the following standard pattern is used:
    val f1 = read("1")
    val f2 = read("2")
    val f3 = read("3")
    val future = { // necessary for pasting into repl
      for {
        i1 <- f1
        i2 <- f2
        i3 <- f3
      } yield (i1,i2,i3)
    }
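The difference between the two patterns above can be checked by timing them. The following is a minimal sketch that uses only the standard library; `slow`, `timed`, `sequential` and `startedFirst` are hypothetical names introduced for illustration, and the 200 ms delay is arbitrary. The sleep is wrapped in `blocking` so that the global execution context may add threads on machines with few cores.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ForComprehensionTiming {
  // Hypothetical stand-in for read(): completes with its id after roughly 200 ms
  def slow(id: Int): Future[Int] = Future { blocking { Thread.sleep(200) }; id }

  // Evaluates body, waits for the resulting future, returns (result, elapsed milliseconds)
  def timed[A](body: => Future[A]): (A, Long) = {
    val start = System.nanoTime()
    val result = Await.result(body, 10.seconds)
    (result, (System.nanoTime() - start) / 1000000L)
  }

  // Each slow(...) call happens inside flatMap, so it starts after the previous one completes
  def sequential(): Future[(Int, Int, Int)] =
    for { a <- slow(1); b <- slow(2); c <- slow(3) } yield (a, b, c)

  // All three futures are already running before the for-comprehension begins
  def startedFirst(): Future[(Int, Int, Int)] = {
    val f1 = slow(1); val f2 = slow(2); val f3 = slow(3)
    for { a <- f1; b <- f2; c <- f3 } yield (a, b, c)
  }

  def main(args: Array[String]): Unit = {
    println(timed(sequential()))
    println(timed(startedFirst()))
  }
}
```

The first timing should be close to the sum of the three delays and the second close to a single delay, though exact figures depend on the machine.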
To get concurrent operation, all of the futures must be started before the for-comprehension. The for-comprehension is a monadic workflow: it captures commands that must take place in a specific sequential order. The pattern in the second example is necessary because Scala lacks an applicative workflow, that is, a workflow that captures commands that may be run in any order. s_mach.concurrent adds an applicative workflow method for futures: concurrently. This method can more concisely express the pattern above:
    for {
      (i1,i2,i3) <- concurrently(read("1"), read("2"), read("3"))
    } yield (i1,i2,i3)
In the example above, all futures are started at the same time and fed to the concurrently method. The method returns a Future[(Item,Item,Item)] which completes once all supplied futures complete. After this returned Future completes, the tuple value results can be extracted using normal Scala idioms. The concurrently method also fixes problems with scala.concurrent exception handling (see the Under the hood: Merge method section below).
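To illustrate why passing the futures as arguments is enough to start them together, here is a rough standard-library sketch of a tuple-combining helper. `together3` is a hypothetical name and this is not s_mach.concurrent's implementation: it is built on `Future.zip`, so it does not reproduce the exception handling described later in this document.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ZipSketch {
  // Hypothetical helper, NOT the library's concurrently method.
  // The parameters are strict, so each Future expression written at the call site
  // is evaluated (and therefore started) before this method body runs.
  def together3[A, B, C](fa: Future[A], fb: Future[B], fc: Future[C])
                        (implicit ec: ExecutionContext): Future[(A, B, C)] =
    fa.zip(fb).zip(fc).map { case ((a, b), c) => (a, b, c) }

  def main(args: Array[String]): Unit = {
    val combined = together3(Future(1), Future("two"), Future(3.0))
    println(Await.result(combined, 5.seconds))
  }
}
```

Because the result is a single future of a tuple, it can be destructured in a for-comprehension in the same way as the concurrently example.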
Transforming and traversing collections serially and concurrently
A common task when working with futures is transforming or traversing a collection by calling a method that returns a future for each element. The standard idiom for performing this task only provides methods for concurrent operation and, with enough nesting, leads to difficult-to-read code:
    val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
    val future = { // necessary for pasting into repl
      for {
        oomItem <- {
          println("Reading...")
          oomItemIdBatch
            // Serially perform read of each batch
            .foldLeft(Future.successful(List[Item]())) { (facc, idBatch) =>
              for {
                acc <- facc
                // Concurrently read batch
                oomItem <- Future.sequence(idBatch.map(read))
              } yield acc ::: oomItem
            }
        }
        _ = println("Computing...")
        oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
        oomResult <- {
          println("Writing...")
          oomNewItemBatch
            // Serially perform write of each batch
            .foldLeft(Future.successful(List[Boolean]())) { (facc, itemBatch) =>
              for {
                acc <- facc
                // Concurrently write batch
                oomResult <- Future.sequence(itemBatch.map(item => write(item.id, item)))
              } yield acc ::: oomResult
            }
        }
      } yield oomResult.forall(_ == true)
    }
The same code, rewritten using s_mach.concurrent:
    val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
    val future = { // necessary for pasting into repl
      for {
        oomItem <- {
          println("Reading...")
          oomItemIdBatch.serially.flatMap(_.concurrently.map(read))
        }
        _ = println("Computing...")
        oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
        oomResult <- {
          println("Writing...")
          oomNewItemBatch.serially.flatMap(_.concurrently.map(item => write(item.id, item)))
        }
      } yield oomResult.forall(_ == true)
    }
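For comparison, the "serial across batches, concurrent within a batch" idiom from the standard-library version can also be factored into small helpers without the library. This is only a sketch: `serialTraverse` and `batched` are hypothetical names, not part of s_mach.concurrent or scala.concurrent.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BatchTraversal {
  // Applies f to one element at a time: the next call to f is made only after
  // the future returned for the previous element has completed
  def serialTraverse[A, B](xs: List[A])(f: A => Future[B])
                          (implicit ec: ExecutionContext): Future[List[B]] =
    xs.foldLeft(Future.successful(List.empty[B])) { (facc, x) =>
      for { acc <- facc; b <- f(x) } yield acc :+ b
    }

  // Serial across batches; within a batch, Future.traverse starts every call up front
  def batched[A, B](xs: List[A], batchSize: Int)(f: A => Future[B])
                   (implicit ec: ExecutionContext): Future[List[B]] =
    serialTraverse(xs.grouped(batchSize).toList)(batch => Future.traverse(batch)(f))
      .map(_.flatten)

  def main(args: Array[String]): Unit = {
    val result = batched((1 to 10).toList, 2)(i => Future(i * 10))
    println(Await.result(result, 5.seconds))
  }
}
```

Results are returned in input order, as with the foldLeft version shown earlier.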
Transforming and traversing collections using workers
    val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
    val future = { // necessary for pasting into repl
      for {
        oomItem <- {
          println("Reading...")
          oomItemIdBatch.workers(2).flatMap(_.workers(4).map(read))
        }
        _ = println("Computing...")
        oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
        oomResult <- {
          println("Writing...")
          oomNewItemBatch.workers(2).flatMap(_.workers(4).map(item => write(item.id, item)))
        }
      } yield oomResult.forall(_ == true)
    }
Under the hood: Merge method
Powering both the general concurrently method and the collection .concurrently.map, .concurrently.flatMap and .concurrently.foreach methods are the merge and flatMerge methods. The merge method performs the same function as Future.sequence (it calls Future.sequence internally), but it ensures that the returned future completes immediately after an exception occurs in any of the futures. Because Future.sequence waits on all futures in left-to-right order before completing, an exception thrown at the beginning of the computation by a future at the far right will not be detected until after all other futures have completed. For long-running computations, this can mean a significant amount of time wasted waiting on futures whose results will be discarded.
Also, while the Scala parallel collections correctly handle multiple concurrent exceptions, Future.sequence only returns the first exception encountered; all further exceptions are discarded. The merge and flatMerge methods fix these problems by throwing ConcurrentThrowable, which gives access to both the first exception thrown and a future of all exceptions thrown during the computation.
    scala> val t = Future.sequence(Vector(longRead("1"),readFail("2"),readFail("3"),read("4"))).getTry
    3
    4
    2
    1
    t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(java.lang.RuntimeException: 2)

    scala>
    scala> val t = Vector(longRead("1"),readFail("2"),readFail("3"),read("4")).merge.getTry
    2
    t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(ConcurrentThrowable(java.lang.RuntimeException: 2))
    3

    scala> 4
    1

    scala> val allFailures = t.failed.get.asInstanceOf[ConcurrentThrowable].futAllFailure.get
    allFailures: Vector[Throwable] = Vector(java.lang.RuntimeException: 2, java.lang.RuntimeException: 3)
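The fail-fast behaviour described in this section can be approximated with a Promise from the standard library. The sketch below is an illustration under that assumption, not the library's merge implementation; `failFastSequence` and `allFailures` are hypothetical names, and no ConcurrentThrowable wrapper is modelled.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FailFast {
  // Hypothetical helper: like Future.sequence, but fails as soon as any input fails
  def failFastSequence[A](fs: Vector[Future[A]])
                         (implicit ec: ExecutionContext): Future[Vector[A]] = {
    val p = Promise[Vector[A]]()
    // The first failure to arrive completes the promise; later ones are ignored here
    fs.foreach { f => f.failed.foreach(t => p.tryFailure(t)) }
    // If nothing failed, complete with the in-order results once all inputs succeed
    Future.sequence(fs).onComplete(r => p.tryComplete(r))
    p.future
  }

  // Hypothetical helper: waits for every input and collects all failures, in input order
  def allFailures[A](fs: Vector[Future[A]])
                    (implicit ec: ExecutionContext): Future[Vector[Throwable]] =
    Future.sequence(
      fs.map(f => f.map(_ => Option.empty[Throwable]).recover { case t => Some(t) })
    ).map(_.flatten)

  def main(args: Array[String]): Unit = {
    val slow = Future { blocking { Thread.sleep(1500) }; 1 }
    val boom = Future[Int] { throw new RuntimeException("boom") }
    // Completes with the failure without waiting for the slow future
    println(Await.ready(failFastSequence(Vector(slow, boom)), 5.seconds).value)
  }
}
```

Racing every input's failure against the sequenced result keeps the success path identical to Future.sequence while letting a failure on the far right surface immediately.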
Concurrent Semaphore
TODO
    val s = Semaphore(10)
    val promise = Promise[Int]()

    val f1 = s.acquire(10) { () => println(1);promise.future }

    val f2 = s.acquire(8) { () => println(2);Thread.sleep(1000);2.future }
    val f3 = s.acquire(2) { () => println(3);Thread.sleep(1000);3.future }
    val f4 = s.acquire(1) { () => println(4);4.future }

    promise.success(1)
Concurrent Lock
TODO
    val lock = Lock()
    val promise = Promise[Int]()

    val f1 = lock { () => println("1");promise.future }
    val f2 = lock { () => println("2");2.future }
    val f3 = lock { () => println("3");3.future }

    promise.success(1)
    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    val lock = Lock()
    val promise = Promise[Int]()

    val f1 = lock { () => println(1);promise.future }
    val f2 = lock { () => println(2);2.future }
    val f3 = lock { () => println(3);3.future }

    // Exiting paste mode, now interpreting.

    1
    lock: s_mach.concurrent.util.Lock = s_mach.concurrent.util.Lock$LockImpl@2578c32a
    promise: scala.concurrent.Promise[Int] = scala.concurrent.impl.Promise$DefaultPromise@1b550d9e
    f1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1b550d9e
    f2: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@6cf10e16
    f3: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@45d3fe3a

    scala> promise.success(1)
    2
    res2: promise.type = scala.concurrent.impl.Promise$DefaultPromise@37af4505

    3
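The transcript above suggests that each locked task starts only after the future returned by the previous task has completed. A minimal standard-library sketch of that behaviour is shown below. `AsyncLock` is a hypothetical class written for illustration and makes no claim about how the library's Lock is implemented; in particular, this sketch always starts tasks asynchronously on the execution context.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical asynchronous lock: tasks run one at a time, in submission order
final class AsyncLock {
  // Completes when the most recently submitted task has finished
  private var tail: Future[Any] = Future.successful(())

  def apply[A](task: () => Future[A])(implicit ec: ExecutionContext): Future[A] =
    synchronized {
      // Start the task once the previous one is done, whether it succeeded or failed
      val next = tail.recover { case _ => () }.flatMap(_ => task())
      tail = next
      next
    }
}

object AsyncLockDemo {
  def main(args: Array[String]): Unit = {
    val lock = new AsyncLock
    val gate = Promise[Int]()
    val f1 = lock { () => println(1); gate.future }
    val f2 = lock { () => println(2); Future.successful(2) }
    val f3 = lock { () => println(3); Future.successful(3) }
    gate.success(1) // releases the first task, allowing the second and third to run
    println(Await.result(f3, 5.seconds))
  }
}
```

Chaining each task onto the previous task's future keeps callers non-blocking: no thread waits while the lock is held.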
ConcurrentQueue
s_mach.concurrent provides a basic concurrent queue trait, ConcurrentQueue, that allows for asynchronous buffering operations, including operations on collections of items. Currently only one implementation, ConcurrentListQueue, is provided.
    val q = new ConcurrentListQueue[String]()

    // Completes when input is available
    val f1 = q.poll()
    f1 foreach println

    // Completes the future above
    q.offer("some input")

    // Completes when all input is available
    val f2 = q.poll(3)
    f2 foreach println

    // Completes the future above
    q.offer("input1")
    q.offer(List("input2","input3","input4"))

    // If input is available now, future completes immediately:
    q.poll() foreach println
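To make the poll-before-offer behaviour concrete, here is a small standard-library sketch of a queue whose poll returns a future that a later offer completes. `SimpleAsyncQueue` is a hypothetical class for illustration only. It handles single items and is not ConcurrentListQueue's implementation.

```scala
import scala.collection.immutable.Queue
import scala.concurrent._
import scala.concurrent.duration._

// Hypothetical single-item asynchronous queue
final class SimpleAsyncQueue[A] {
  private var items = Queue.empty[A]            // buffered input nobody has asked for yet
  private var waiters = Queue.empty[Promise[A]] // polls still waiting for input

  // Returns an already completed future if input is buffered, else a pending one
  def poll(): Future[A] = synchronized {
    if (items.nonEmpty) {
      val (a, rest) = items.dequeue
      items = rest
      Future.successful(a)
    } else {
      val p = Promise[A]()
      waiters = waiters.enqueue(p)
      p.future
    }
  }

  // Hands the item to the oldest waiting poll, or buffers it if none is waiting
  def offer(a: A): Unit = {
    val waiter = synchronized {
      if (waiters.nonEmpty) {
        val (p, rest) = waiters.dequeue
        waiters = rest
        Some(p)
      } else {
        items = items.enqueue(a)
        None
      }
    }
    waiter.foreach(_.success(a)) // complete outside the lock
  }
}

object SimpleAsyncQueueDemo {
  def main(args: Array[String]): Unit = {
    val q = new SimpleAsyncQueue[String]
    val f1 = q.poll()       // pending: nothing has been offered yet
    q.offer("some input")   // completes f1
    println(Await.result(f1, 1.second))
  }
}
```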
Utility methods
s_mach.concurrent provides a few utility methods for writing more concise and DRY code when working with Future:
    val fr : Future[Item] = read("1")

    // Convert a Future[A] to Future[Try[A]] that always succeeds
    val tt : Future[Try[Item]] = fr.toTry

    // Convert a Future[A] to a Future[X] that always succeeds
    val f : Future[String] = fr.fold({ i:Item => i.id.toString }, { t:Throwable => t.toString})

    // Convert a Future[A] to a Future[Future[X]] that is flattened to Future[X] and that always succeeds
    val ff : Future[String] = fr.flatFold(
      { i:Item => Future.successful(i.id.toString) },
      { t:Throwable => Future.successful(t.toString) }
    )

    // Throw away the result, but ensure exceptions are reported to ExecutionContext.reportFailure
    fr.discard
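For readers who want to see what such conversions involve, the first two can be approximated with map and recover from the standard library. This is a sketch under that assumption: the `toTry` and `fold` functions below are hypothetical stand-ins and may differ in detail from the library's extension methods.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object FutureTrySketch {
  // Moves the failure into the value, so the returned future itself succeeds
  def toTry[A](f: Future[A])(implicit ec: ExecutionContext): Future[Try[A]] =
    f.map[Try[A]](a => Success(a)).recover { case t => Failure(t) }

  // Collapses success and failure into one result type X.
  // If a handler itself throws, the returned future fails with that exception.
  def fold[A, X](f: Future[A])(onSuccess: A => X, onFailure: Throwable => X)
                (implicit ec: ExecutionContext): Future[X] =
    toTry(f).map {
      case Success(a) => onSuccess(a)
      case Failure(t) => onFailure(t)
    }

  def main(args: Array[String]): Unit = {
    val failing = Future[Int] { throw new RuntimeException("boom") }
    println(Await.result(toTry(failing), 5.seconds))
    println(Await.result(fold(Future(1))(_.toString, _.getMessage), 5.seconds))
  }
}
```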
Sugar methods
s_mach.concurrent also provides a number of syntactic-sugar methods for writing more concise and DRY code when working with Future:
    val vf = Vector(read("1"), read("2"), read("3"))
    val fr = read("4")

    // Future.sequence(vf)
    val fv : Future[Vector[Item]] = vf.sequence

    // Await.result(fr, Duration.Inf)
    val g : Item = fr.get

    // Await.result(fr, 5.seconds)
    val g2 : Item = fr.get(5.seconds)

    // Await.ready(fr, Duration.Inf).value.get
    val gt : Try[Item] = fr.getTry

    // Await.ready(fr, 5.seconds).value.get
    val gt2 : Try[Item] = fr.getTry(5.seconds)