A pipeline provides a simple form of parallelism. We show how to
represent a pipeline in Orc.

Consider a server that repeatedly receives inputs along an input
channel, computes f(x) for each input x, and outputs the result along
an output channel. The order of the inputs is preserved among the
outputs. If function f has the appropriate structure, the server can
be programmed as a pipeline of processes that may operate on different
inputs simultaneously.

First, consider a very simple structure of f. Let
f(x) = g(x) >y> h(y)

We can then implement the server as a two-stage pipeline. First define
a transducer(c,c',fn)  that repeatedly accepts inputs along c, applies
function fn to each input and outputs along channel c' in the
appropriate order.

[{orc runnable='false'

def transducer(c,c',fn) =
  c.get() >x> fn(x) >y> c'.put(y) >> transducer(c,c',fn)
}]

The implementation of f is given by the following function pipe.

[{orc runnable='false'

def pipe(c,c',g,h) =
  val c'' = Buffer()
  transducer(c,c'',g) | transducer(c'',c',h)
  
}]

Here is a test case using pipe:

[{orc

def transducer(c,c',fn) =
  c.get() >x> fn(x) >y> c'.put(y) >> transducer(c,c',fn)
  
def pipe(c,c',g,h) =
  val c'' = Buffer()
  transducer(c,c'',g) | transducer(c'',c',h)

val in =  Buffer()
val out = Buffer()

def output(ch) = ch.get() >x> println(x) >> output(ch)

in.put(0)  >> in.put(3) >> in.put(5) >> stop |
output(out)|
pipe(in,out,lambda(x) = x+1,lambda(x) = x-1)
>> stop

}]

Next, we consider a more elaborate function which is defined
recursively. This requires the pipeline to be also defined
recursively. There are two special argument values "zero" and "unit"
so that
f(zero) = unit

A non-zero argument value x can be deconstructed into a pair of values
x' and x" using a function 
decon, as follows, where g is some given function.

[{orc runnable='false'

f(x) = decon(x) >(x',x'')> g(x',f(x''))

}]

Thus, f is completely defined by the tuple (zero,unit,decon,g).
As an example, for the factorial function,

[{orc runnable='false'

(zero,unit,decon,g) =
(0,1,
lambda(x) = (x,x-1),
lambda(a,b) = a*b
)

}]

To compute the mean of a non-empty list of numbers L, we compute sumlen(L) which
publishes a pair (s,l), where s is the sum of the elements of L and l
is its length; then, the mean equals s/l. Function sumlen can be computed
by a pipeline, where

[{orc runnable='false'

(zero,unit,decon,g) =
([], (0,0),
lambda(x:xs) = ((x,1),xs),
lambda((a,b),(a',b')) = (a+a',b+b')
)

}]

We give a general procedure for implementing such a recursive pipe.
Function pipe, given below, reads from channel c and outputs to
c'. It implements a maximum depth of recursion given
by argument n. Function f is described by the tuple
(zero,unit,decon,g).


[{orc

def pipe(c,c',0,(zero,unit,decon,g)) =
  c.get() >>  c'.put(unit) >> pipe(c,c',0,(zero,unit,decon,g))

{- For n > 0, function f is computed as follows. The process network
 consists of three processes: recv(), send(), and pipe().
 Channel local is directed from recv() to send(),
 channel d from recv() to pipe(), and
 channel d' from pipe() to send().

 recv() repeats the following steps:
  receive input x from channel c,
  if x = zero then send None() along local,
  else compute decon(x) = (x', x''),
    send Some(x') along local, and x'' along d.
 pipe() recursively returns f(x'') along d'

 send() repeats the following steps:
  receive y' along local,
  if y' is of the form None(), it denotes input zero;
   send unit along c',
  if y' is of the form Some(y), then
   receive z = f(x'') along d'
   compute f(x) = g(y,z) and send it along c'

-}  
def pipe(c,c',n,(zero,unit,decon,g)) =
  val local = Buffer()
  val d = Buffer()
  val d' = Buffer()

  def recv() = c.get() >x>
               (if(x = zero) then local.put(None())
                else decon(x) >(x',x'')>
                     local.put(Some(x')) >> d.put(x'')
               )
               >> recv()

  def send() = local.get() >y'>
               (  y' >None()> c'.put(unit)
                | y' >Some(y)> d'.get() >z>  -- z = f(x'')
                     c'.put(g(y,z))
               )
               >> send()

{- Goal of pipe(): -}
recv() | send() | pipe(d,d',n-1,(zero,unit,decon,g))

--Test
val in  = Buffer()
val out = Buffer()

def output(ch) = ch.get() >x> println(x) >> output(ch)

def factorial(c,c',n) = pipe(c,c',n,
(0,1,
lambda(x) = (x,x-1),
lambda(a,b) = a*b
))

in.put(4)  >> in.put(3) >> in.put(3) >> stop |
output(out)|
factorial(in,out,5)
>> stop
}]