include "sys.m"; include "styx.m"; include "styxflush.m"; styxflush := load Styxflush Styxflush->PATH; init: fn(); tmsg: fn(m: ref Styx->Tmsg, flushc: chan of (int, chan of int), reply: chan of ref Styx->Rmsg): (int, ref Styx->Rmsg); rmsg: fn(m: ref Styx->Rmsg): int; Einterrupted: con "interrupted";
Init must be called before anything else in styxflush to intialise its internal data structures.
When a T-message request arrives that will be dealt with concurrently, tmsg(m, flushc, reply) should be called to inform styxflush of the new request. M gives the T-message; flushc gives a channel that will be used if the request is flushed (see below), and reply should hold an unbuffered channel that can be used to send a reply to the central loop. Flushc will usually be a fresh channel for each request, but several requests may share the same flushc if, for instance, one process is managing several requests. Tmsg returns a tuple (handled, rm), where handled is non-zero if styxflush has dealt with the request itself. If it has, then the caller must not handle the request; it must send rm as a reply if it is not nil.
Rmsg should be called when a reply message arrives at the central process (the same process that has called tmsg). It returns non-zero if the reply message should actually be sent to the client - otherwise it should be discarded.
replyc: chan of ref Rmsg;
centralloop(tm: chan of ref Tmsg, fd: ref Sys->FD)
{
replyc = chan of Rmsg;
for(;;)alt{
m := <-tm =>
if(m == nil || tagof m == tagof Tmsg.Readerror){
cleanup(); # kill outstanding processes, etc.
return;
}
flushc := chan of (int, chan of int);
(handled, rm) := styxflush->tmsg(m, flushc, replyc);
if(!handled)
spawn request(m, flushc);
else if(rm != nil)
sendreply(rm);
rm := <- replyc =>
if(styxflush->rmsg(rm))
sendreply(rm);
}
}
sendreply(fd: ref Sys->FD, rm: ref Rmsg)
{
d := rm.pack();
sys->write(fd, d, len d);
}
request(tm: ref Tmsg, flushc: chan of (int, chan of int))
{
pick m := tm {
Open =>
replyc <-= ref Rmsg.Open(m.tag, ...);
Read =>
[...]
alt{
x := <-readc =>
# read from data produced on readc
replyc <-= ref Rmsg.Read(m.tag, ...);
(nil, rc) := <-flushc =>
# read request has been flushed.
replyc <-= ref Rmsg.Error(m.tag, Einterrupted);
rc <-= 1;
}
etc ...
}
}
| STYXFLUSH(2 ) | Rev: Tue Mar 31 02:42:39 GMT 2015 |