s_mach.concurrent: Futures utility library

Versions

  1. Scala 2.11.0
  2. s_mach.concurrent 0.1

Overview

s_mach.concurrent is an open-source Scala utility library that extends the standard scala.concurrent library. It adds new types and functions for easily controlling concurrent execution flow and helps fix a fundamental flaw in the standard scala.concurrent.Future implementation: Future.sequence neither fails fast nor reports every exception (see Under the hood: Merge method below).

Imports

All code examples assume the following imports:

Concurrently

When first using Future with a for-comprehension, it is natural to assume that the following code runs concurrently:

Example 1: Incorrect Future concurrency
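Here is a minimal sketch of this trap. It uses only the standard library, not s_mach.concurrent. slowCall is a hypothetical stand-in for real I/O, and the hypothetical InFlight helper records how many calls were running at the same time. A for-comprehension over futures desugars into nested flatMap calls, so each slowCall is only invoked after the previous future has completed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: tracks the peak number of calls in flight at once.
final class InFlight {
  private var running = 0
  private var peak = 0
  def enter(): Unit = synchronized { running += 1; peak = math.max(peak, running) }
  def exit(): Unit = synchronized { running -= 1 }
  def max: Int = synchronized(peak)
}

object Example1 {
  // Hypothetical slow call standing in for real I/O.
  def slowCall(n: Int, tracker: InFlight): Future[Int] = Future {
    tracker.enter()
    Thread.sleep(50)
    tracker.exit()
    n
  }

  def run(): (Int, Int) = {
    val tracker = new InFlight
    // Each future is created inside the for-comprehension, so it is not started
    // until the previous one has completed (for/yield becomes nested flatMap calls).
    val sum: Future[Int] = for {
      a <- slowCall(1, tracker)
      b <- slowCall(2, tracker)
      c <- slowCall(3, tracker)
    } yield a + b + c
    (Await.result(sum, 5.seconds), tracker.max)
  }
}
```

Example1.run() returns the sum 6 and a peak of 1, because the three calls never overlap.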

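Sadly, this code compiles and runs just fine, but it does not execute concurrently. A for-comprehension desugars into nested flatMap calls, so each future is created, and therefore started, only after the previous one has completed. To get concurrent operation, the following standard pattern is used: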

Example 2: Correct Future concurrency:
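Here is the same computation sketched with the standard pattern, again assuming a hypothetical slowCall. The three futures are created as ordinary vals before the for-comprehension, which starts all of them immediately. The for-comprehension then only waits for results that are already being computed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Example2 {
  // Hypothetical slow call standing in for real I/O.
  def slowCall(n: Int): Future[Int] = Future { Thread.sleep(50); n }

  def run(): Int = {
    // Creating the futures first starts all three right away...
    val fa = slowCall(1)
    val fb = slowCall(2)
    val fc = slowCall(3)
    // ...so the for-comprehension only sequences the already-running work.
    val sum: Future[Int] = for {
      a <- fa
      b <- fb
      c <- fc
    } yield a + b + c
    Await.result(sum, 5.seconds)
  }
}
```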

To get concurrent operation, all of the futures must be started before the for-comprehension. The for-comprehension is a monadic workflow: it captures commands that must take place in a specific sequential order. The pattern in Example 2 is necessary because Scala lacks an applicative workflow, that is, a workflow that captures commands that may be run in any order. s_mach.concurrent adds an applicative workflow method for futures, concurrently, which expresses the pattern above more concisely:

Example 3: New concurrently method
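The library's own concurrently is not reproduced in this post. The sketch below therefore uses a hypothetical stand-in with the same shape, built from the standard zip and map. It takes three already-started futures and returns a Future[(A, B, C)]. Unlike the library method, it adds none of the exception-handling improvements described in the Under the hood: Merge method section.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Example3 {
  // Hypothetical stand-in for concurrently: combines already-started futures into a
  // future of a tuple. It does not reproduce s_mach's exception handling.
  def concurrently[A, B, C](fa: Future[A], fb: Future[B], fc: Future[C])(
      implicit ec: ExecutionContext): Future[(A, B, C)] =
    fa.zip(fb).zip(fc).map { case ((a, b), c) => (a, b, c) }

  // Hypothetical slow call standing in for real I/O.
  def slowCall(n: Int): Future[Int] = Future { Thread.sleep(50); n }

  def run(): (Int, Int, Int) = {
    // Arguments are evaluated first, so all three futures start before concurrently runs.
    val result: Future[(Int, Int, Int)] = concurrently(slowCall(1), slowCall(2), slowCall(3))
    // Once the combined future completes, the tuple is taken apart with normal Scala idioms.
    val (a, b, c) = Await.result(result, 5.seconds)
    (a, b, c)
  }
}
```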

In the example above, all futures are started at the same time and fed to the concurrently method. The method returns a Future[(Int,Int,Int)] that completes once all supplied futures complete. After this returned Future completes, the tuple values can be extracted using normal Scala idioms. The concurrently method also fixes problems with scala.concurrent exception handling (see Under the hood: Merge method below).

Transforming and traversing collections serially and concurrently

A common task when working with futures is transforming or traversing a collection by calling, for each element, a method that returns a future. The standard idiom for this task only provides concurrent operation and, with enough nesting, leads to code that is difficult to read:

Example 4: Transform and traverse collections, standard method
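Here is a sketch of the standard idiom. The userIdsInGroup and userName lookups are hypothetical stand-ins for real service calls. Future.traverse only offers concurrent operation, and each extra level of collection needs another traverse nested inside a flatMap. A serial traversal has to be hand-rolled with foldLeft.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Example4 {
  // Hypothetical async lookups standing in for real service calls.
  def userIdsInGroup(groupId: Int): Future[Seq[Int]] = Future(Seq(groupId * 10, groupId * 10 + 1))
  def userName(userId: Int): Future[String] = Future(s"user$userId")

  // Concurrent transform: every lookup at each level starts at once,
  // and the second level has to be nested inside a flatMap.
  def namesInGroups(groupIds: Seq[Int]): Future[Seq[String]] =
    Future.traverse(groupIds)(userIdsInGroup).flatMap { idLists =>
      Future.traverse(idLists.flatten)(userName)
    }

  // Concurrent traversal for side effects only: discard the collected results.
  def printNames(userIds: Seq[Int]): Future[Unit] =
    Future.traverse(userIds)(id => userName(id).map(name => println(name))).map(_ => ())

  // No built-in serial variant exists, so a serial transform is hand-rolled.
  def namesSerially(userIds: Seq[Int]): Future[Vector[String]] =
    userIds.foldLeft(Future.successful(Vector.empty[String])) { (acc, id) =>
      acc.flatMap(names => userName(id).map(names :+ _))
    }
}
```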

The same code, rewritten using s_mach.concurrent:

Example 5: Using s_mach.concurrent to serially or concurrently transform and traverse collections:
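The post does not show how s_mach.concurrent implements this. The sketch below instead builds a hypothetical enrichment on the standard library. It mirrors the .concurrently.map, .concurrently.flatMap and .concurrently.foreach style named in the Merge section, plus an assumed .serially counterpart. The names CollectionSyntax, SerialOps and ConcurrentOps are illustrative only and may not match the library's API.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical enrichment; not s_mach.concurrent's code.
object CollectionSyntax {
  final class SerialOps[A](xs: Seq[A]) {
    // Starts f on the next element only after the previous future has completed.
    def map[B](f: A => Future[B])(implicit ec: ExecutionContext): Future[Vector[B]] =
      xs.foldLeft(Future.successful(Vector.empty[B])) { (acc, a) =>
        acc.flatMap(bs => f(a).map(bs :+ _))
      }
    def foreach(f: A => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] =
      map(f).map(_ => ())
  }

  final class ConcurrentOps[A](xs: Seq[A]) {
    // Starts f on every element at once.
    def map[B](f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] =
      Future.traverse(xs)(f)
    def flatMap[B](f: A => Future[Seq[B]])(implicit ec: ExecutionContext): Future[Seq[B]] =
      map(f).map(_.flatten)
    def foreach(f: A => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] =
      map(f).map(_ => ())
  }

  implicit class SeqFutureOps[A](val xs: Seq[A]) extends AnyVal {
    def serially: SerialOps[A] = new SerialOps(xs)
    def concurrently: ConcurrentOps[A] = new ConcurrentOps(xs)
  }
}

object Example5 {
  import CollectionSyntax._

  // Hypothetical async operation.
  def double(n: Int): Future[Int] = Future(n * 2)

  def run(): (Vector[Int], Seq[Int]) = {
    val serial = List(1, 2, 3).serially.map(double)
    val concurrent = List(1, 2, 3).concurrently.map(double)
    (Await.result(serial, 5.seconds), Await.result(concurrent, 5.seconds))
  }
}
```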

Transforming and traversing collections using workers

Example 6: Using s_mach.concurrent workers to transform and traverse collections:
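The post gives no description of the workers feature, so this sketch only illustrates one plausible idea under a stated assumption. A fixed number of workers (2 here) each repeatedly claim the next unprocessed element. As a result, at most that many futures are in flight at once, and the results still come back in input order. Workers.map and InFlight are hypothetical names, and s_mach may schedule work differently.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Workers {
  // Hypothetical helper: maps f over xs with at most `workers` futures in flight.
  def map[A, B](workers: Int, xs: Seq[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[Vector[B]] = {
    val input = xs.toVector
    val results = TrieMap.empty[Int, B]
    val next = new AtomicInteger(0)

    // A worker claims the next unprocessed index, runs f on it, then claims another.
    def worker(): Future[Unit] = {
      val i = next.getAndIncrement()
      if (i >= input.size) Future.successful(())
      else f(input(i)).flatMap { b =>
        results.put(i, b)
        worker()
      }
    }

    val pool = Vector.fill(math.max(1, workers))(worker())
    // Reassemble results in input order once every worker has run out of work.
    Future.sequence(pool).map(_ => input.indices.map(results).toVector)
  }
}

object Example6 {
  // Hypothetical helper: tracks the peak number of calls in flight at once.
  final class InFlight {
    private var running = 0
    private var peak = 0
    def enter(): Unit = synchronized { running += 1; peak = math.max(peak, running) }
    def exit(): Unit = synchronized { running -= 1 }
    def max: Int = synchronized(peak)
  }

  def run(): (Vector[Int], Int) = {
    val tracker = new InFlight
    def slowDouble(n: Int): Future[Int] = Future {
      tracker.enter(); Thread.sleep(20); tracker.exit(); n * 2
    }
    val doubled = Await.result(Workers.map(2, 1 to 6)(slowDouble), 5.seconds)
    (doubled, tracker.max)
  }
}
```

Example6.run() returns the doubled values in input order, with a peak of at most 2 concurrent calls.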

Under the hood: Merge method

Powering both the general concurrently method and the collection .concurrently.map, .concurrently.flatMap and .concurrently.foreach methods are the merge and flatMerge methods. The merge method performs the same function as Future.sequence (it calls Future.sequence internally), but it ensures that the returned future completes immediately after an exception occurs in any of the futures. Future.sequence waits on all futures in left-to-right order before completing. As a result, an exception thrown at the beginning of the computation by the far-right future will not be detected until all other futures have completed. For long-running computations, this can waste a significant amount of time waiting on futures whose results will be discarded.

Also, while the Scala parallel collections correctly handle multiple concurrent exceptions, Future.sequence only returns the first exception encountered; all further exceptions are discarded. The merge and flatMerge methods fix these problems by failing with a ConcurrentThrowable. ConcurrentThrowable has members for accessing both the first exception thrown and a future of all exceptions thrown during the computation.

Example 7: Future.sequence gets stuck waiting on longRead to complete and only returns the first exception:
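Here is a standard-library sketch of the two problems described above. longRead is modelled as a Promise that is completed by hand, so the sketch does not depend on timing. Under the Scala 2.11 implementation this post targets, the sequenced future stays pending until longRead completes, even though two inputs have already failed. Later Scala versions may report the failure sooner, so the sketch only checks that a single Throwable comes back.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Example7 {
  def run(): Throwable = {
    val gate = Promise[Int]()
    val longRead: Future[Int] = gate.future // a slow read that is still running
    val failA: Future[Int] = Future.failed(new RuntimeException("a"))
    val failB: Future[Int] = Future.failed(new RuntimeException("b"))

    val all: Future[Seq[Int]] = Future.sequence(Seq(longRead, failA, failB))
    // With Scala 2.11, `all` is still pending at this point.
    gate.success(42) // simulate longRead finishing

    // The failed result carries a single Throwable; the other failure is not reported.
    Await.result(all.failed, 5.seconds)
  }
}
```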

Example 8: merge method fails immediately on the first exception and throws ConcurrentThrowable, which can retrieve all exceptions:
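Here is a sketch of the fail-fast technique described above, built from Promise and standard combinators. merge and MergeFailure are hypothetical stand-ins, not s_mach's merge or ConcurrentThrowable. The returned future fails as soon as any input fails. The MergeFailure it carries exposes the first exception, plus a future of every exception that becomes available once all inputs have finished.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for ConcurrentThrowable: first failure plus a future of all failures.
final class MergeFailure(val first: Throwable, val allFailures: Future[Vector[Throwable]])
  extends Exception("at least one future failed", first)

object MergeSketch {
  def merge[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[Seq[A]] = {
    val result = Promise[Seq[A]]()
    // Completes once every input has finished, listing each failure in input order.
    val allFailures: Future[Vector[Throwable]] =
      Future.sequence(fs.map(f => f.map(_ => Option.empty[Throwable]).recover { case t => Some(t) }))
        .map(_.flatten.toVector)
    // Fail fast: the first failure from any position completes the result immediately.
    fs.foreach(f => f.failed.foreach(t => result.tryFailure(new MergeFailure(t, allFailures))))
    // Succeed only if every input succeeds.
    Future.sequence(fs).foreach(xs => result.trySuccess(xs))
    result.future
  }
}

object Example8 {
  def run(): (String, Vector[String]) = {
    val gate = Promise[Int]()
    val longRead = gate.future // still running when the failures happen
    val merged = MergeSketch.merge(Seq(
      longRead,
      Future.failed[Int](new RuntimeException("a")),
      Future.failed[Int](new RuntimeException("b"))))
    // Completes even though longRead has not finished.
    val failure = Await.result(merged.failed, 5.seconds) match {
      case mf: MergeFailure => mf
      case other            => throw other
    }
    gate.success(42) // let longRead finish so allFailures can complete
    val all = Await.result(failure.allFailures, 5.seconds)
    (failure.first.getMessage, all.map(_.getMessage))
  }
}
```

In Example8.run() the merged future fails while longRead is still pending. first is whichever failure was observed first ("a" or "b"), and the collected failures are "a" and "b" in input order.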

Concurrent Semaphore

TODO

Example 9: Semaphore

Concurrent Lock

TODO

Example 10: Lock

ConcurrentQueue

s_mach.concurrent provides a basic concurrent queue trait, ConcurrentQueue, that allows for asynchronous buffering operations, including operations on collections of items. Currently only one implementation, ConcurrentListQueue, is provided.

Example 11: ConcurrentListQueue
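The ConcurrentQueue API itself is not shown in this post. As a rough, hypothetical illustration of asynchronous buffering, AsyncQueueSketch below returns a Future from poll that a later offer completes. offerAll and pollMany are its collection variants. It uses a single lock and is not ConcurrentListQueue's implementation.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical, minimal asynchronous queue; not the ConcurrentQueue API.
final class AsyncQueueSketch[A] {
  private val buffered = mutable.Queue.empty[A]
  private val waiting = mutable.Queue.empty[Promise[A]]

  // Hands the item to the oldest waiting poll, or buffers it if nobody is waiting.
  def offer(a: A): Unit = {
    val waiter = synchronized {
      if (waiting.nonEmpty) Some(waiting.dequeue())
      else { buffered.enqueue(a); None }
    }
    waiter.foreach(_.success(a)) // complete outside the lock
  }

  def offerAll(as: Seq[A]): Unit = as.foreach(offer)

  // Returns a buffered item immediately, or a future that a later offer completes.
  def poll(): Future[A] = synchronized {
    if (buffered.nonEmpty) Future.successful(buffered.dequeue())
    else {
      val p = Promise[A]()
      waiting.enqueue(p)
      p.future
    }
  }

  // Collection variant: a future of the next n items (in order, absent other pollers).
  def pollMany(n: Int)(implicit ec: ExecutionContext): Future[Vector[A]] =
    Future.sequence(Vector.fill(n)(poll()))
}

object Example11 {
  def run(): (Int, Vector[Int]) = {
    val q = new AsyncQueueSketch[Int]
    val pending = q.poll() // nothing buffered yet, so this future is pending
    q.offer(1)             // completes `pending`
    q.offerAll(Seq(2, 3))  // buffered, because nobody is waiting
    val batch = q.pollMany(2)
    (Await.result(pending, 5.seconds), Await.result(batch, 5.seconds))
  }
}
```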

Utility methods

s_mach.concurrent provides a few utility methods for writing more concise and DRY code when working with Future:

Example 12: Utility methods

Sugar methods

s_mach.concurrent also provides a number of syntactic sugar methods for writing more concise and DRY code when working with Future:

Example 13: Sugar methods

s_mach: A Scala open-source state machine library

Versions

  • Scala 2.11.0
  • SBT 0.13.1

Introduction

s_mach is an open-source Scala library designed to support the creation of functional, composable, streamable and recoverable state machines. s_mach state machines can be composed together to form larger state machines, or connected together to form a processing pipeline plan that can be run to produce a final value. s_mach state machines also support a generic error recovery system (by way of the Halted state) that may be used to report errors encountered during processing by any composed or connected s_mach state machine. Additionally, when an error is encountered, s_mach state machines may provide an optional recovery method for fixing or ignoring broken input. While running a composed or connected s_mach state machine, the client receives the error message and decides whether to proceed with processing despite any errors.

In s_mach, a state machine is defined as an object that has an initial State, s0:

A State is a class that represents the state of a state machine at some point in processing. The State class is never instantiated directly; instead, one of its three basic sub-types is created:

  1. Continuation: represents the state of a state machine that requires more input to continue processing.
  2. Success: represents the state of a state machine that has completed processing and produced a single final value.
  3. Halted: represents the state of a state machine that has performed some processing but did not complete it successfully, and which might be recoverable.
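Here is a simplified, hypothetical model of the pieces described above. It assumes that a Continuation holds a step function for input chunks and a finish function for end of input; the real s_mach classes are not shown in this post. SumNumbers halts on a non-numeric string and offers a recovery that skips it. Driver.run leaves the decision to proceed with the client, through the proceed callback.

```scala
// Hypothetical, simplified model; not the library's actual definitions.
sealed trait State[I, O, A]
final case class Continuation[I, O, A](
  step: Seq[I] => State[I, O, A], // apply a chunk of input
  finish: () => State[I, O, A]    // apply end of input
) extends State[I, O, A]
final case class Success[I, O, A](value: A) extends State[I, O, A]
final case class Halted[I, O, A](issue: String, recover: Option[() => State[I, O, A]])
  extends State[I, O, A]

trait StateMachine[I, O, A] { def s0: State[I, O, A] }

// Sums numeric strings; halts on anything else and offers to skip the bad item.
object SumNumbers extends StateMachine[String, Unit, Int] {
  def s0: State[String, Unit, Int] = waiting(0)

  private def waiting(total: Int): State[String, Unit, Int] =
    Continuation[String, Unit, Int](
      chunk => consume(total, chunk.toList),
      () => Success[String, Unit, Int](total)
    )

  private def consume(total: Int, items: List[String]): State[String, Unit, Int] =
    items match {
      case Nil => waiting(total)
      case s :: rest if s.nonEmpty && s.forall(_.isDigit) => consume(total + s.toInt, rest)
      case s :: rest =>
        Halted[String, Unit, Int](s"not a number: $s", Some(() => consume(total, rest)))
    }
}

object Driver {
  // Feeds chunks, then end of input; `proceed` is the client's decision on each Halted state.
  // Assumes `finish` always yields Success or Halted.
  def run[I, O, A](state: State[I, O, A], chunks: List[Seq[I]],
                   proceed: String => Boolean): Either[String, A] =
    state match {
      case Success(value) => Right(value)
      case Halted(issue, recover) =>
        recover match {
          case Some(retry) if proceed(issue) => run(retry(), chunks, proceed)
          case _                             => Left(issue)
        }
      case Continuation(step, finish) =>
        chunks match {
          case chunk :: rest => run(step(chunk), rest, proceed)
          case Nil           => run(finish(), Nil, proceed)
        }
    }
}
```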

Transitions

When input is applied to a Continuation, it returns a Transition-derived object holding the state machine's next state and any output the Continuation produced while processing that input:

DoneTransition is the sub-type of Transition returned when EndOfInput (EOI) is applied to a Continuation state. A DoneTransition can only lead to a Done state: Halted or Success. A DoneTransition may also carry overflow, that is, input that was not consumed during processing.

Each of the basic State derived types has a corresponding Transition derived type:

Note
Transition derived objects are verbs and their State derived equivalents are nouns.
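Here is a self-contained, hypothetical sketch (not s_mach's definitions) in which applying input to a Continuation yields a Transition instead of a bare state. The Transition names Continue, Succeed and Halt are assumptions chosen to follow the verb/noun convention in the note. Each pairs with one state (Continuation, Success or Halted), and the two done transitions also carry overflow.

```scala
// Hypothetical model of the State/Transition split; names are assumptions.
sealed trait State[I, O, A]
sealed trait Done[I, O, A] extends State[I, O, A]
final case class Continuation[I, O, A](
  step: Seq[I] => Transition[I, O, A],  // apply a chunk of input
  finish: () => DoneTransition[I, O, A] // apply EndOfInput (EOI)
) extends State[I, O, A]
final case class Success[I, O, A](value: A) extends Done[I, O, A]
final case class Halted[I, O, A](issue: String) extends Done[I, O, A]

// Transitions are the verbs: each carries the next state plus any output produced.
sealed trait Transition[I, O, A] {
  def state: State[I, O, A]
  def output: Seq[O]
}
sealed trait DoneTransition[I, O, A] extends Transition[I, O, A] {
  def state: Done[I, O, A]
  def overflow: Seq[I] // input that was not consumed
}
final case class Continue[I, O, A](state: Continuation[I, O, A], output: Seq[O])
  extends Transition[I, O, A]
final case class Succeed[I, O, A](state: Success[I, O, A], output: Seq[O], overflow: Seq[I])
  extends DoneTransition[I, O, A]
final case class Halt[I, O, A](state: Halted[I, O, A], output: Seq[O], overflow: Seq[I])
  extends DoneTransition[I, O, A]

object RunningTotal {
  // Consumes Ints, outputs the running total after each item, and yields the sum at EOI.
  def from(total: Int): Continuation[Int, Int, Int] =
    Continuation[Int, Int, Int](
      chunk => {
        val totals = chunk.scanLeft(total)(_ + _).tail
        Continue(from(totals.lastOption.getOrElse(total)), totals)
      },
      () => Succeed(Success[Int, Int, Int](total), Seq.empty, Seq.empty)
    )

  def demo(): (Seq[Int], Option[Int]) =
    from(0).step(Seq(1, 2, 3)) match {
      case Continue(next, output) =>
        next.finish().state match { // applying EOI yields a DoneTransition
          case Success(sum) => (output, Some(sum))
          case Halted(_)    => (output, None)
        }
      case _ => (Seq.empty, None)
    }
}
```

RunningTotal.demo() returns the output Seq(1, 3, 6) from the Continue transition and the final value 6 from the Success state reached at EOI.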

Type-parameters

All s_mach state machine classes (StateMachine, State, Transition, etc.) accept three type-parameters:

  1. I: The type of input consumed by the machine during processing (may be Unit)
  2. O: The type of output produced by the machine after each transition (may be Unit)
  3. A: The final value type produced by the machine once processing has completed successfully (also may be Unit)

StateMachine instances are always connected output to input: the output type of one machine is the input type of the next, and the first state machine typically accepts Unit. When StateMachine instances are connected, only the final value type of the last state machine is passed on:
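To make the type-parameter flow concrete, the hypothetical sketch below reduces each machine to a Stage function. A Stage maps an input chunk to an output chunk plus a final value. The sketch ignores states and transitions entirely and is not s_mach's connection API. connect feeds one stage's output into the next stage's input and keeps only the last stage's final value.

```scala
object Connecting {
  // Hypothetical stand-in: a machine reduced to "input chunk => (output chunk, final value)".
  final case class Stage[I, O, A](run: Seq[I] => (Seq[O], A))

  // Feeds the first stage's output into the second; the first final value (A1) is dropped.
  def connect[I, O1, A1, O2, A2](first: Stage[I, O1, A1],
                                 second: Stage[O1, O2, A2]): Stage[I, O2, A2] =
    Stage[I, O2, A2] { in =>
      val (mid, _) = first.run(in)
      second.run(mid)
    }

  val numbers = Stage[Unit, Int, Unit](_ => (Seq(1, 2, 3), ()))                   // needs no real input
  val labels  = Stage[Int, String, Unit](ns => (ns.map(n => "#" + n), ()))        // Int chunks to String chunks
  val count   = Stage[String, Unit, Int](ss => (Seq.empty, ss.map(_.length).sum)) // only this A survives

  val pipeline: Stage[Unit, Unit, Int] = connect(connect(numbers, labels), count)
  def result: Int = pipeline.run(Seq(()))._2
}
```

Here the pipeline's type is Stage[Unit, Unit, Int]: it starts from Unit, and its final value type comes only from count.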

Type-Aliases

To make working with s_mach state machines easier, type-aliases are provided based on the state machine’s purpose:

An Enumerator produces output in chunks by stepping the enumerator:

An Iteratee consumes input in chunks to eventually yield a final single value:

A Transformer transforms input chunks into output chunks of the same or a different type:

A Plan streams input from an Enumerator to an Iteratee by way of zero or more Transformers and eventually produces a final single value:
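The alias definitions themselves are not shown in this post. The sketch below infers them from the descriptions above and the three type-parameters (I, O, A), so treat the exact parameter choices as an assumption. StateMachine here is an empty stand-in trait.

```scala
object AliasSketch {
  // Stand-in for the three-parameter machine type; only its type parameters matter here.
  trait StateMachine[I, O, A]

  // Inferred from the descriptions above; hypothetical, not copied from s_mach.
  type Enumerator[O]     = StateMachine[Unit, O, Unit] // no input, output chunks, no final value
  type Iteratee[I, A]    = StateMachine[I, Unit, A]    // input chunks, no output, final value
  type Transformer[I, O] = StateMachine[I, O, Unit]    // input chunks to output chunks
  type Plan[A]           = StateMachine[Unit, Unit, A] // runnable pipeline producing a final value

  // Compile-time checks that the aliases expand as described.
  val planShape = implicitly[Plan[Int] =:= StateMachine[Unit, Unit, Int]]
  val iterateeShape = implicitly[Iteratee[String, Int] =:= StateMachine[String, Unit, Int]]
}
```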