The Orc combinators restrict the passing of values among their component
expressions. However, some programs will require greater
flexibility. For example, F <x< G
provides F with the first
publication of G, but what if F needs the first n publications of G?
In cases like this we use channels or other stateful sites to redirect or
store publications. We call this technique routing
because it involves routing values from one execution to another.
The pruning combinator terminates an expression after it publishes its first
value. We have already seen how to use
pruning just for its termination capability, without binding a variable, using
the let
site. Now, we use routing to terminate an expression
under different conditions, not just when it publishes a value; it may
publish many values, or none, before being terminated.
Our implementation strategy is to route the publications of the expression through a channel, so that we can put the expression inside a pruning combinator and still see its publications without those publications terminating the expression.
As a simple demonstration of this concept, we construct a more powerful form of timeout: allow an expression to execute, publishing arbitrarily many values (not just one), until a time limit is reached.
val c = Channel() repeat(c.get) << F >x> c.put(x) >> stop | Rwait(1000) >> c.closeD()
This program allows F to execute for one second and then terminates it. Each
value published by F is routed through channel c
so that it does
not terminate F. After one second, Rwait(1000)
responds,
triggering the call c.closeD()
. The call
c.closeD()
closes c
and publishes a signal,
terminating F. The library function repeat
is used to repeatedly
take and publish values from c
until it is closed.
We can also decide to terminate based on the values published. This expression executes F until it publishes a negative number, and then terminates it:
val c = Channel() repeat(c.get) << F >x> (if x >= 0 then c.put(x) >> stop else c.closeD())
Each value published by F is tested. If it is non-negative, it is placed on
channel c
(silently) and read by repeat(c.get)
.
If it is negative, the channel is closed, publishing a signal and causing
the termination of F.
We can use routing to interrupt an expression based on a signal from
elsewhere in the program. We set up the expression like a timeout, but instead
of waiting for a timer, we wait for the semaphore done
to be released. Any
call to done.release
will terminate the expression (because it will
cause done.acquire()
to publish), but otherwise F executes as normal and
may publish any number of values.
val c = Channel() val done = Semaphore(0) repeat(c.get) << F >x> c.put(x) >> stop | done.acquire() >> c.closeD()
We can limit an expression to n publications, rather than just one. Here is an expression which executes F until it publishes 5 values, and then terminates it.
val c = Channel() val done = Semaphore(0) def allow(0) = done.release() >> stop def allow(n) = c.get() >x> (x | allow(n-1)) allow(5) << F >x> c.put(x) >> stop | done.acquire() >> c.closeD()
We use the auxiliary function allow
to get only the first 5
publications from the channel c
. When no more publications are allowed,
allow
uses the interrupt idiom to halt F and close c
.
We can use routing to create a modified version of the pruning combinator.
As in F <x< G
, we'll run F and G in parallel and make the first
value published by G available to F. However instead of terminating G after
it publishes a value, we will continue running it, ignoring its remaining
publications.
val r = Cell() # (F <x< c.read()) | (G >x> c.write(x))
We can also use routing to create a modified version of the otherwise combinator. We'll run F until it halts, and then run G, regardless of whether F published any values or not.
val c = Channel() repeat(c.get) | (F >x> c.put(x) >> stop ; c.close() >> G)We use
c.close()
instead of the more common c.closeD()
to ensure that G does not execute until all the publications of F have been
routed. Recall that c.close()
does not return until c
is
empty.