s_mach.concurrent: Futures utility library


  1. Scala 2.11.0
  2. s_mach.concurrent 0.1


s_mach.concurrent is an open-source Scala utility library that extends the standard scala.concurrent library. It adds new types and functions for easily controlling concurrent execution flow and helps to fix a fundamental flaw in the standard scala.concurrent.Future implementation.


All code examples assume the following imports:


When first using Future with a for-comprehension, it is natural to assume the following will produce concurrent operation:

Example 1: Incorrect Future concurrency
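
The pattern in question can be sketched with the standard library alone. The `read` helper and the `EventLog` class below are hypothetical names introduced for illustration; `read` stands in for any slow operation, and the log records the order in which tasks start and end.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialSketch {
  // Thread-safe log of the order in which tasks start and end (illustrative only).
  final class EventLog {
    private var entries = Vector.empty[String]
    def add(e: String): Unit = synchronized { entries = entries :+ e }
    def snapshot: Vector[String] = synchronized { entries }
  }

  // Hypothetical stand-in for a slow operation such as a network read.
  def read(i: Int, log: EventLog): Future[Int] = Future {
    log.add(s"start $i")
    Thread.sleep(50)
    log.add(s"end $i")
    i
  }

  def demo(): (Int, Vector[String]) = {
    val log = new EventLog
    val sum = for {
      a <- read(1, log) // read(2, ...) is not called until this future completes
      b <- read(2, log) // read(3, ...) is not called until this future completes
      c <- read(3, log)
    } yield a + b + c
    (Await.result(sum, 5.seconds), log.snapshot)
  }
}
```

Calling `SequentialSketch.demo()` yields a log in which every task ends before the next one starts, because the for-comprehension desugars to nested `flatMap` calls and each `read` is only invoked inside the callback of the previous one.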

Sadly, this code will compile and run just fine, but it will not execute concurrently. To correctly implement concurrent operation, the following standard pattern is used:

Example 2: Correct Future concurrency:
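
A minimal sketch of this pattern, again using only the standard library. The `read` helper is hypothetical. To make the overlap observable without relying on timing, each task waits on a `CountDownLatch` that only opens once all three tasks are running, and a three-thread pool is assumed so that all of them can run at once.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConcurrentSketch {
  def demo(): (Int, Boolean) = {
    val pool = Executors.newFixedThreadPool(3)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    // Opens only when all three tasks are in flight at the same time.
    val allStarted = new CountDownLatch(3)

    // Hypothetical slow operation; also reports whether it overlapped with the others.
    def read(i: Int): Future[(Int, Boolean)] = Future {
      allStarted.countDown()
      val overlapped = allStarted.await(5, TimeUnit.SECONDS)
      (i, overlapped)
    }

    try {
      // Start every future BEFORE the for-comprehension.
      val f1 = read(1)
      val f2 = read(2)
      val f3 = read(3)
      // The for-comprehension now only sequences the collection of results.
      val combined = for {
        a <- f1
        b <- f2
        c <- f3
      } yield (a._1 + b._1 + c._1, a._2 && b._2 && c._2)
      Await.result(combined, 10.seconds)
    } finally pool.shutdown()
  }
}
```

The second element of the result is `true` only if all three tasks were running simultaneously, which is the case here because the futures are created eagerly as plain `val`s.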

To get concurrent operation, all of the futures must be started before the for-comprehension. The for-comprehension is a monadic workflow. It captures commands that must take place in a specific sequential order. The pattern in example 2 is necessary because Scala lacks an applicative workflow: a workflow that captures commands that may be run in any order. s_mach.concurrent adds an applicative workflow method for futures: concurrently. This method can more concisely express the pattern above:

Example 3: New concurrently method

In the example above, all futures are started at the same time and fed to the concurrently method. The method returns a Future[(Int,Int,Int)] which completes once all supplied futures complete. After this returned Future completes, the tuple value results can be extracted using normal Scala idioms. The concurrently method also fixes problems with scala.concurrent exception handling (see the Under the hood: Merge section below).
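
For comparison, an applicative-style combination can be approximated with the standard library's `Future.zip`. The `zip3` helper below is hypothetical and is not the library's concurrently method; it only illustrates the idea of combining independent futures into a tuple, and it makes no attempt to reproduce the improved exception handling.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ZipSketch {
  // Hypothetical helper: combines three already-started futures into one future of a tuple.
  // The arguments are evaluated before the call, so all three futures are running
  // by the time zip3 is entered.
  def zip3[A, B, C](fa: Future[A], fb: Future[B], fc: Future[C])(
      implicit ec: ExecutionContext): Future[(A, B, C)] =
    fa.zip(fb).zip(fc).map { case ((a, b), c) => (a, b, c) }

  def demo(): (Int, Int, Int) = {
    import ExecutionContext.Implicits.global
    val combined = zip3(Future(1), Future(2), Future(3))
    Await.result(combined, 5.seconds)
  }
}
```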

Transforming and traversing collections serially and concurrently

A common task when working with futures is transforming or traversing a collection by calling a method that returns a future for each item. The standard idiom for this task only provides methods for concurrent operation and, with enough nesting, leads to hard-to-read code:

Example 4: Transform and traverse collections, standard method
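
A minimal sketch of the standard-library approach. `Future.traverse` covers the concurrent case; the serial case has no direct equivalent and is written here as a hand-rolled fold. The `lookup` function and the `serialMap` and `concurrentMap` names are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseSketch {
  // Hypothetical per-item operation that returns a future.
  def lookup(i: Int)(implicit ec: ExecutionContext): Future[Int] = Future(i * 2)

  // Concurrent: Future.traverse calls lookup for every item up front.
  def concurrentMap(items: List[Int])(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.traverse(items)(i => lookup(i))

  // Serial: each lookup is only called after the previous one has completed.
  def serialMap(items: List[Int])(implicit ec: ExecutionContext): Future[List[Int]] =
    items
      .foldLeft(Future.successful(Vector.empty[Int])) { (acc, i) =>
        acc.flatMap(done => lookup(i).map(r => done :+ r))
      }
      .map(_.toList)

  def demo(): (List[Int], List[Int]) = {
    import ExecutionContext.Implicits.global
    val items = List(1, 2, 3)
    (Await.result(concurrentMap(items), 5.seconds),
     Await.result(serialMap(items), 5.seconds))
  }
}
```

Both variants return the results in input order; they differ only in when each `lookup` call is started.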

The same code, rewritten using s_mach.concurrent:

Example 5: Using s_mach.concurrent to serially or concurrently transform and traverse collections:

Transforming and traversing collections using workers

Example 6: Using s_mach.concurrent workers to transform and traverse collections:
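
The sketch below assumes that "workers" means a fixed upper bound on the number of items being processed at once. It does not use the library; it approximates the idea by running `Future.traverse` on a fixed-size thread pool, which only bounds concurrency for blocking work like the hypothetical `process` function shown. A peak counter records the highest number of items observed in flight.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object WorkerSketch {
  // Returns the results plus the highest number of items seen in flight at once.
  def demo(workers: Int, items: List[Int]): (List[Int], Int) = {
    val pool = Executors.newFixedThreadPool(workers)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    // Hypothetical blocking operation; tracks how many calls overlap.
    def process(i: Int): Future[Int] = Future {
      val now = inFlight.incrementAndGet()
      var seen = peak.get()
      // Raise the recorded peak if this call observed a higher count.
      while (now > seen && !peak.compareAndSet(seen, now)) seen = peak.get()
      try {
        Thread.sleep(20)
        i * 10
      } finally inFlight.decrementAndGet()
    }

    try {
      val results = Await.result(Future.traverse(items)(i => process(i)), 10.seconds)
      (results, peak.get())
    } finally pool.shutdown()
  }
}
```

With `WorkerSketch.demo(2, List(1, 2, 3, 4, 5, 6))`, the results come back in input order and the recorded peak never exceeds 2, because the pool has only two threads.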

Under the hood: Merge method

Powering both the general concurrently method and the collection .concurrently.map, .concurrently.flatMap and .concurrently.foreach methods are the merge and flatMerge methods. The merge method performs the same function as Future.sequence (it calls Future.sequence internally), but it ensures that the returned future completes immediately after an exception occurs in any of the futures. Because Future.sequence waits on all futures in left-to-right order before completing, an exception thrown at the beginning of the computation by a future at the far right will not be detected until all other futures have completed. For long-running computations, this can mean a significant amount of time wasted waiting on futures whose results will be discarded.

Also, while the Scala parallel collections correctly handle multiple concurrent exceptions, Future.sequence returns only the first exception encountered; all further exceptions are discarded. The merge and flatMerge methods fix these problems by throwing ConcurrentThrowable, which provides access to both the first exception thrown and a future of all exceptions thrown during the computation.

Example 7: Future.sequence gets stuck waiting on longRead to complete and only returns the first exception:

Example 8: merge method fails immediately on the first exception and throws ConcurrentThrowable, which can retrieve all exceptions:
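
The fail-fast behaviour described for merge can be approximated with the standard library. This is a sketch under stated assumptions, not the library's implementation: `failFastSequence` and `MergeFailure` are hypothetical names, with `MergeFailure` standing in for ConcurrentThrowable, and `longRead` is modelled as a promise that stays pending until it is completed by hand.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FailFastSketch {
  // Hypothetical stand-in for ConcurrentThrowable: the first failure seen,
  // plus a future of every failure once all inputs have finished.
  final case class MergeFailure(first: Throwable, all: Future[Vector[Throwable]])
    extends RuntimeException(first)

  def failFastSequence[A](futures: Vector[Future[A]])(
      implicit ec: ExecutionContext): Future[Vector[A]] = {
    val result = Promise[Vector[A]]()
    // Turn every input into a future that always succeeds with its failure, if any.
    val outcomes: Vector[Future[Option[Throwable]]] = futures.map { f =>
      f.map(_ => Option.empty[Throwable]).recover { case t: Throwable => Some(t) }
    }
    val allFailures: Future[Vector[Throwable]] = Future.sequence(outcomes).map(_.flatten)
    // Fail the result as soon as ANY input fails, regardless of its position.
    futures.foreach { f =>
      f.onComplete {
        case Failure(t) => result.tryFailure(MergeFailure(t, allFailures))
        case Success(_) => ()
      }
    }
    // Succeed only when every input has succeeded.
    Future.sequence(futures).onComplete {
      case Success(values) => result.trySuccess(values)
      case Failure(_)      => () // already reported by the callbacks above
    }
    result.future
  }

  def demo(): (String, Boolean, Vector[String]) = {
    import ExecutionContext.Implicits.global
    val longRead = Promise[Int]() // still running when the other future fails
    val failing = Future.failed[Int](new IllegalStateException("boom"))
    val merged = failFastSequence(Vector(longRead.future, failing))
    val failure = Await.result(merged.failed, 5.seconds).asInstanceOf[MergeFailure]
    val pendingAtFailure = !longRead.isCompleted
    longRead.failure(new IllegalArgumentException("late"))
    val all = Await.result(failure.all, 5.seconds).map(_.getMessage)
    (failure.first.getMessage, pendingAtFailure, all)
  }
}
```

In `demo`, the merged future fails with "boom" while `longRead` is still pending, and the `all` future later reports both failures in input order once `longRead` has finished.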

Concurrent Semaphore


Example 9: Semaphore

Concurrent Lock


Example 10: Lock

Concurrent Queue

s_mach.concurrent provides a basic concurrent queue trait, ConcurrentQueue, that allows for asynchronous buffering operations, including operations on collections of items. Currently only one implementation, ConcurrentListQueue, is provided.

Example 11: ConcurrentListQueue

Utility methods

s_mach.concurrent provides a few utility methods for writing more concise and DRY code when working with Future:

Example 12: Utility methods

Sugar methods

s_mach.concurrent also provides a number of syntactic-sugar methods for writing more concise and DRY code when working with Future:

Example 13: Sugar methods
