We consider various concurrent implementations of the classic "list fold" function from functional programming:
def fold(_, [x]) = x def fold(f, x:xs) = f(x, fold(xs))
This is a seedless fold (sometimes called fold1
) which requires that the
list be nonempty and uses its first element as a seed. This implementation is
short-circuiting --- it may finish early if the reduction operator f
does
not use its second argument --- but it is not concurrent; no two calls to f
can proceed in parallel. However, if f
is associative, we can overcome this restriction
and implement fold concurrently. If f
is also commutative, we can further increase concurrency.
We first consider the case when the reduction operator is associative. We
define afold(f,xs)
where f
is a binary associative function and
xs
is a non-empty list. The implementation iteratively reduces xs
to a single value. Each step of the iteration applies the auxiliary function
step
, which halves the size of xs
by reducing disjoint pairs of
adjacent items.
def afold(_, [x]) = x def afold(f, xs) = def step([]) = [] def step([x]) = [x] def step(x:y:xs) = f(x,y):step(xs) afold(f, step(xs))
Notice that f(x,y):step(xs)
is an implicit
fork-join. Thus, the call f(x,y)
executes in parallel with the recursive call step(xs)
.
As a result, all calls to f
execute concurrently within
each iteration of afold
.
We can make the implementation even more concurrent when the fold operator
is both associative and commutative. We define cfold(f,xs)
, where
f
is a associative and commutative binary function and xs
is a non-empty list.
The implementation initially copies all list items into a channel in arbitrary
order using the auxiliary function xfer
, counting the total
number of items copied. The auxiliary function combine
repeatedly
pulls pairs of items from the channel, reduces
them, and places the result back in the channel. Each pair of items is reduced
in parallel as they become available. The last item in the channel is the
result of the overall fold.
def cfold(f, xs) = val c = Channel() def xfer([]) = 0 def xfer(x:xs) = c.put(x) >> stop | xfer(xs)+1 def combine(0) = stop def combine(1) = c.get() def combine(m) = c.get() >x> c.get() >y> ( c.put(f(x,y)) >> stop | combine(m-1)) xfer(xs) >n> combine(n)