We construct a network of servers in net(n,f,in,out). The input to the network is Buffer "in", and the output is buffer is "out". Each item x from in is transformed to f(x) and appears in out. The order of the items in the input is preserved in the output. The goal is to employ n servers to do the computations in parallel.

The strategy is to divide the inputs among the servers; input number i goes to server i % n. Each server is assumed to maintain the proper order in its own input and output. The outputs of the servers are placed in order in the output buffer "out". Server i has c(i) as the input buffer and c'(i) as its output buffer.

def net(n,f,in,out) =
 -- n is the number of servers
 -- f is the function each server is computing
 -- in,out are the input and output channels of the network

 val c = fillArray(Array(n), lambda (_) = Buffer())
 val c' = fillArray(Array(n), lambda (_) = Buffer())

 --------------------- Distribute from a channel to n channels ----
 --- distribute(i) distributes the input items among the buffers c
 -- The next item is put in c(i), following one in c((i+1) % n), ...

 def distribute(i) =
   in.get() >x>
   c(i)?.put(x) >>
   distribute((i+1) % n)

 --------------------- Collect from n channels into a channel ----
 --- collect(i) removes the items from buffers c' and outputs
 -- The next item is removed from c'(i), following one from c'((i+1) % n), ...

 def collect(i) =
   c'(i)?.get()  >x>
   out.put(x) >>
   collect((i+1) % n)

 --------------------- Server process -----------------------

 def server(i) = c(i)?.get()  >x> f(x) >y> c'(i)?.put(y) >> server(i)

 --------------------- End of Server  -----------------------

{- goal for net() -}
 distribute(0) | upto(n) >i> server(i) | collect(0)

{- Test -}
val in = Buffer()
val out = Buffer()

def output()= out.get() >x> println(x) >> output()
def input(i) = in.put(i) >> Rtimer(20) >> input(i+1)


 input(0)
| output()
| net(7,lambda(z) = z,in,out)

Add new attachment

Only authorized users are allowed to upload new attachments.
« This page (revision-1) was last changed on 26-Dec-2008 11:21 by AdrianQuark