2.2. Programming Idioms

In this section we give Orc implementations of some standard idioms from concurrent and functional programming. Despite the austerity of Orc's four combinators, we are able to encode a variety of idioms straightforwardly.

2.2.1. Channels

Orc has no communication primitives like pi-calculus channels[1] or Erlang mailboxes[2]. Instead, it makes use of sites to create channels of communication.

The most frequently used of these sites is Buffer. When called, it publishes a new asynchronous FIFO channel. That channel is a site with two methods: get and put. The call c.get() takes the first value from channel c and publishes it, or blocks waiting for a value if none is available. The call c.put(v) puts v as the last item of c and publishes a signal.

A channel may be closed to indicate that it will not be sent any more values. If the channel c is closed, c.put(v) always halts (without modifying the state of the channel), and c.get() halts once c becomes empty. The channel c may be closed by calling either c.close(), which returns a signal once c becomes empty, or c.closenb(), which returns a signal immediately.
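
As a minimal sketch of these operations (the values and the shape of the program are illustrative only), the following program has one branch put two values on a channel and then close it, while another branch reads both values. Because the channel is FIFO and the two put calls are sequenced, the reads should observe 1 and then 2, so the program is expected to publish the pair (1, 2).

```orc
{- Illustrative sketch: a producer and a consumer share the channel c. -}
val c = Buffer()

  {- Producer: put two values in order, then close without waiting. -}
  c.put(1) >> c.put(2) >> c.closenb() >> stop
  {- Consumer: take the two values in FIFO order and pair them. -}
| c.get() >x> c.get() >y> (x, y)
```

Any further call to c.get() would halt rather than block, because c is then both closed and empty.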

2.2.2. Lists

In the section on Cor, we were introduced to lists: how to construct them, and how to match them against patterns. While it is certainly feasible to write a specific function with an appropriate pattern match every time we want to access a list, it is helpful to have a handful of common operations on lists and reuse them.

One of the most common uses for a list is to send each of its elements through a sequential combinator. Since the list itself is a single value, we want to walk through the list and publish each one of its elements in parallel as a value. The library function each does exactly that.

Suppose we want to send the message invite to each email address in the list inviteList:

each(inviteList) >address> Email(address, invite)

Orc also adopts many of the list idioms of functional programming. The Orc library contains definitions for most of the standard list functions, such as map and fold. Many of the list functions internally take advantage of concurrency to make use of any available parallelism; for example, the map function dispatches all of the mapped calls concurrently, and assembles the result list once they all return using a fork-join.
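
To make the contrast between map and each concrete, here is a small sketch with an illustrative list and doubling function. The first branch publishes one value, the list [2, 4, 6]; the second publishes the three doubled elements as separate values, in no guaranteed order.

```orc
  {- map builds a new list and publishes it once. -}
  map(lambda(x) = 2 * x, [1, 2, 3])
  {- each publishes every element separately; we double each one. -}
| ( each([1, 2, 3]) >x> 2 * x )
```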

2.2.3. Streams

Sometimes a source of data is not explicitly represented by a list or other data structure. Instead, it is made available through a site, which returns the values one at a time, each time it is called. We call such a site a stream. It is analogous to an iterator in a language like Java. Functions can also be used as streams, though typically they will not be pure functions, and should only return one value. A call to a stream may halt, to indicate that the end of the data has been reached, and no more values will become available. It is often useful to detect the end of a stream using the otherwise combinator.

Streams are common enough in Orc programming that there is a library function to take all of the available publications from a stream; it is called repeat, and it is analogous to each for lists.

def repeat(f) = f() >x> (x | repeat(f))

The repeat function calls the site or function f with no arguments, publishes its return value, and recurses to query for more values. repeat should be used with sites or functions that block until a value is available. Notice that if any call to f halts, then repeat(f) consequently halts.

For example, it is very easy to treat a channel c as a stream, reading any values put on the channel as they become available:

repeat(c.get)
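
The following sketch combines repeat with the otherwise combinator to detect the end of a stream. The channel contents and the printed message are illustrative assumptions. The reader prints each value; once the channel is closed and empty, c.get() halts, so repeat(c.get) halts and the right side of the otherwise combinator runs.

```orc
val c = Buffer()

  {- Writer: two values, then close the channel. -}
  c.put("a") >> c.put("b") >> c.closenb() >> stop
  {- Reader: print each value and suppress publications with stop,
     so that ; runs its right side once the left side halts. -}
| ( repeat(c.get) >x> println(x) >> stop ; println("end of stream") )
```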

2.2.4. Arrays

While lists are a very useful data structure, they are not indexed; it is not possible to get the nth element of a list in constant time. However, this capability is often needed in practice, so the Orc standard library provides a function IArray to create immutable arrays. Once initialized, an immutable array cannot be rewritten; it can only be read.

IArray takes two arguments: an array size, and a function to initialize the array. The function takes the index being initialized as an argument (indices start at 0), and publishes the value to be stored at that array position. Here are a few examples:

{- Create an array of 10 elements; element i is the ith power of 2 -}
IArray(10, lambda(i) = 2 ** i)
{- Create an array of 5 elements; each element is a newly created buffer -}
IArray(5, lambda(_) = Buffer())

The array is used like a function; the call A(i) returns the ith element of the array A. A call with an out-of-bounds index halts.

{- Create an array of 3 buffers -}
val A = IArray(3, lambda(_) = Buffer())

{- Send true on the 0th channel
   and listen for a value on the 0th channel. -}
A(0).put(true) | A(0).get()

Since arrays are accessed by index, there is a library function specifically designed to make programming with indices easier. The function upto(n) publishes all of the numbers from 0 to n-1 simultaneously; thus, it is very easy to access all of the elements of an array simultaneously. Suppose we have an array A of n email addresses and would like to send the message m to each one.

upto(n) >i> A(i) >address> Email(address, m)
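
As a self-contained variant of the same pattern, the sketch below builds a small array and then reads every element by index. The name squares and the size 5 are illustrative choices.

```orc
{- Illustrative sketch: an immutable array of the first five squares. -}
val squares = IArray(5, lambda(i) = i * i)

{- Publish an (index, value) pair for every index, in no particular order. -}
upto(5) >i> (i, squares(i))
```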

2.2.5. Mutable Storage

Variables in Orc are immutable. There is no assignment operator, and there is no way to change the value of a bound variable. However, it is often useful to have mutable state when writing certain algorithms. The Orc library contains two sites that offer simple mutable storage: Ref and Cell.

The Ref site creates rewritable reference cells.

val r = Ref(0)
println(r.read()) >>
r.write(2) >> 
println(r.read()) >>
stop

These are very similar to ML's ref cells. r.write(v) stores the value v in the reference r, overwriting any previous value, and publishes a signal. r.read() publishes the current value stored in r.

However, unlike in ML, a reference cell can be left initially empty by calling Ref with no arguments. A read operation on an empty cell blocks until the cell is written.

{- Create a cell, and wait 1 second before initializing it. -}
{- The read operation blocks until the write occurs. -}
val r = Ref()
r.read() | Rtimer(1000) >> r.write(1) >> stop

The Orc library also offers write-once reference cells, using the Cell site. A write-once cell has no initial value. Read operations block until the cell has been written. A write operation succeeds only if the cell is empty; subsequent write operations simply halt.

{- Create a cell, try to write to it twice, and read it -}
{- The read will block until a write occurs
   and only one write will succeed. -}
val r = Cell()
  Rtimer(1000) >> r.write(2) >> println("Wrote 2") >> stop
| Rtimer(1000) >> r.write(3) >> println("Wrote 3") >> stop
| r.read()

Write-once cells are very useful for concurrent programming, and they are often safer than rewritable reference cells, since the value cannot be changed once it has been written. The use of write-once cells for concurrent programming is not a new idea; they have been studied extensively in the context of the Oz programming language.

A word of caution: References, cells, and other mutable objects may be accessed concurrently by many different parts of an Orc program, so race conditions may arise.
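
One possible way to avoid such a race is to guard the shared reference with a semaphore used as a lock. The sketch below assumes the library site Semaphore with its acquire and release methods (also used later in this section); the helper increment is hypothetical. Without the lock, two concurrent increments could both read 0 and both write 1.

```orc
val counter = Ref(0)
val lock = Semaphore(1)

{- Hypothetical helper: increment the counter while holding the lock. -}
def increment() =
  lock.acquire() >>
  counter.write(counter.read() + 1) >>
  lock.release()

{- Run two increments concurrently, wait for both, then read the result. -}
(increment(), increment()) >> counter.read()
```

With the lock in place the read-then-write steps cannot interleave, so the program is expected to publish 2.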

2.2.5.1. Reference Syntax

Orc provides syntactic sugar for reading and writing mutable storage:

  • x? is equivalent to x.read(). This operator is of equal precedence with the dot operator and function application, so you can write things like x.y?.v?. This operator is very similar to the C language's * operator, but is postfix instead of prefix.
  • x := y is equivalent to x.write(y). This operator has higher precedence than the concurrency combinators and if/then/else, but lower precedence than any of the other operators.

Here is a previous example rewritten using this syntactic sugar:

{- Create a cell, try to write to it twice, and read it -}
{- The read will block until a write occurs
   and only one write will succeed. -}
val r = Cell()
  Rtimer(1000) >> r := 2 >> println("Wrote 2") >> stop
| Rtimer(1000) >> r := 3 >> println("Wrote 3") >> stop
| r?

2.2.6. Loops

Orc does not have any explicit looping constructs. Most of the time, where a loop might be used in other languages, Orc programs use one of two strategies:

  1. When the iterations of the loops can occur in parallel, write an expression that expands the data into a sequence of publications, and use a sequential operator to do something for each publication. This is the strategy that uses functions like each, repeat, and upto.
  2. When the iterations of the loops must occur in sequence, write a tail recursive function that iterates over the data. Any loop can be rewritten as a tail recursion. Typically the data of interest is in a list, so one of the standard list functions, such as foldl, applies. The library also defines a function while, which handles many of the common use cases of while loops.
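
The two strategies can be sketched side by side. The function countUp is a hypothetical example, not a library function. It prints the numbers below n strictly in order; the upto expression that follows prints the same numbers with no ordering between iterations.

```orc
{- Strategy 2: iterations in sequence, written as a tail recursion. -}
def countUp(i, n) =
  if i < n
    then (println(i) >> countUp(i + 1, n))
    else signal

{- Run the sequential loop first, then strategy 1:
   parallel iterations driven by the publications of upto. -}
countUp(0, 3) >> upto(3) >i> println(i) >> stop
```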

2.2.7. Parallel Matching

Matching a value against multiple patterns, as we have seen it so far, requires a def whose clauses have patterns in their argument lists. Such a match is linear; each pattern is tried in order until one succeeds.

What if we want to match a value against multiple patterns in parallel, executing every clause that succeeds? Fortunately, this is very easy to do in Orc. Suppose we have an expression F which publishes pairs of integers, and we want to publish a signal for each 3 that occurs.

We could write:

F >(x,y)>
  ( if(x=3) >> signal
  | if(y=3) >> signal ) 

But there is a more general alternative:

F >x>
  ( x >(3,_)> signal
  | x >(_,3)> signal ) 

The interesting case is the pair (3,3), which is counted twice because both patterns match it in parallel.

This parallel matching technique is sometimes used as an alternative to pattern matching using function clauses, but only when the patterns are mutually exclusive.

For example,

def helper([]) = 0
def helper([_]) = 1
def helper(_:_:_) = 2
helper([4,6])

is equivalent to

[4,6] >x>
  x >[]> 0
| x >[_]> 1
| x >_:_:_> 2

whereas

def helper([]) = 0
def helper([_]) = 1
def helper(_) = 2
helper([5])

is not equivalent to

[5] >x>
  x >[]> 0
| x >[_]> 1
| x >_> 2

because the clauses are not mutually exclusive. Function clauses must attempt to match in linear order, whereas this expression matches all of the patterns in parallel. Here, it will match [5] two different ways, publishing both 1 and 2.

2.2.8. Fork-join

One of the most common concurrent idioms is a fork-join: run two processes concurrently, and wait for a result from each one. This is very easy to express in Orc. Whenever we write a val declaration, the process computing that value runs in parallel with the rest of the program. So if we write two val declarations, and then form a tuple of their results, this performs a fork-join.

val x = F
val y = G
(x,y)

Fork-joins are a fundamental part of all Orc programs, since they are created by all nested expression translations. In fact, the fork-join we wrote above could be expressed even more simply as just:

(F,G)

2.2.8.1. Example: Machine initialization

In Orc programs, we often use fork-join and recursion together to dispatch many tasks in parallel and wait for all of them to complete. Suppose that given a machine m, calling m.init() initializes m and then publishes a signal when initialization is complete. The function initAll initializes a list of machines.

def initAll([]) = signal
def initAll(m:ms) = ( m.init() , initAll(ms) ) >> signal

For each machine, we fork-join the initialization of that machine (m.init()) with the initialization of the remaining machines (initAll(ms)). Thus, all of the initializations proceed in parallel, and the function returns a signal only when every machine in the list has completed its initialization.

Note that if some machine fails to initialize, and does not return a signal, then the initialization procedure will never complete.

2.2.8.2. Example: Simple parallel auction

We can also use a recursive fork-join to obtain a value, rather than just signaling completion. Suppose we have a list of bidders in a sealed-bid, single-round auction. Calling b.ask() requests a bid from the bidder b. We want to ask for one bid from each bidder, and then return the highest bid. The function auction performs such an auction for a list of bidders (max finds the maximum of its arguments):

def auction([]) = 0
def auction(b:bs) = max(b.ask(), auction(bs))

Note that all bidders are called simultaneously. Also note that if some bidder fails to return a bid, then the auction will never complete. Later we will see a different solution that addresses the issue of non-termination.

2.2.8.3. Example: Barrier synchronization

Consider an expression of the following form, where F and G are expressions and M and N are sites:

M()  >x>  F | N()  >y>  G

Suppose we would like to synchronize F and G, so that both start executing at the same time, after both M() and N() respond. This is easily done using the fork-join idiom. In the following, we assume that x does not occur free in G, nor y in F.

( M() , N() )  >(x,y)>  ( F | G )

2.2.9. Sequential Fork-Join

Previous sections illustrate how Orc can use the fork-join idiom to process a fixed set of expressions or a list of values. Suppose that instead we wish to process all the publications of an expression F, and once this processing is complete, execute some expression G. For example, F publishes the contents of a text file, one line at a time, and we wish to print each line to the console using the site println, then publish a signal after all lines have been printed.

Sequential composition alone is not sufficient, because we have no way to detect when all of the lines have been processed. A recursive fork-join solution would require that the lines be stored in a traversable data structure like a list, rather than streamed as publications from F. A better solution uses the ; combinator to detect when processing is complete:

F >x> println(x) >> stop ; signal

Since ; only evaluates its right side if the left side does not publish, we suppress the publications on the left side using stop. Here, we assume that we can detect when F halts. If, for example, F is publishing the lines of the file as it receives them over a socket, and the sending party never closes the socket, then F never halts and no signal is published.

2.2.10. Priority Poll

The otherwise combinator is also useful for trying alternatives in sequence. Consider an expression of the form F0 ; F1 ; F2 ; .... If Fi does not publish and halts, then Fi+1 is executed. We can think of the Fi's as a series of alternatives that are explored until a publication occurs.

Suppose that we would like to poll a list of buffers for available data. The list of buffers is ordered by priority. The first buffer in the list has the highest priority, so it is polled first. If it has no data, then the next buffer is polled, and so on.

Here is a function which polls a prioritized list of buffers in this way. It publishes the first item that it finds, removing it from the originating buffer. If all buffers are empty, the function halts. We use the getnb ("get non-blocking") method of the buffer, which retrieves the first available item if there is one, and halts otherwise.

def priorityPoll([]) = stop
def priorityPoll(b:bs) = b.getnb() ; priorityPoll(bs)
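
A short usage sketch, with illustrative buffer names and data: only the low-priority buffer holds an item, so the call high.getnb() halts and the poll falls through to the next buffer.

```orc
def priorityPoll([]) = stop
def priorityPoll(b:bs) = b.getnb() ; priorityPoll(bs)

val high = Buffer()
val low = Buffer()

{- Put one item on the low-priority buffer, then poll both in priority order. -}
low.put("from low") >> priorityPoll([high, low])
```

Under these assumptions the program publishes "from low". If neither buffer held data, it would halt without publishing.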

2.2.11. Parallel Or

"Parallel or" is a classic idiom of parallel programming. The "parallel or" operation executes two expressions F and G in parallel, each of which may publish a single boolean, and returns the disjunction of their publications as soon as possible. If one of the expressions publishes true, then the disjunction is true, so it is not necessary to wait for the other expression to publish a value. This holds even if one of the expressions is silent.

The "parallel or" of expressions F and G may be expressed in Orc as follows:

let(
    val a = F
    val b = G

      (a || b)
    | if(a) >> true 
    | if(b) >> true 
)

The expression (a || b) waits for both a and b to become available and then publishes their disjunction. However if either a or b is true we can publish true immediately regardless of whether the other variable is available. Therefore we run if(a) >> true and if(b) >> true in parallel to wait for either variable to become true and immediately publish the result true. Since more than one of these expressions may publish true, the surrounding let(...) is necessary to select and publish only the first result.
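
The same idiom can be packaged as a function. This is a sketch only: the name por is hypothetical, and it relies on the fact that an Orc function call does not wait for its arguments, so the body can react as soon as either argument becomes available.

```orc
{- Hypothetical wrapper around the parallel-or expression. -}
def por(a, b) =
  let(
      (a || b)
    | if(a) >> true
    | if(b) >> true
  )

{- The second argument is true at once, so true is published
   without waiting five seconds for the first argument. -}
por(Rtimer(5000) >> false, true)
```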

2.2.12. Timeout

Timeout, the ability to execute an expression for at most a specified amount of time, is an essential ingredient of fault-tolerant and distributed programming. Orc accomplishes timeout using pruning and the Rtimer site. The following program runs F for at most one second, publishing its result if available and the value 0 otherwise.

let( F | Rtimer(1000) >> 0 )
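
For a concrete instance, we can stand in for F with an expression that takes two seconds to publish; the string and the delays are illustrative. The timer branch publishes first, so the let site receives 0 and the slow branch is terminated.

```orc
{- The slow branch needs 2 seconds but the limit is 1 second,
   so the timer branch wins and 0 is published. -}
let( Rtimer(2000) >> "slow result" | Rtimer(1000) >> 0 )
```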

2.2.12.1. Auction with timeout

In the auction example given previously, the auction may never complete if one of the bidders does not respond. We can add a timeout so that a bidder has at most 8 seconds to provide a bid:

def auction([]) = 0
def auction(b:bs) = 
  val bid = b.ask() | Rtimer(8000) >> 0
  max(bid, auction(bs))

This version of the auction is guaranteed to complete within 8 seconds.

2.2.12.2. Detecting timeout

Sometimes, rather than just yielding a default value, we would like to determine whether an expression has timed out, and if so, perform some other computation. To detect the timeout, we pair the result of the original expression with true and the result of the timer with false. Thus, if the expression does time out, then we can distinguish that case using the boolean value.

Here, we run expression F with a time limit t. If it publishes within the time limit, we bind its result to r and execute G. Otherwise, we execute H.

val (r, b) = (F, true) | (Rtimer(t), false)
if b then G else H

Instead of using a boolean and conditional, we could use pattern matching:

val s = Some(F) | Rtimer(t) >> None()
  s >Some(r)> G
| s >None()>  H

2.2.13. Priority

We can use a timer to give a window of priority to one computation over another. In this example, we run expressions F and G concurrently. For one second, F has priority; F's result is published immediately, but G's result is held until the time interval has elapsed. If neither F nor G publishes a result within one second, then the first result from either is published.

val x = F
val y = G
let( x | Rtimer(1000) >> y )

2.2.14. Metronome

A timer can be used to execute an expression repeatedly at regular intervals, for example to poll a service. Recall the definition of metronome from the previous chapter:

def metronome(t) = signal | Rtimer(t) >> metronome(t)

The following example publishes "tick" once per second and "tock" once per second after an initial half-second delay. The publications alternate: "tick tock tick tock ...". Note that this program is not defined recursively; the recursion is entirely contained within metronome.

  metronome(1000) >> "tick"
| Rtimer(500) >> metronome(1000) >> "tock"

2.2.15. Routing

The Orc combinators restrict the passing of values among their component expressions. However, some programs will require greater flexibility. For example, F <x< G provides F with the first publication of G, but what if F needs the first n publications of G? In cases like this we use channels or other stateful sites to redirect or store publications. We call this technique routing because it involves routing values from one execution to another.

2.2.15.1. Generalizing Termination

The pruning combinator terminates an expression after it publishes its first value. We have already seen how to use pruning just for its termination capability, without binding a variable, using the let site. Now, we use routing to terminate an expression under different conditions, not just when it publishes a value; it may publish many values, or none, before being terminated.

Our implementation strategy is to route the publications of the expression through a channel, so that we can put the expression inside a pruning combinator and still see its publications without those publications terminating the expression.

2.2.15.1.1. Enhanced Timeout

As a simple demonstration of this concept, we construct a more powerful form of timeout: allow an expression to execute, publishing arbitrarily many values (not just one), until a time limit is reached.

val c = Buffer()
repeat(c.get) << 
    F >x> c.put(x) >> stop 
  | Rtimer(1000) >> c.closenb()

This program allows F to execute for one second and then terminates it. Each value published by F is routed through channel c so that it does not terminate F. After one second, Rtimer(1000) responds, triggering the call c.closenb(). The call c.closenb() closes c and publishes a signal, terminating F. The library function repeat is used to repeatedly take and publish values from c until it is closed.
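
As an illustration, F can be replaced with a metronome that ticks every 300 milliseconds. The interval and the published string are assumptions made for this sketch. The program publishes "tick" a few times during the first second and then stops, since closing the channel terminates the metronome.

```orc
def metronome(t) = signal | Rtimer(t) >> metronome(t)

val c = Buffer()
repeat(c.get) <<
    {- Route every tick through the channel instead of publishing it. -}
    metronome(300) >> c.put("tick") >> stop
    {- After one second, close the channel; the published signal
       terminates the whole right side, including the metronome. -}
  | Rtimer(1000) >> c.closenb()
```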

2.2.15.1.2. Test Pruning

We can also decide to terminate based on the values published. This expression executes F until it publishes a negative number, and then terminates it:

val c = Buffer()
repeat(c.get) << 
  F >x> 
    (if x >= 0 
        then c.put(x) >> stop
        else c.closenb())

Each value published by F is tested. If it is non-negative, it is placed on channel c (silently) and read by repeat(c.get). If it is negative, the channel is closed, publishing a signal and causing the termination of F.

2.2.15.1.3. Interrupt

We can use routing to interrupt an expression based on a signal from elsewhere in the program. We set up the expression like a timeout, but instead of waiting for a timer, we wait for the semaphore done to be released. Any call to done.release will terminate the expression (because it will cause done.acquire() to publish), but otherwise F executes as normal and may publish any number of values.

val c = Buffer()
val done = Semaphore(0)
repeat(c.get) <<
    F >x> c.put(x) >> stop
  | done.acquire() >> c.closenb()

2.2.15.1.4. Publication Limit

We can limit an expression to n publications, rather than just one. Here is an expression which executes F until it publishes 5 values, and then terminates it.

val c = Buffer()
val done = Semaphore(0)
def allow(0) = done.release() >> stop
def allow(n) = c.get() >x> (x | allow(n-1))
allow(5) << 
    F >x> c.put(x) >> stop 
  | done.acquire() >> c.closenb()

We use the auxiliary function allow to get only the first 5 publications from the channel c. When no more publications are allowed, allow uses the interrupt idiom to halt F and close c.

2.2.15.2. Non-Terminating Pruning

We can use routing to create a modified version of the pruning combinator. As in F <x< G, we'll run F and G in parallel and make the first value published by G available to F. However instead of terminating G after it publishes a value, we will continue running it, ignoring its remaining publications.

val r = Cell()
(F <x< r.read()) | (G >x> r.write(x))

2.2.15.3. Publication-Agnostic Otherwise

We can also use routing to create a modified version of the otherwise combinator. We'll run F until it halts, and then run G, regardless of whether F published any values or not.

val c = Buffer()
repeat(c.get) | (F >x> c.put(x) >> stop ; c.close() >> G)

We use c.close() instead of the more common c.closenb() to ensure that G does not execute until all the publications of F have been routed. Recall that c.close() does not return until c is empty.

2.2.16. Interruption

We can write a function interruptible that implements the interrupt idiom to execute any function in an interruptible way. interruptible(g) calls the function g, which is assumed to take no arguments, and silences its publications. It immediately publishes another function, which we can call at any time to terminate the execution of g. For simplicity, we assume that g itself publishes no values.

Here is a naive implementation that doesn't quite work:

def interruptible(f) =
  val done = Semaphore(0)
  done.release
    << f() >> stop
     | done.acquire()

{- wrong! -}
val stopper = interruptible(g)
...

The function interruptible is correct, but the way it is used causes a strange error. The function g executes, but is always immediately terminated! This happens because the val declaration which binds stopper also kills all of the remaining computation in interruptible(g), including the execution of g itself.

The solution is to bind the variable differently:

def interruptible(f) =
  val done = Semaphore(0)
  done.release
    << f() >> stop
     | done.acquire()

interruptible(g) >stopper>
...

This idiom, wherein a function publishes some value that can be used to monitor or control its execution, arises occasionally in Orc programming. When using this idiom, always remember to avoid terminating that execution accidentally. Since Orc is a structured concurrent language, every process is contained within some other process; kill the containing process, and the contained processes die too.

2.2.17. Lifting

It is often useful to explicitly lift an execution, so that it is in some sense protected from being terminated. We can do this by running a "lifter" process, to which we can send functions that will be executed by the lifter and thus will not be terminated unless the lifter itself is terminated. Such a lifter is written by creating a channel, and running a loop which listens for functions to be sent on the channel and executes those functions as they arrive. The lifter publishes only the put method for the channel; the loop itself publishes no values, since the values published by the lifted functions are silenced. Here, we write such a lifter, and then use it to protect a function call from a timeout.

def lifter() =
  val c = Buffer()
  def loop() = c.get() >f> ( f() >> stop | loop() )
  c.put | loop()

def delayedPrint() = Rtimer(1500) >> println("Delayed 1.5 seconds")

lifter() >lift>
(
println("Running...") 
  << Rtimer(1000) | delayedPrint() | lift(delayedPrint)
)

The timeout stops the execution of delayedPrint(), so it does not print a result. However, the lifted execution of delayedPrint does succeed, since it is executing within the loop of lifter(), unaffected by the timeout.

2.2.18. Fold

We consider various concurrent implementations of the classic "list fold" function from functional programming:

def fold(_, [x])  = x
def fold(f, x:xs) = f(x, fold(f, xs))

This is a seedless fold (sometimes called fold1) which requires that the list be nonempty and uses its last element as a seed. This implementation is short-circuiting (it may finish early if the reduction operator f does not use its second argument), but it is not concurrent; no two calls to f can proceed in parallel. However, if f is associative, we can overcome this restriction and implement fold concurrently. If f is also commutative, we can further increase concurrency.

2.2.18.1. Associative Fold

We first consider the case when the reduction operator is associative. We define afold(f,xs) where f is a binary associative function and xs is a non-empty list. The implementation iteratively reduces xs to a single value. Each step of the iteration applies the auxiliary function step, which halves the size of xs by reducing disjoint pairs of adjacent items.

def afold(_, [x]) = x
def afold(f, xs) =
  def step([]) = []
  def step([x]) = [x]
  def step(x:y:xs) = f(x,y):step(xs)
  afold(f, step(xs))

Notice that f(x,y):step(xs) is an implicit fork-join. Thus, the call f(x,y) executes in parallel with the recursive call step(xs). As a result, all calls to f execute concurrently within each iteration of afold.
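
A small worked call may help to see the halving steps. The addition function and the input list are illustrative. Note that the list must be non-empty: for an empty list, step returns an empty list again and the recursion would never reach the base case.

```orc
def afold(_, [x]) = x
def afold(f, xs) =
  def step([]) = []
  def step([x]) = [x]
  def step(x:y:xs) = f(x,y):step(xs)
  afold(f, step(xs))

{- Successive lists: [1,2,3,4,5] -> [3,7,5] -> [10,5] -> [15] -}
afold(lambda(x, y) = x + y, [1, 2, 3, 4, 5])
```

Tracing the steps in the comment, this call should publish 15.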

2.2.18.2. Associative, Commutative Fold

We can make the implementation even more concurrent when the fold operator is both associative and commutative. We define cfold(f,xs), where f is an associative and commutative binary function and xs is a non-empty list. The implementation initially copies all list items into a buffer in arbitrary order using the auxiliary function xfer, counting the total number of items copied. The auxiliary function combine repeatedly pulls pairs of items from the buffer, reduces them, and places the result back in the buffer. Each pair of items is reduced in parallel as they become available. The last item in the buffer is the result of the overall fold.

def cfold(f, xs) =
  val c = Buffer()
  
  def xfer([])    = 0
  def xfer(x:xs)  = c.put(x) >> stop | xfer(xs)+1

  def combine(0) = stop
  def combine(1) =  c.get()
  def combine(m) =  c.get() >x> c.get() >y> 
                    ( c.put(f(x,y)) >> stop | combine(m-1))

  xfer(xs) >n> combine(n)


[1] R. Milner. Communicating and Mobile Systems: the π-Calculus. Cambridge University Press, May 1999.

[2] J. Armstrong, R. Virding, C. Wikström, and M. Williams. Concurrent Programming in ERLANG (2nd ed.). Prentice Hall International (UK) Ltd., Hertfordshire, UK, 1996.