include "sys.m"; include "styx.m"; include "styxflush.m"; styxflush := load Styxflush Styxflush->PATH; init: fn(); tmsg: fn(m: ref Styx->Tmsg, flushc: chan of (int, chan of int), reply: chan of ref Styx->Rmsg): (int, ref Styx->Rmsg); rmsg: fn(m: ref Styx->Rmsg): int; Einterrupted: con "interrupted";
Init must be called before anything else in styxflush to intialise its internal data structures.
When a T-message request arrives that will be dealt with concurrently, tmsg(m, flushc, reply) should be called to inform styxflush of the new request. M gives the T-message; flushc gives a channel that will be used if the request is flushed (see below), and reply should hold an unbuffered channel that can be used to send a reply to the central loop. Flushc will usually be a fresh channel for each request, but several requests may share the same flushc if, for instance, one process is managing several requests. Tmsg returns a tuple (handled, rm), where handled is non-zero if styxflush has dealt with the request itself. If it has, then the caller must not handle the request; it must send rm as a reply if it is not nil.
Rmsg should be called when a reply message arrives at the central process (the same process that has called tmsg). It returns non-zero if the reply message should actually be sent to the client - otherwise it should be discarded.
replyc: chan of ref Rmsg; centralloop(tm: chan of ref Tmsg, fd: ref Sys->FD) { replyc = chan of Rmsg; for(;;)alt{ m := <-tm => if(m == nil || tagof m == tagof Tmsg.Readerror){ cleanup(); # kill outstanding processes, etc. return; } flushc := chan of (int, chan of int); (handled, rm) := styxflush->tmsg(m, flushc, replyc); if(!handled) spawn request(m, flushc); else if(rm != nil) sendreply(rm); rm := <- replyc => if(styxflush->rmsg(rm)) sendreply(rm); } } sendreply(fd: ref Sys->FD, rm: ref Rmsg) { d := rm.pack(); sys->write(fd, d, len d); } request(tm: ref Tmsg, flushc: chan of (int, chan of int)) { pick m := tm { Open => replyc <-= ref Rmsg.Open(m.tag, ...); Read => [...] alt{ x := <-readc => # read from data produced on readc replyc <-= ref Rmsg.Read(m.tag, ...); (nil, rc) := <-flushc => # read request has been flushed. replyc <-= ref Rmsg.Error(m.tag, Einterrupted); rc <-= 1; } etc ... } }
STYXFLUSH(2 ) | Rev: Tue Mar 31 02:42:39 GMT 2015 |