/*
- iom.C -- generic I/O multiplexor
+ iom.C -- generic I/O multiplexer
Copyright (C) 2003, 2004 Marc Lehmann <pcg@goof.com>
This program is free software; you can redistribute it and/or modify
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
- Foundation, Inc. 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ Foundation, Inc. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#include "iom.h"
+
#include <cstdio>
#include <cstdlib>
#include <cerrno>
#include <sys/time.h>
-#if 1 // older unices need these includes for select(2)
+#include <assert.h>
+
+#if 1 // older unices need these includes for select (2)
# include <unistd.h>
# include <sys/types.h>
+# include <time.h>
+#endif
+
+// for IOM_SIG
+#if IOM_SIG
+# include <csignal>
+# include <fcntl.h>
#endif
// if the BSDs would at least be marginally POSIX-compatible.. *sigh*
// until that happens, sys/select.h must come last
#include <sys/select.h>
-#include "iom.h"
-
// TSTAMP_MAX must still fit into a positive struct timeval
#define TSTAMP_MAX (double)(1UL<<31)
+#define TIMEVAL timeval
+#define TV_FRAC tv_usec
+#define TV_MULT 1000000L
+
+#if IOM_IO
+static io_manager_vec<io_watcher> iow;
+#endif
+#if IOM_CHECK
+static io_manager_vec<check_watcher> cw;
+#endif
+#if IOM_TIME
+static io_manager_vec<time_watcher> tw;
+#endif
+#if IOM_IDLE
+static io_manager_vec<idle_watcher> iw;
+#endif
+#if IOM_SIG
+static int sigpipe[2]; // signal signalling pipe
+static sigset_t sigs;
+struct sig_vec : io_manager_vec<sig_watcher> {
+ int pending;
+ sig_vec ()
+ : pending (false)
+ { }
+};
+static vector<sig_vec *> sw;
+#endif
+
// this is a dummy time watcher to ensure that the first
// time watcher is _always_ valid, this gets rid of a lot
// of null-pointer-checks
{
// should never get called
// reached end-of-time, or tstamp has a bogus definition,
- // or compiler initilization order broken, or somethine else :)
+ // or compiler initialisation order broken, or something else :)
abort ();
}
- tw0()
- : time_watcher (this, &tw0::cb)
- { }}
-tw0;
+ tw0 ()
+ : time_watcher (this, &tw0::cb)
+ { }
+ } tw0;
tstamp NOW;
+
+#if IOM_TIME
+tstamp io_manager::now ()
+{
+ struct timeval tv;
+
+ gettimeofday (&tv, 0);
+ return (tstamp)tv.tv_sec + (tstamp)tv.tv_usec / 1000000.;
+}
+
+void io_manager::set_now ()
+{
+ NOW = now ();
+}
+#endif
+
static bool iom_valid;
-io_manager iom;
-template<class watcher>
-void io_manager::reg (watcher *w, io_manager_vec<watcher> &queue)
+// used for initialisation only
+static struct init {
+ init ()
+ {
+#if IOM_SIG
+ sigemptyset (&sigs);
+
+ if (pipe (sigpipe))
+ {
+ perror ("io_manager: unable to create signal pipe, aborting.");
+ abort ();
+ }
+
+ fcntl (sigpipe[0], F_SETFL, O_NONBLOCK); fcntl (sigpipe[0], F_SETFD, FD_CLOEXEC);
+ fcntl (sigpipe[1], F_SETFL, O_NONBLOCK); fcntl (sigpipe[1], F_SETFD, FD_CLOEXEC);
+#endif
+
+ iom_valid = true;
+
+#if IOM_TIME
+ io_manager::set_now ();
+
+ tw0.start (TSTAMP_MAX);
+#endif
+ }
+
+ static void required ();
+} init;
+
+void
+init::required ()
{
if (!iom_valid)
- abort ();
+ {
+ write (2, "io_manager: early registration attempt, aborting.\n",
+ sizeof ("io_manager: early registration attempt, aborting.\n") - 1);
+ abort ();
+ }
+}
- if (!w->active)
+template<class watcher>
+void io_manager::reg (watcher &w, io_manager_vec<watcher> &queue)
+{
+ init::required ();
+
+ if (!w.active)
{
-#if IOM_CHECK
- queue.activity = true;
-#endif
- queue.push_back (w);
- w->active = queue.size ();
+ queue.push_back (&w);
+ w.active = queue.size ();
}
}
template<class watcher>
-void io_manager::unreg (watcher *w, io_manager_vec<watcher> &queue)
+void io_manager::unreg (watcher &w, io_manager_vec<watcher> &queue)
{
if (!iom_valid)
return;
- if (w->active)
+ if (w.active)
{
- queue [w->active - 1] = 0;
- w->active = 0;
+ queue [w.active - 1] = 0;
+ w.active = 0;
}
}
void time_watcher::trigger ()
{
call (*this);
-
- iom.reg (this);
+ io_manager::reg (*this);
}
-void io_manager::reg (time_watcher *w) { reg (w, tw); }
-void io_manager::unreg (time_watcher *w) { unreg (w, tw); }
+void io_manager::reg (time_watcher &w) { io_manager::reg (w, tw); }
+void io_manager::unreg (time_watcher &w) { io_manager::unreg (w, tw); }
#endif
#if IOM_IO
-void io_manager::reg (io_watcher *w) { reg (w, iow); }
-void io_manager::unreg (io_watcher *w) { unreg (w, iow); }
+void io_manager::reg (io_watcher &w) { io_manager::reg (w, iow); }
+void io_manager::unreg (io_watcher &w) { io_manager::unreg (w, iow); }
#endif
#if IOM_CHECK
-void io_manager::reg (check_watcher *w) { reg (w, cw); }
-void io_manager::unreg (check_watcher *w) { unreg (w, cw); }
+void io_manager::reg (check_watcher &w) { io_manager::reg (w, cw); }
+void io_manager::unreg (check_watcher &w) { io_manager::unreg (w, cw); }
#endif
#if IOM_IDLE
-void io_manager::reg (idle_watcher *w) { reg (w, iw); }
-void io_manager::unreg (idle_watcher *w) { unreg (w, iw); }
+void io_manager::reg (idle_watcher &w) { io_manager::reg (w, iw); }
+void io_manager::unreg (idle_watcher &w) { io_manager::unreg (w, iw); }
#endif
-#if IOM_TIME
-inline void set_now (void)
+#if IOM_SIG
+static void
+sighandler (int signum)
{
- struct timeval tv;
+ sw [signum - 1]->pending = true;
- gettimeofday (&tv, 0);
+ // we use a pipe for signal notifications, as most current
+ // OSes (Linux...) do not implement pselect correctly. ugh.
+ char ch = signum; // actual content not used
+ write (sigpipe[1], &ch, 1);
+}
- NOW = (tstamp)tv.tv_sec + (tstamp)tv.tv_usec / 1000000;
-#endif
+void io_manager::reg (sig_watcher &w)
+{
+ assert (0 < w.signum);
+
+ sw.reserve (w.signum);
+
+ while (sw.size () < w.signum) // pathetic
+ sw.push_back (0);
+
+ sig_vec *&sv = sw[w.signum - 1];
+
+ if (!sv)
+ {
+ sv = new sig_vec;
+
+ sigaddset (&sigs, w.signum);
+ sigprocmask (SIG_BLOCK, &sigs, NULL);
+
+ struct sigaction sa;
+ sa.sa_handler = sighandler;
+ sigfillset (&sa.sa_mask);
+ sa.sa_flags = SA_RESTART;
+
+ if (sigaction (w.signum, &sa, 0))
+ {
+ perror ("io_manager: error while installing signal handler, ignoring.");
+ abort ();
+ }
+
+ }
+
+ io_manager::reg (w, *sv);
}
+void io_manager::unreg (sig_watcher &w)
+{
+ if (!w.active)
+ return;
+
+ assert (0 < w.signum && w.signum <= sw.size ());
+
+ io_manager::unreg (w, *sw[w.signum - 1]);
+}
+
+void sig_watcher::start (int signum)
+{
+ stop ();
+ this->signum = signum;
+ io_manager::reg (*this);
+}
+#endif
+
void io_manager::loop ()
{
+ init::required ();
+
#if IOM_TIME
set_now ();
#endif
for (;;)
{
- struct timeval *to = 0;
- struct timeval tval;
-
-#if IOM_IDLE
- if (iw.size ())
- {
- tval.tv_sec = 0;
- tval.tv_usec = 0;
- to = &tval;
- }
- else
-#endif
- {
#if IOM_TIME
- time_watcher *next;
+ // call pending time watchers
+ {
+ bool activity;
- for (;;)
- {
- next = tw[0]; // the first time-watcher must exist at ALL times
-
- for (int i = tw.size (); i--; )
- if (!tw[i])
- tw.erase_unordered (i);
- else if (tw[i]->at < next->at)
- next = tw[i];
+ do
+ {
+ activity = false;
- if (next->at > NOW)
+ for (int i = tw.size (); i--; )
+ if (!tw[i])
+ tw.erase_unordered (i);
+ else if (tw[i]->at <= NOW)
{
- if (next != tw[0])
- {
- double diff = next->at - NOW;
- tval.tv_sec = (int)diff;
- tval.tv_usec = (int)((diff - tval.tv_sec) * 1000000);
- to = &tval;
- }
- break;
- }
- else
- {
- unreg (next);
- next->call (*next);
+ time_watcher &w = *tw[i];
+
+ unreg (w);
+ w.call (w);
+
+ activity = true;
}
- }
+ }
+ while (activity);
+ }
#endif
- }
-
#if IOM_CHECK
- tw.activity = false;
-
+ // call all check watchers
for (int i = cw.size (); i--; )
if (!cw[i])
cw.erase_unordered (i);
else
cw[i]->call (*cw[i]);
+#endif
- if (tw.activity)
+ struct TIMEVAL *to = 0;
+ struct TIMEVAL tval;
+
+#if IOM_IDLE
+ if (iw.size ())
{
tval.tv_sec = 0;
- tval.tv_usec = 0;
+ tval.TV_FRAC = 0;
to = &tval;
}
+ else
+#endif
+ {
+#if IOM_TIME
+ // find earliest active watcher
+ time_watcher *next = tw[0]; // the first time-watcher must exist at ALL times
+
+ for (io_manager_vec<time_watcher>::const_iterator i = tw.end (); i-- > tw.begin (); )
+ if (*i && (*i)->at < next->at)
+ next = *i;
+
+ if (next->at > NOW && next != tw[0])
+ {
+ double diff = next->at - NOW;
+ tval.tv_sec = (int)diff;
+ tval.TV_FRAC = (int) ((diff - tval.tv_sec) * TV_MULT);
+ to = &tval;
+ }
+ }
#endif
-#if IOM_IO
- fd_set rfd, wfd, efd;
+#if IOM_IO || IOM_SIG
+ fd_set rfd, wfd;
FD_ZERO (&rfd);
FD_ZERO (&wfd);
int fds = 0;
- for (io_manager_vec<io_watcher>::iterator i = iow.end (); i-- > iow.begin (); )
+# if IOM_IO
+ for (io_manager_vec<io_watcher>::const_iterator i = iow.end (); i-- > iow.begin (); )
if (*i)
{
if ((*i)->events & EVENT_READ ) FD_SET ((*i)->fd, &rfd);
if ((*i)->fd >= fds) fds = (*i)->fd + 1;
}
+# endif
- if (!to && !fds) //TODO: also check idle_watchers and check_watchers
+ if (!to && !fds) //TODO: also check idle_watchers and check_watchers?
break; // no events
- fds = select (fds, &rfd, &wfd, &efd, to);
+# if IOM_SIG
+ FD_SET (sigpipe[0], &rfd);
+ if (sigpipe[0] >= fds) fds = sigpipe[0] + 1;
+# endif
+
+# if IOM_SIG
+ // there is no race, as we use a pipe for signals, so select
+ // will return if a signal is caught.
+ sigprocmask (SIG_UNBLOCK, &sigs, NULL);
+# endif
+ fds = select (fds, &rfd, &wfd, NULL, to);
+# if IOM_SIG
+ sigprocmask (SIG_BLOCK, &sigs, NULL);
+# endif
+
# if IOM_TIME
- set_now ();
+ {
+ // update time, try to compensate for gross non-monotonic time changes
+ tstamp diff = NOW;
+ set_now ();
+ diff = NOW - diff;
+
+ if (diff < 0)
+ for (io_manager_vec<time_watcher>::const_iterator i = tw.end (); i-- > tw.begin (); )
+ if (*i)
+ (*i)->at += diff;
+ }
# endif
if (fds > 0)
- for (int i = iow.size (); i--; )
- if (!iow[i])
- iow.erase_unordered (i);
- else
+ {
+# if IOM_SIG
+ if (FD_ISSET (sigpipe[0], &rfd))
{
- short revents = iow[i]->events;
-
- if (!FD_ISSET (iow[i]->fd, &rfd)) revents &= ~EVENT_READ;
- if (!FD_ISSET (iow[i]->fd, &wfd)) revents &= ~EVENT_WRITE;
-
- if (revents)
- iow[i]->call (*iow[i], revents);
+ char ch;
+
+ while (read (sigpipe[0], &ch, 1) > 0)
+ ;
+
+ for (vector<sig_vec *>::iterator svp = sw.end (); svp-- > sw.begin (); )
+ if (*svp && (*svp)->pending)
+ {
+ sig_vec &sv = **svp;
+ for (int i = sv.size (); i--; )
+ if (!sv[i])
+ sv.erase_unordered (i);
+ else
+ sv[i]->call (*sv[i]);
+
+ sv.pending = false;
+ }
}
+# endif
+
+# if IOM_IO
+ for (int i = iow.size (); i--; )
+ if (!iow[i])
+ iow.erase_unordered (i);
+ else
+ {
+ io_watcher &w = *iow[i];
+ short revents = w.events;
+
+ if (!FD_ISSET (w.fd, &rfd)) revents &= ~EVENT_READ;
+ if (!FD_ISSET (w.fd, &wfd)) revents &= ~EVENT_WRITE;
+
+ if (revents)
+ w.call (w, revents);
+ }
+#endif
+ }
else if (fds < 0 && errno != EINTR)
{
- perror ("Error while waiting for I/O or time event");
+ perror ("io_manager: fatal error while waiting for I/O or time event, aborting.");
abort ();
}
#if IOM_IDLE
#else
break;
#endif
-
}
}
-io_manager::io_manager ()
-{
- iom_valid = true;
-
-#if IOM_TIME
- set_now ();
-
- tw0.start (TSTAMP_MAX);
-#endif
-}
-
-io_manager::~io_manager ()
-{
- iom_valid = false;
-}
-