In this section we give Orc implementations of some standard idioms from concurrent and functional programming. Despite the austerity of Orc's four combinators, we are able to encode a variety of idioms straightforwardly.
Orc has no communication primitives like pi-calculus channels[1] or Erlang mailboxes[2]. Instead, it makes use of sites to create channels of communication.
The most frequently used of these sites is Buffer. When called, it
publishes a new asynchronous FIFO channel. That channel is a site with two
methods: get and put. The call c.get()
takes the first value from channel c and publishes it, or blocks
waiting for a value if none is available. The call c.put(v) puts
v as the last item of c and publishes a signal.
A channel may be closed to indicate that it will not be sent any more values.
If the channel c is closed, c.put(v) always halts
(without modifying the state of the channel), and c.get() halts
once c becomes empty. The channel c may be closed by
calling either c.close(), which returns a signal once
c becomes empty, or c.closenb(), which returns a
signal immediately.
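As a small illustration of the channel operations described above, the following sketch runs a producer and a consumer on one channel. The values "a" and "b" are arbitrary placeholders chosen for this example.

```orc
{- Producer: enqueue two values, then close the channel without waiting. -}
{- Consumer: take two values in sequence and publish them as a pair. -}
val c = Buffer()
c.put("a") >> c.put("b") >> c.closenb() >> stop
| c.get() >x> c.get() >y> (x, y)
```

Because the channel is FIFO and the consumer performs its two get calls in sequence, this expression should publish the pair ("a", "b"). Closing the channel with closenb does not discard items that are already queued; get halts only once the closed channel is empty.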
In the section on Cor, we were introduced to lists: how to construct them, and how to match them against patterns. While it is certainly feasible to write a specific function with an appropriate pattern match every time we want to access a list, it is helpful to have a handful of common operations on lists and reuse them.
One of the most common uses for a list is to send each of its elements through
a sequential combinator. Since the list itself is a single value, we want
to walk through the list and publish each one of its elements in parallel
as a value. The library function each does exactly that.
Suppose we want to send the message invite to each email
address in the list inviteList:
each(inviteList) >address> Email(address, invite)
Orc also adopts many of the list idioms of functional programming. The Orc library contains definitions
for most of the standard list functions, such as map and fold. Many of the
list functions internally take advantage of concurrency to make use of any available parallelism; for
example, the map function dispatches all of the mapped calls concurrently, and assembles
the result list once they all return using a fork-join.
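To make the fork-join behavior concrete, here is a minimal sketch of how such a function could be written. The name mymap is hypothetical, chosen to avoid clashing with the library's map, and this is not necessarily how the library defines it.

```orc
{- Hypothetical re-implementation of map, for illustration only. -}
{- The cons expression f(x):mymap(f, xs) evaluates the call f(x) and the -}
{- recursive call in parallel, then joins the results into a list. -}
def mymap(f, []) = []
def mymap(f, x:xs) = f(x):mymap(f, xs)

mymap(lambda(n) = n * n, [1, 2, 3])
```

All three calls to the squaring function are dispatched concurrently, and the expression publishes the list [1, 4, 9] once they have all returned.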
Sometimes a source of data is not explicitly represented by a list or other data structure. Instead, it is made available through a site, which returns the values one at a time, each time it is called. We call such a site a stream. It is analogous to an iterator in a language like Java. Functions can also be used as streams, though typically they will not be pure functions, and should only return one value. A call to a stream may halt, to indicate that the end of the data has been reached, and no more values will become available. It is often useful to detect the end of a stream using the otherwise combinator.
Streams are common enough in Orc programming that there is a library function to take all of the
available publications from a stream; it is called repeat, and it is analogous to
each for lists.
def repeat(f) = f() >x> (x | repeat(f))
The repeat function calls the site or function f with no arguments,
publishes its return value, and recurses to query for more values. repeat should
be used with sites or functions that block until a value is available. Notice that if any
call to f halts, then repeat(f) consequently halts.
For example, it is very easy to treat a channel c as a stream, reading any
values put on the channel as they become available:
repeat(c.get)
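The following sketch combines repeat with the otherwise combinator to detect the end of a stream. The values 1 and 2 and the message text are placeholders; the sketch assumes that the channel is closed once the producer has finished.

```orc
{- Producer: put two values on the channel, then close it. -}
{- Consumer: print every value; when the stream halts, print a final message. -}
val c = Buffer()
c.put(1) >> c.put(2) >> c.closenb() >> stop
| ( repeat(c.get) >x> println(x) >> stop ; println("end of stream") )
```

Once the closed channel is empty, c.get() halts, so repeat(c.get) halts as well. Since the left side of the otherwise combinator never publishes, its right side then runs and prints the final message.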
While lists are a very useful data structure, they are not indexed; it is not possible to get the nth element of
a list in constant time. However, this capability is often needed in practice, so the Orc standard library
provides a function IArray to create immutable arrays. Once initialized, an immutable array cannot
be rewritten; it can only be read.
IArray takes two arguments: an array size, and a function to initialize the array. The function
takes the index being initialized as an argument (indices start at 0), and publishes the value to be stored
at that array position. Here are a few examples:
{- Create an array of 10 elements; element i is the ith power of 2 -}
IArray(10, lambda(i) = 2 ** i)

{- Create an array of 5 elements; each element is a newly created buffer -}
IArray(5, lambda(_) = Buffer())
The array is used like a function; the call A(i) returns the ith element of the
array A. A call with an out-of-bounds index halts.
{- Create an array of 3 buffers -}
val A = IArray(3, lambda(_) = Buffer())

{- Send true on the 0th channel and listen for a value on the 0th channel. -}
A(0).put(true) | A(0).get()
Since arrays are accessed by index, there is a library function specifically
designed to make programming with indices easier. The function upto(n)
publishes all of the numbers from 0 to
n-1 simultaneously; thus, it is very easy to access all of the elements of
an array simultaneously. Suppose we have an array A of n email
addresses and would like to send the message m to each one.
upto(n) >i> A(i) >address> Email(address, m)
Variables in Orc are immutable. There is no assignment operator, and there is no way to
change the value of a bound variable. However, it is often useful to have mutable state
when writing certain algorithms. The Orc library contains two sites that offer simple
mutable storage: Ref and Cell.
The Ref site creates rewritable reference cells.
val r = Ref(0)
println(r.read()) >> r.write(2) >> println(r.read()) >> stop
These are very similar to ML's ref cells. r.write(v) stores
the value v in the reference r, overwriting any previous
value, and publishes a signal. r.read() publishes the current value stored
in r.
However, unlike in ML, a reference cell can be left initially empty by calling Ref
with no arguments. A read operation on an empty cell blocks until the cell is written.
{- Create a cell, and wait 1 second before initializing it. -}
{- The read operation blocks until the write occurs. -}
val r = Ref()
r.read() | Rtimer(1000) >> r.write(1) >> stop
The Orc library also offers write-once reference cells, using the Cell site.
A write-once cell has no initial value. Read operations block until the cell has been
written. A write operation succeeds only if the cell is empty; subsequent write operations
simply halt.
{- Create a cell, try to write to it twice, and read it -}
{- The read will block until a write occurs and only one write will succeed. -}
val r = Cell()
  Rtimer(1000) >> r.write(2) >> println("Wrote 2") >> stop
| Rtimer(1000) >> r.write(3) >> println("Wrote 3") >> stop
| r.read()
Write-once cells are very useful for concurrent programming, and they are often safer than rewritable reference cells, since the value cannot be changed once it has been written. The use of write-once cells for concurrent programming is not a new idea; they have been studied extensively in the context of the Oz programming language.
A word of caution: References, cells, and other mutable objects may be accessed concurrently by many different parts of an Orc program, so race conditions may arise.
Orc provides syntactic sugar for reading and writing mutable storage:
x? is equivalent to x.read(). This operator
is of equal precedence with the dot operator and function application, so
you can write things like x.y?.v?. This operator is very similar
to the C language's * operator, but is postfix instead of prefix.
x := y is equivalent to x.write(y).
This operator has higher precedence than the concurrency combinators and if/then/else,
but lower precedence than any of the other operators.
Here is a previous example rewritten using this syntactic sugar:
{- Create a cell, try to write to it twice, and read it -}
{- The read will block until a write occurs and only one write will succeed. -}
val r = Cell()
  Rtimer(1000) >> r := 2 >> println("Wrote 2") >> stop
| Rtimer(1000) >> r := 3 >> println("Wrote 3") >> stop
| r?
Orc does not have any explicit looping constructs. Most of the time, where a loop might be used in other languages, Orc programs use one of two strategies:
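When the iterations can proceed in parallel, expand the data into a sequence of publications and handle each publication with a sequential combinator, using functions such as each, repeat, and upto.
When the iterations must proceed in order, write a recursive function over the data; often one of the standard list functions, such as foldl, applies. The library also defines a
function while, which handles many of the common use cases of
while loops.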
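As a rough illustration of looping without loop constructs, the sketch below uses upto to run a body once per index in parallel, and a recursive function to perform steps strictly in order. The function countdown and the printed text are hypothetical, introduced only for this example.

```orc
{- Ordered iteration: a hypothetical recursive countdown. -}
def countdown(0) = println("liftoff")
def countdown(n) = println(n) >> countdown(n-1)

{- Parallel iteration first: print 0, 1, and 2 in some order. -}
{- When all of those calls have halted, run the ordered countdown. -}
upto(3) >i> println(i) >> stop ; countdown(3)
```

The parallel part makes no promise about the order in which the indices are printed, whereas countdown prints 3, 2, 1, and then "liftoff" in that order, because each recursive call starts only after the preceding println has returned.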
Matching a value against multiple patterns, as we have seen it so far,
requires a def whose clauses have patterns in their
argument lists. Such a match is linear: each pattern is tried in order until
one succeeds.
What if we want to match a value against multiple patterns in parallel, executing every clause that succeeds? Fortunately, this is very easy to do in Orc. Suppose we have an expression F which publishes pairs of integers, and we want to publish a signal for each 3 that occurs.
We could write:
F >(x,y)> ( if(x=3) >> signal | if(y=3) >> signal )
But there is a more general alternative:
F >x> ( x >(3,_)> signal | x >(_,3)> signal )
The interesting case is the pair (3,3), which is counted twice
because both patterns match it in parallel.
This parallel matching technique is sometimes used as an alternative to pattern matching using function clauses, but only when the patterns are mutually exclusive.
For example,
def helper([]) = 0
def helper([_]) = 1
def helper(_:_:_) = 2
helper([4,6])
is equivalent to
[4,6] >x>
  ( x >[]> 0
  | x >[_]> 1
  | x >_:_:_> 2
  )
whereas
def helper([]) = 0
def helper([_]) = 1
def helper(_) = 2
helper([5])
is not equivalent to
[5] >x>
  ( x >[]> 0
  | x >[_]> 1
  | x >_> 2
  )
because the clauses are not mutually exclusive. Function clauses must attempt to match in linear order, whereas this expression matches all of the patterns in parallel. Here, it will match [5] two different ways, publishing both 1 and 2.
One of the most common concurrent idioms is a fork-join: run two processes concurrently,
and wait for a result from each one. This is very easy to express in Orc. Whenever we write a val
declaration, the process computing that value runs in parallel with the rest of the program. So if we write
two val declarations, and then form a tuple of their results, this performs a fork-join.
val x = F
val y = G
(x,y)
Fork-joins are a fundamental part of all Orc programs, since they are created by all nested expression translations. In fact, the fork-join we wrote above could be expressed even more simply as just:
(F,G)
In Orc programs, we often use fork-join and recursion together to dispatch many tasks in parallel and wait
for all of them to complete. Suppose that given a machine m, calling m.init()
initializes m and then publishes a signal when initialization is complete. The function
initAll initializes a list of machines.
def initAll([]) = signal
def initAll(m:ms) = ( m.init() , initAll(ms) ) >> signal
For each machine, we fork-join the initialization of that machine (m.init()) with the initialization
of the remaining machines (initAll(ms)). Thus, all of the initializations proceed in parallel, and
the function returns a signal only when every machine in the list has completed its initialization.
Note that if some machine fails to initialize, and does not return a signal, then the initialization procedure will never complete.
We can also use a recursive fork-join to obtain a value, rather than just signaling completion. Suppose we
have a list of bidders in a sealed-bid, single-round auction. Calling b.ask() requests a bid
from the bidder b. We want to ask for one bid from each bidder, and then return the highest
bid. The function auction performs such an auction for a list of bidders (max
finds the maximum of its arguments):
def auction([]) = 0
def auction(b:bs) = max(b.ask(), auction(bs))
Note that all bidders are called simultaneously. Also note that if some bidder fails to return a bid, then the auction will never complete. Later we will see a different solution that addresses the issue of non-termination.
Consider an expression of the following form, where F and G are expressions and M and N are sites:
M() >x> F | N() >y> G
Suppose we would like to synchronize F and G, so that both start
executing at the same time, after both M() and N() respond. This is easily done
using the fork-join idiom. In the following, we assume that x does not occur
free in G, nor y in F.
( M() , N() ) >(x,y)> ( F | G )
Previous sections illustrate how Orc can use the fork-join idiom to process a
fixed set of expressions or a list of values. Suppose that instead we wish to
process all the publications of an expression F, and once this processing is
complete, execute some expression G. For example, F publishes the contents
of a text file, one line at a time, and we wish to print each line to the
console using the site println, then publish a signal after all lines
have been printed.
Sequential composition alone is not sufficient, because we have no way to
detect when all of the lines have been processed. A recursive fork-join
solution would require that the lines be stored in a traversable data structure
like a list, rather than streamed as publications from F. A better solution
uses the ; combinator to detect when processing is complete:
F >x> println(x) >> stop ; signal
Since ; only evaluates its right side if the left side does not publish,
we suppress the publications on the left side using stop. Here, we
assume that we can detect when F halts. If, for example,
F is publishing the lines of the file as it receives them over a socket,
and the sending party never closes the socket, then F never halts and no
signal is published.
The otherwise combinator is also useful for trying alternatives in sequence. Consider an expression of
the form F0 ; F1 ; F2 ; .... If Fi does not publish
and halts, then Fi+1 is executed. We can think of the Fi's as a series of
alternatives that are explored until a publication occurs.
Suppose that we would like to poll a list of buffers for available data. The list of buffers is ordered by priority. The first buffer in the list has the highest priority, so it is polled first. If it has no data, then the next buffer is polled, and so on.
Here is a function which polls a prioritized list of buffers in this way. It
publishes the first item that it finds, removing it from the originating
buffer. If all buffers are empty, the function halts. We use the getnb ("get non-blocking") method of the buffer, which retrieves the first
available item if there is one, and halts otherwise.
def priorityPoll([]) = stop
def priorityPoll(b:bs) = b.getnb() ; priorityPoll(bs)
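A brief usage sketch of this function follows. The buffer names high and low, the item text, and the fallback string are hypothetical, chosen only for illustration.

```orc
def priorityPoll([]) = stop
def priorityPoll(b:bs) = b.getnb() ; priorityPoll(bs)

{- Two buffers in priority order; only the lower-priority one holds data. -}
val high = Buffer()
val low = Buffer()
low.put("low-priority item") >> (priorityPoll([high, low]) ; "all buffers empty")
```

Since high is empty, high.getnb() halts and the poll falls through to low, so the expression publishes "low-priority item". If both buffers were empty, priorityPoll would halt and the final alternative would publish "all buffers empty" instead.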
"Parallel or" is a classic idiom of parallel programming. The "parallel or" operation executes two
expressions F and G in parallel, each of which may publish a single boolean,
and returns the disjunction of their publications as soon as possible.
If one of the expressions publishes true, then the disjunction is true,
so it is not necessary to wait for the other expression to publish a value.
This holds even if one of the expressions is silent.
The "parallel or" of expressions F and G may be expressed in Orc as follows:
let(
  val a = F
  val b = G
  (a || b) | if(a) >> true | if(b) >> true
)
The expression (a || b) waits for both a and b to become
available and then publishes their disjunction. However if either a or
b is true we can publish true immediately regardless of whether the
other variable is available. Therefore we run if(a) >> true and if(b) >> true
in parallel to wait for either variable to become true and immediately
publish the result true. Since more than one of these expressions may
publish true, the surrounding let(...) is necessary to select and
publish only the first result.
Timeout, the ability to execute an expression for at most a specified
amount of time, is an essential ingredient of fault-tolerant and distributed
programming. Orc accomplishes timeout using pruning and the Rtimer site.
The following program runs F for at most one second, publishing its result if
available and the value 0 otherwise.
let( F | Rtimer(1000) >> 0 )
In the auction example given previously, the auction may never complete if one of the bidders does not respond. We can add a timeout so that a bidder has at most 8 seconds to provide a bid:
def auction([]) = 0
def auction(b:bs) =
  val bid = b.ask() | Rtimer(8000) >> 0
  max(bid, auction(bs))
This version of the auction is guaranteed to complete within 8 seconds.
Sometimes, rather than just yielding a default value, we would like to
determine whether an expression has timed out, and if so, perform some other
computation. To detect the timeout, we pair the result of the original
expression with true and the result of the timer with false.
Thus, if the expression does time out, then we can distinguish that case
using the boolean value.
Here, we run expression F with a time limit t. If it publishes
within the time limit, we bind its result to r and execute G.
Otherwise, we execute H.
val (r, b) = (F, true) | (Rtimer(t), false)
if b then G else H
Instead of using a boolean and conditional, we could use pattern matching:
val s = Some(F) | Rtimer(t) >> None()
  s >Some(r)> G
| s >None()> H
We can use a timer to give a window of priority to one computation over another. In this example, we run expressions F and G concurrently. For one second, F has priority; F's result is published immediately, but G's result is held until the time interval has elapsed. If neither F nor G publishes a result within one second, then the first result from either is published.
val x = F
val y = G
let( x | Rtimer(1000) >> y )
A timer can be used to execute an expression repeatedly at regular
intervals, for example to poll a service.
Recall the definition of metronome from the previous chapter:
def metronome(t) = signal | Rtimer(t) >> metronome(t)
The following example publishes "tick" once per second and "tock" once per
second after an initial half-second delay. The publications alternate: "tick
tock tick tock ...". Note that this program is not defined recursively;
the recursion is entirely contained within metronome.
metronome(1000) >> "tick" | Rtimer(500) >> metronome(1000) >> "tock"
The Orc combinators restrict the passing of values among their component
expressions. However, some programs will require greater
flexibility. For example, F <x< G provides F with the first
publication of G, but what if F needs the first n publications of G?
In cases like this we use channels or other stateful sites to redirect or
store publications. We call this technique routing
because it involves routing values from one execution to another.
The pruning combinator terminates an expression after it publishes its first
value. We have already seen how to use
pruning just for its termination capability, without binding a variable, using
the let site. Now, we use routing to terminate an expression
under different conditions, not just when it publishes a value; it may
publish many values, or none, before being terminated.
Our implementation strategy is to route the publications of the expression through a channel, so that we can put the expression inside a pruning combinator and still see its publications without those publications terminating the expression.
As a simple demonstration of this concept, we construct a more powerful form of timeout: allow an expression to execute, publishing arbitrarily many values (not just one), until a time limit is reached.
val c = Buffer()
repeat(c.get) <<
    F >x> c.put(x) >> stop
  | Rtimer(1000) >> c.closenb()
This program allows F to execute for one second and then terminates it. Each
value published by F is routed through channel c so that it does
not terminate F. After one second, Rtimer(1000) responds,
triggering the call c.closenb(). The call
c.closenb() closes c and publishes a signal,
terminating F. The library function repeat is used to repeatedly
take and publish values from c until it is closed.
We can also decide to terminate based on the values published. This expression executes F until it publishes a negative number, and then terminates it:
val c = Buffer()
repeat(c.get) <<
  F >x> (if x >= 0
         then c.put(x) >> stop
         else c.closenb())
Each value published by F is tested. If it is non-negative, it is placed on
channel c (silently) and read by repeat(c.get).
If it is negative, the channel is closed, publishing a signal and causing
the termination of F.
We can use routing to interrupt an expression based on a signal from
elsewhere in the program. We set up the expression like a timeout, but instead
of waiting for a timer, we wait for the semaphore done to be released. Any
call to done.release will terminate the expression (because it will
cause done.acquire() to publish), but otherwise F executes as normal and
may publish any number of values.
val c = Buffer()
val done = Semaphore(0)
repeat(c.get) <<
    F >x> c.put(x) >> stop
  | done.acquire() >> c.closenb()
We can limit an expression to n publications, rather than just one. Here is an expression which executes F until it publishes 5 values, and then terminates it.
val c = Buffer()
val done = Semaphore(0)
def allow(0) = done.release() >> stop
def allow(n) = c.get() >x> (x | allow(n-1))
allow(5) <<
    F >x> c.put(x) >> stop
  | done.acquire() >> c.closenb()
We use the auxiliary function allow to get only the first 5
publications from the channel c. When no more publications are allowed,
allow uses the interrupt idiom to halt F and close c.
We can use routing to create a modified version of the pruning combinator.
As in F <x< G, we'll run F and G in parallel and make the first
value published by G available to F. However instead of terminating G after
it publishes a value, we will continue running it, ignoring its remaining
publications.
val r = Cell()
(F <x< r.read()) | (G >x> r.write(x))
We can also use routing to create a modified version of the otherwise combinator. We'll run F until it halts, and then run G, regardless of whether F published any values or not.
val c = Buffer()
repeat(c.get) | (F >x> c.put(x) >> stop ; c.close() >> G)
We use
c.close() instead of the more common c.closenb()
to ensure that G does not execute until all the publications of F have been
routed. Recall that c.close() does not return until c is
empty.
We can write a function interruptible that implements the interrupt idiom
to execute any function in an interruptible way. interruptible(g)
calls the function g, which is assumed to take no arguments, and
silences its publications. It immediately publishes another function, which we
can call at any time to terminate the execution of g. For simplicity,
we assume that g itself publishes no values.
Here is a naive implementation that doesn't quite work:
def interruptible(f) =
  val done = Semaphore(0)
  done.release <<
      f() >> stop
    | done.acquire()

{- wrong! -}
val stopper = interruptible(g)
...
The function interruptible is correct, but the way it is used causes a strange error.
The function g executes, but is always immediately terminated! This happens because the
val declaration which binds stopper also kills all of the remaining
computation in interruptible(g), including the execution of g itself.
The solution is to bind the variable differently:
def interruptible(f) =
  val done = Semaphore(0)
  done.release <<
      f() >> stop
    | done.acquire()

interruptible(g) >stopper> ...
This idiom, wherein a function publishes some value that can be used to monitor or control its execution, arises occasionally in Orc programming. When using this idiom, always remember to avoid terminating that execution accidentally. Since Orc is a structured concurrent language, every process is contained within some other process; kill the containing process, and the contained processes die too.
def lifter() =
  val c = Buffer()
  def loop() = c.get() >f> ( f() >> stop | loop() )
  c.put | loop()

def delayedPrint() = Rtimer(1500) >> println("Delayed 1.5 seconds")

lifter() >lift> (
  println("Running...") << Rtimer(1000) | delayedPrint() | lift(delayedPrint)
)
The timeout stops the execution of
delayedPrint(), so it does not print a result.
However, the lifted execution of delayedPrint does succeed, since it is executing
within the loop of lifter(), unaffected by the timeout.
We consider various concurrent implementations of the classic "list fold" function from functional programming:
def fold(_, [x]) = x
def fold(f, x:xs) = f(x, fold(f, xs))
This is a seedless fold (sometimes called fold1) which requires that the
list be nonempty and uses its last element as a seed. This implementation is
short-circuiting --- it may finish early if the reduction operator f does
not use its second argument --- but it is not concurrent; no two calls to f
can proceed in parallel. However, if f is associative, we can overcome this restriction
and implement fold concurrently. If f is also commutative, we can further increase concurrency.
We first consider the case when the reduction operator is associative. We
define afold(f,xs) where f is a binary associative function and
xs is a non-empty list. The implementation iteratively reduces xs
to a single value. Each step of the iteration applies the auxiliary function
step, which halves the size of xs by reducing disjoint pairs of
adjacent items.
def afold(_, [x]) = x
def afold(f, xs) =
  def step([]) = []
  def step([x]) = [x]
  def step(x:y:xs) = f(x,y):step(xs)
  afold(f, step(xs))
Notice that f(x,y):step(xs) is an implicit
fork-join. Thus, the call f(x,y)
executes in parallel with the recursive call step(xs).
As a result, all calls to f execute concurrently within
each iteration of afold.
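To see the halving at work, here is a short usage sketch that repeats the definition and folds integer addition over a five-element list. The operator and the input list are arbitrary choices for illustration.

```orc
def afold(_, [x]) = x
def afold(f, xs) =
  def step([]) = []
  def step([x]) = [x]
  def step(x:y:xs) = f(x,y):step(xs)
  afold(f, step(xs))

{- Successive lists: [1,2,3,4,5], then [3,7,5], then [10,5], then [15]. -}
afold(lambda(a, b) = a + b, [1, 2, 3, 4, 5])
```

The expression publishes 15. In the first iteration the additions 1+2 and 3+4 can run concurrently, while the unpaired 5 is carried over unchanged to the next iteration.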
We can make the implementation even more concurrent when the fold operator
is both associative and commutative. We define cfold(f,xs), where
f is an associative and commutative binary function and xs is a non-empty list.
The implementation initially copies all list items into a buffer in arbitrary
order using the auxiliary function xfer, counting the total
number of items copied. The auxiliary function combine repeatedly
pulls pairs of items from the buffer, reduces
them, and places the result back in the buffer. Each pair of items is reduced
in parallel as they become available. The last item in the buffer is the
result of the overall fold.
def cfold(f, xs) =
  val c = Buffer()
  def xfer([]) = 0
  def xfer(x:xs) = c.put(x) >> stop | xfer(xs)+1
  def combine(0) = stop
  def combine(1) = c.get()
  def combine(m) = c.get() >x> c.get() >y>
                   ( c.put(f(x,y)) >> stop | combine(m-1))
  xfer(xs) >n> combine(n)