A simple sequencer that can play back events at varying speeds and supports seeking.

The code still needs some work. It works, but it is not clean and could probably be improved in many ways.

The core of the sequencer is a system that sends events over a channel. Each event represents a range of generalized "times" that need to be "executed". This event stream can easily be used to control arbitrary other sites, and its speed and position can be changed freely, because the sites that handle the events can be stateless: all they ever see is "now" commands.

However, the handlers could keep state, as long as they keep track of the current time and handle it properly. This works because an event (s, t) means that the current time is t and that events after time s, up to and including time t, should be triggered.

A special pattern-matching site implements range matching, so code using this sequencer does not need to check the range explicitly; it only states the time at which it wants to trigger.
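
As a rough illustration, a listener that wants to trigger at sequence time 5000 might look like the sketch below. It assumes the filter utility and a running Sequencer named seq, both defined in the code on this page; the trigger time and the printed message are made up for the example.

```orc
-- Hypothetical usage sketch: react once sequence time 5000 has been passed.
val c = Channel()
seq.addListener(c.put) >>
repeat(c.get) >r> filter(r, 5000) >Range(_, t)>
Println("time 5000 reached; range ended at " + t) >>
stop
```

The listener never inspects the range bounds itself. filter halts silently for every range that does not contain 5000, so only the matching event reaches Println.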

An interface could easily be layered on top that takes a list of time-closure pairs as input and plays them back. However, the primary interface would be separate listening processes for each event.
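
One possible shape for such a layered interface is sketched below. playList is a hypothetical helper, not part of the original code, and it assumes the filter utility and the Sequencer class defined on this page.

```orc
-- Hypothetical helper: play back a list of (time, closure) pairs on an
-- existing Sequencer by giving each pair its own listener channel.
def playList(seq, pairs) =
  each(pairs) >(t, f)> (
    val c = Channel()
    seq.addListener(c.put) >>
    repeat(c.get) >r> filter(r, t) >> f()
  )

-- Example call with made-up times and actions.
playList(seq, [(1000, lambda () = Println("one")),
               (2000, lambda () = Println("two"))]) >> stop
```

This only wraps the primary interface: each pair still runs as its own listening process. The channels are never closed in this sketch, so the expression does not halt on its own.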

-- The type used for times within the system. This could eventually be parameterized.
type Time = Integer

-- (a, b]: a is exclusive, b is inclusive. The range may be reversed (a > b), which means sequence time is going backwards.
type Range = Range(Time, Time) 

-- A utility that publishes the given Range if v lies within it, and halts silently otherwise.
def filter(Range(a, b) as r, v) =
	if b :> a then
		Ift(v :> a && v <= b) >> r
	else
		Ift(v >= b && v <: a) >> r

-- Similar to a synchronized block in Java: run f with s held. f may only publish once.
def synchronized[A](s :: Semaphore, f :: lambda() :: A) :: A = 
	s.acquire() >> (
		val x = f()
		x >> s.release() >> x) ; (s.release() >> stop)

{-
A sequencer for Orc. It functions by publishing events along listener channels.
Each event is a Range value of the form Range(a, b), meaning that time has gone from
sequence time a to b and all events in that range should be processed. If sequence
time is moving in reverse of real time, a may be greater than b. However, a is always
exclusive (that exact time has already been visited) and b is always inclusive
(that exact time has not been visited).

period is the number of ms between updates to listeners in real time. This is important
so that updates arrive punctually even when sequencer time is running slowly.
-}
def class Sequencer(period :: Time) =
	-- Private Fields

	-- The real time source for the sequencer
	val c = Rclock()
	{-
	The reference times and speed of the sequencer. These change when the sequencer speed or time
	is changed, but they do not change as the sequencer plays.
	The tuple fields are (real reference time, sequence reference time, speed). The real
	and sequence reference times represent a real time and a sequence time that are simultaneous.
	The speed is expressed as a factor of real time. Together these values represent the current
	relationship between real time and sequence time.
	-}
	val state = Ref((0, 0, 1.0))
	-- A set of listener channels that should be updated as time moves.
	val listeners = Ref([] :: List[Channel])
	-- A semaphore to allow the sequencer to be killed. It is released to signal kill.
	val killSwitch = Semaphore(0)
	-- The sequence time at which the listeners were last updated.
	val lastEvent = Ref(0)
	-- The lock protecting the internal mutable state (state, lastEvent, and listeners)
	val lock = Semaphore(1)
	
	-- Private Methods
	
	-- Translate a real time into a sequence time using the current value of state.
	-- Locking is not needed because reading a pointer from a Ref is atomic (I assume), so we will get some consistent view.
	def timeInt(rt) = state? >(r, s, speed)> s + (rt - r) * speed
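	-- Worked example (hypothetical values, not from the original code): with
	-- state = (1000, 500, 2.0) and rt = 1500, the sequence time is
	-- 500 + (1500 - 1000) * 2.0 = 1500. With a speed of -2 instead it would be
	-- 500 + (1500 - 1000) * -2 = -500, i.e. sequence time runs backwards.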

	-- Notify all listeners that sequence time has moved to t. This uses lastEvent as the start of the range.
	-- Call only with lock held as lastEvent is updated.
	def eventInt(t) = 
		lastEvent? >le>
		lastEvent := t >> (signal | -- Tell the sequencer that it can continue as soon as we have updated lastEvent
		each(listeners?) >f> f(Range(le, t)) >> stop)

	-- Public API

 	-- Get the current sequence time.
	def time() = timeInt(c.time())
	-- Get the current speed. 
	def speed() = state? >(_, _, speed)> speed
	-- Set the speed to a new value
	def setSpeed(speed) = synchronized(lock, lambda () =
		val rt = c.time() -- Get the current real time
		val t = timeInt(rt) -- Compute the current sequence time
		eventInt(t) >> -- Notify listeners of the time step just before the speed change. 
					   -- This is required so that direction reversals cannot lose events.
		state := (rt, t, speed) -- Update the state with new reference times and speed
		)
	-- Set the sequence time. Events exactly at nt will not be run.
	def setTime(nt) = synchronized(lock, lambda () =
		val rt = c.time() -- Get the current real time
		val t = timeInt(rt) -- Compute the current sequence time
		eventInt(t) >> -- Notify listeners of the time step just before the time change. 
					   -- This is required so that events are not lost.
		lastEvent := nt >> -- Update the last event to the time we are jumping to.
		state? >(_, _, speed)> state := (rt, nt, speed) -- Update the state with the current real time and the new sequence time as references
		)
	-- Add a listener channel that will be notified of time ranges as they are passed
	def addListener(c) = synchronized(lock, lambda () = listeners := c : listeners?)
	-- Kill the sequencer.
	def kill() = killSwitch.release()
	
	
	-- Backend
	
	stop << -- for termination
	killSwitch.acquire() | -- when the killswitch is released terminate
	metronome(period) >>  -- Every period ms update listeners
	lock.acquireD() >> -- Use acquireD so that if the lock is already held we just give up so that update attempts cannot queue up
 	eventInt(time()) >> -- Update listeners
	lock.release() >> -- And we are done
	stop -- Never publish because that would make us terminate.
	{- 
	  The lock is needed here because if the metronome is going fast enough, more than one event can be published at a time.
	  acquireD means that if we are already doing something, we just stop and wait for the next tick.
	-}


-- A test sequence for the Sequencer
val seq = Sequencer(15)
val chans = Channel[Channel]()

upto(11) >v> (
  val c = Channel()
  seq.addListener(c.put) >>
  chans.put(c) >>
  repeat(c.get) >r> filter(r, v*1000) >Range(_,t)> seq.time() >ct>
    Println("event t=" + t + ", sched t=" + v*1000 + ", curr t=" + ct + ", sched err=" + (ct-v*1000) + ", real t=" + Rtime()) >> 
    stop
)
--| seq.addListener(Println)
|
Rwait(3500) >> seq.setSpeed(0.5)
|
Rwait(8500) >> seq.setSpeed(-2)
|
Rwait(12500) >> seq.setTime(10001)
|
Rwait(16500) >> seq.setSpeed(0)
|
Rwait(17750) >> seq.setSpeed(1)
|
Rwait(20000) >> seq.kill() >> 
(repeat(chans.get) >c> c.close() | chans.close()) >> stop

This page (revision-1) was last changed on 04-Apr-2013 14:39 by Arthur Peters