A pipeline provides a simple form of parallelism. We show how to represent a pipeline in Orc.

Consider a server that repeatedly receives inputs along an input channel, computes f(x) for each input x, and outputs the result along an output channel. The order of the inputs is preserved among the outputs. If function f has the appropriate structure, the server can be programmed as a pipeline of processes that may operate on different inputs simultaneously.

First, consider a very simple structure of f. Let f(x) = g(x) >y> h(y)

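We can then implement the server as a two-stage pipeline. First, define transducer(c,c',fn), which repeatedly reads an input from channel c, applies function fn to it, and writes the result to channel c'. Because the transducer finishes with one input before reading the next, outputs appear in the same order as the inputs (assuming fn publishes exactly one value per call).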

def transducer(c,c',fn) = c.get() >x> fn(x) >y> c'.put(y) >> transducer(c,c',fn)

The implementation of f is given by the following function pipe.

def pipe(c,c',g,h) =
  val c'' = Buffer()
  transducer(c,c'',g) | transducer(c'',c',h)

Here is a test case using pipe:

def transducer(c,c',fn) = c.get() >x> fn(x) >y> c'.put(y) >> transducer(c,c',fn)

def pipe(c,c',g,h) =
  val c'' = Buffer()
  transducer(c,c'',g) | transducer(c'',c',h)

val in = Buffer()
val out = Buffer()

def output(ch) = ch.get() >x> println(x) >> output(ch)

  in.put(0) >> in.put(3) >> in.put(5) >> stop
| output(out)
| pipe(in,out,lambda(x) = x+1,lambda(x) = x-1) >> stop
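The two-stage construction extends naturally to any number of stages. The following is a minimal, untested sketch in the same Orc dialect as the code above; the name pipeN and the choice to pass the stage functions as a non-empty list are assumptions made for illustration and are not part of the original text.

```orc
def transducer(c,c',fn) = c.get() >x> fn(x) >y> c'.put(y) >> transducer(c,c',fn)

{- Hypothetical helper: one transducer per function in a non-empty list,
   connected by freshly created internal channels. -}
def pipeN(c,c',[fn]) = transducer(c,c',fn)
def pipeN(c,c',fn:fns) =
  val next = Buffer()
  transducer(c,next,fn) | pipeN(next,c',fns)

val in = Buffer()
val out = Buffer()

def output(ch) = ch.get() >x> println(x) >> output(ch)

  in.put(0) >> in.put(3) >> in.put(5) >> stop
| output(out)
| pipeN(in,out,[lambda(x) = x+1, lambda(x) = 2*x, lambda(x) = x-1]) >> stop
```

Each input x passes through the three stages in turn, so the values sent to out are 2*(x+1)-1, that is 1, 7 and 11 for the inputs 0, 3 and 5. Like the two-stage test, this program keeps waiting for further input and does not terminate.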

Next, we consider a more elaborate function that is defined recursively. This requires the pipeline to be defined recursively as well. There are two special values, an argument value "zero" and a result value "unit", such that f(zero) = unit.

A non-zero argument value x can be deconstructed into a pair of values x' and x'' using a function decon. Then f(x) is defined as follows, where g is some given function.

f(x) = decon(x) >(x',x'')> g(x',f(x''))

Thus, f is completely defined by the tuple (zero,unit,decon,g). As an example, for the factorial function,

(zero,unit,decon,g) = (0,1, lambda(x) = (x,x-1), lambda(a,b) = a*b )

To compute the mean of a non-empty list of numbers L, we compute sumlen(L) which publishes a pair (s,l), where s is the sum of the elements of L and l is its length; then, the mean equals s/l. Function sumlen can be computed by a pipeline, where

(zero,unit,decon,g) = ([], (0,0), lambda(x:xs) = ((x,1),xs), lambda((a,b),(a',b')) = (a+a',b+b') )
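As a reference point for checking a pipelined implementation, the recursive scheme can also be evaluated directly, without channels. The following is a minimal, untested sketch in the same Orc dialect as the rest of this page. The names direct, fact and sl are hypothetical. It also assumes that the equality test x = zero behaves as expected on lists.

```orc
{- Hypothetical helper: direct (non-pipelined) evaluation of f,
   where f is described by the tuple (zero,unit,decon,g). -}
def direct(x,(zero,unit,decon,g)) =
  if(x = zero) then unit
  else decon(x) >(x',x'')> g(x', direct(x'',(zero,unit,decon,g)))

{- The two tuples given in the text. -}
val fact = (0,1, lambda(x) = (x,x-1), lambda(a,b) = a*b )
val sl   = ([], (0,0), lambda(x:xs) = ((x,1),xs), lambda((a,b),(a',b')) = (a+a',b+b') )

direct(4, fact) >r> println(r) >>
direct([2,4,6], sl) >(s,l)> println(s/l)
```

Unfolding the definition, the first call computes 4*3*2*1*1 = 24, and the second computes the pair (12,3), giving a mean of 12/3 = 4.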

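We give a general procedure for implementing such a recursive pipe. Function pipe, given below, reads from channel c and outputs to c'. Argument n bounds the depth of recursion: the network has n recursive stages above a base stage that outputs unit for every input, so an input is computed correctly only if it reaches zero within n applications of decon. Function f is described by the tuple (zero,unit,decon,g).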

def pipe(c,c',0,(zero,unit,decon,g)) =
  c.get() >> c'.put(unit) >> pipe(c,c',0,(zero,unit,decon,g))

{- For n > 0, function f is computed as follows.
   The process network consists of three processes: recv(), send(), and pipe().
   Channel local is directed from recv() to send(),
   channel d from recv() to pipe(), and
   channel d' from pipe() to send().

   recv() repeats the following steps:
     receive input x from channel c,
     if x = zero then send None() along local,
     else compute decon(x) = (x', x''),
          send Some(x') along local, and x'' along d.

   pipe() recursively returns f(x'') along d'

   send() repeats the following steps:
     receive y' along local,
     if y' is of the form None(), it denotes input zero; send unit along c',
     if y' is of the form Some(y), then
       receive z = f(x'') along d'
       compute f(x) = g(y,z) and send it along c'
-}

def pipe(c,c',n,(zero,unit,decon,g)) =
  val local = Buffer()
  val d = Buffer()
  val d' = Buffer()

  def recv() =
    c.get() >x>
    (if(x = zero) then local.put(None())
     else decon(x) >(x',x'')> local.put(Some(x')) >> d.put(x'')
    ) >> recv()

  def send() =
    local.get() >y'>
    (   y' >None()> c'.put(unit)
      | y' >Some(y)> d'.get() >z>   -- z = f(x'')
        c'.put(g(y,z))
    ) >> send()

  {- Goal of pipe(): -}
  recv() | send() | pipe(d,d',n-1,(zero,unit,decon,g))

--Test

val in = Buffer()
val out = Buffer()

def output(ch) = ch.get() >x> println(x) >> output(ch)

def factorial(c,c',n) =
  pipe(c,c',n, (0,1, lambda(x) = (x,x-1), lambda(a,b) = a*b ))

  in.put(4) >> in.put(3) >> in.put(3) >> stop
| output(out)
| factorial(in,out,5) >> stop
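The same pipe can compute the mean described earlier by supplying the sumlen tuple and adding one more stage that divides the sum by the length. The following is an untested sketch, intended to replace the factorial test at the end of the program above. It assumes the definitions of pipe and output from that program are in scope. The names sumlen, toMean, lists, pairs and means are hypothetical, and the depth 5 is an arbitrary bound that must be at least the length of the longest input list.

```orc
{- Assumes pipe and output, as defined in the program above, are in scope. -}

def sumlen(c,c',n) =
  pipe(c,c',n, ([], (0,0),
                lambda(x:xs) = ((x,1),xs),
                lambda((a,b),(a',b')) = (a+a',b+b') ))

val lists = Buffer()   -- input: non-empty lists of numbers
val pairs = Buffer()   -- intermediate: (sum,length) pairs
val means = Buffer()   -- output: one mean per input list

{- Hypothetical final stage: turn each (sum,length) pair into a mean. -}
def toMean() = pairs.get() >(s,l)> means.put(s/l) >> toMean()

  lists.put([2,4,6]) >> lists.put([10]) >> stop
| sumlen(lists,pairs,5)
| toMean()
| output(means)
```

For the two inputs shown, sumlen produces the pairs (12,3) and (10,1), so the means are 4 and 10, in that order. An empty list would produce (0,0) and a division by zero, which is why the inputs are restricted to non-empty lists.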