From 6f0ef1bae28b806a511a81bcd92029b5c8b5522f Mon Sep 17 00:00:00 2001 From: Marcus Brinkmann Date: Sat, 11 Mar 2006 21:03:00 +0000 Subject: [PATCH] Implement watches for GIOChannels for write file descriptors on Win32 2006-03-02 Marcus Brinkmann Implement watches for GIOChannels for write file descriptors on Win32 (#333098). * glib/giowin32.c (GIOWin32Channel): Add a new direction field. (read_thread): Initialize direction. (write_thread): New function. (buffer_write): New function. (g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the write direction. (g_io_win32_fd_write): Call buffer_write() if there is a writer thread. (g_io_win32_fd_close): Set space_avail_event for writer threads. (g_io_win32_fd_create_watch): Create the writer thread if condition is G_IO_OUT. (g_io_channel_win32_make_pollfd): Likewise here. --- ChangeLog | 18 ++++ ChangeLog.pre-2-10 | 18 ++++ ChangeLog.pre-2-12 | 18 ++++ glib/giowin32.c | 238 +++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 283 insertions(+), 9 deletions(-) diff --git a/ChangeLog b/ChangeLog index f0c1ea30..b8bdfc30 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2006-03-02 Marcus Brinkmann + + Implement watches for GIOChannels for write file descriptors on + Win32 (#333098). + + * glib/giowin32.c (GIOWin32Channel): Add a new direction field. + (read_thread): Initialize direction. + (write_thread): New function. + (buffer_write): New function. + (g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the + write direction. + (g_io_win32_fd_write): Call buffer_write() if there is a writer + thread. + (g_io_win32_fd_close): Set space_avail_event for writer threads. + (g_io_win32_fd_create_watch): Create the writer thread if + condition is G_IO_OUT. + (g_io_channel_win32_make_pollfd): Likewise here. + 2006-03-09 Matthias Clasen * Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST. diff --git a/ChangeLog.pre-2-10 b/ChangeLog.pre-2-10 index f0c1ea30..b8bdfc30 100644 --- a/ChangeLog.pre-2-10 +++ b/ChangeLog.pre-2-10 @@ -1,3 +1,21 @@ +2006-03-02 Marcus Brinkmann + + Implement watches for GIOChannels for write file descriptors on + Win32 (#333098). + + * glib/giowin32.c (GIOWin32Channel): Add a new direction field. + (read_thread): Initialize direction. + (write_thread): New function. + (buffer_write): New function. + (g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the + write direction. + (g_io_win32_fd_write): Call buffer_write() if there is a writer + thread. + (g_io_win32_fd_close): Set space_avail_event for writer threads. + (g_io_win32_fd_create_watch): Create the writer thread if + condition is G_IO_OUT. + (g_io_channel_win32_make_pollfd): Likewise here. + 2006-03-09 Matthias Clasen * Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST. diff --git a/ChangeLog.pre-2-12 b/ChangeLog.pre-2-12 index f0c1ea30..b8bdfc30 100644 --- a/ChangeLog.pre-2-12 +++ b/ChangeLog.pre-2-12 @@ -1,3 +1,21 @@ +2006-03-02 Marcus Brinkmann + + Implement watches for GIOChannels for write file descriptors on + Win32 (#333098). + + * glib/giowin32.c (GIOWin32Channel): Add a new direction field. + (read_thread): Initialize direction. + (write_thread): New function. + (buffer_write): New function. + (g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the + write direction. + (g_io_win32_fd_write): Call buffer_write() if there is a writer + thread. + (g_io_win32_fd_close): Set space_avail_event for writer threads. + (g_io_win32_fd_create_watch): Create the writer thread if + condition is G_IO_OUT. + (g_io_channel_win32_make_pollfd): Likewise here. + 2006-03-09 Matthias Clasen * Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST. diff --git a/glib/giowin32.c b/glib/giowin32.c index ff164a11..7a9b6d7b 100644 --- a/glib/giowin32.c +++ b/glib/giowin32.c @@ -77,6 +77,10 @@ struct _GIOWin32Channel { /* Following fields are used by fd channels. */ CRITICAL_SECTION mutex; + int direction; /* 0 means we read from it, + * 1 means we write to it. + */ + gboolean running; /* Is reader thread running. FALSE if * EOF has been reached. */ @@ -391,7 +395,8 @@ read_thread (void *parameter) channel->fd, (guint) channel->data_avail_event, (guint) channel->space_avail_event); - + + channel->direction = 0; channel->buffer = g_malloc (BUFFER_SIZE); channel->rdp = channel->wrp = 0; channel->running = TRUE; @@ -486,6 +491,117 @@ read_thread (void *parameter) return 0; } +static unsigned __stdcall +write_thread (void *parameter) +{ + GIOWin32Channel *channel = parameter; + guchar *buffer; + guint nbytes; + + g_io_channel_ref ((GIOChannel *)channel); + + if (channel->debug) + g_print ("write_thread %#x: start fd=%d, data_avail=%#x space_avail=%#x\n", + channel->thread_id, + channel->fd, + (guint) channel->data_avail_event, + (guint) channel->space_avail_event); + + channel->direction = 1; + channel->buffer = g_malloc (BUFFER_SIZE); + channel->rdp = channel->wrp = 0; + channel->running = TRUE; + + SetEvent (channel->space_avail_event); + + /* We use the same event objects as for a reader thread, but with + * reversed meaning. So, space_avail is used if data is available + * for writing, and data_avail is used if space is available in the + * write buffer. + */ + + LOCK (channel->mutex); + while (channel->running || channel->rdp != channel->wrp) + { + if (channel->debug) + g_print ("write_thread %#x: rdp=%d, wrp=%d\n", + channel->thread_id, channel->rdp, channel->wrp); + if (channel->wrp == channel->rdp) + { + /* Buffer is empty. */ + if (channel->debug) + g_print ("write_thread %#x: resetting space_avail\n", + channel->thread_id); + ResetEvent (channel->space_avail_event); + if (channel->debug) + g_print ("write_thread %#x: waiting for data\n", + channel->thread_id); + channel->revents = G_IO_OUT; + SetEvent (channel->data_avail_event); + UNLOCK (channel->mutex); + WaitForSingleObject (channel->space_avail_event, INFINITE); + + LOCK (channel->mutex); + if (channel->rdp == channel->wrp) + break; + + if (channel->debug) + g_print ("write_thread %#x: rdp=%d, wrp=%d\n", + channel->thread_id, channel->rdp, channel->wrp); + } + + buffer = channel->buffer + channel->rdp; + if (channel->rdp < channel->wrp) + nbytes = channel->wrp - channel->rdp; + else + nbytes = BUFFER_SIZE - channel->rdp; + + if (channel->debug) + g_print ("write_thread %#x: calling write() for %d bytes\n", + channel->thread_id, nbytes); + + UNLOCK (channel->mutex); + nbytes = write (channel->fd, buffer, nbytes); + LOCK (channel->mutex); + + if (channel->debug) + g_print ("write_thread %#x: write(%i) returned %d, rdp=%d, wrp=%d\n", + channel->thread_id, channel->fd, nbytes, channel->rdp, channel->wrp); + + channel->revents = 0; + if (nbytes > 0) + channel->revents |= G_IO_OUT; + else if (nbytes <= 0) + channel->revents |= G_IO_ERR; + + channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE; + + if (nbytes <= 0) + break; + + if (channel->debug) + g_print ("write_thread: setting data_avail for thread %#x\n", + channel->thread_id); + SetEvent (channel->data_avail_event); + } + + channel->running = FALSE; + if (channel->needs_close) + { + if (channel->debug) + g_print ("write_thread %#x: channel fd %d needs closing\n", + channel->thread_id, channel->fd); + close (channel->fd); + channel->fd = -1; + } + + UNLOCK (channel->mutex); + + g_io_channel_unref ((GIOChannel *)channel); + + return 0; +} + static void create_thread (GIOWin32Channel *channel, GIOCondition condition, @@ -575,6 +691,78 @@ buffer_read (GIOWin32Channel *channel, return (*bytes_read > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF; } + +static GIOStatus +buffer_write (GIOWin32Channel *channel, + const guchar *dest, + gsize count, + gsize *bytes_written, + GError **err) +{ + guint nbytes; + guint left = count; + + LOCK (channel->mutex); + if (channel->debug) + g_print ("buffer_write: writing to thread %#x %d bytes, rdp=%d, wrp=%d\n", + channel->thread_id, count, channel->rdp, channel->wrp); + + if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp) + { + /* Buffer is full */ + if (channel->debug) + g_print ("buffer_write: tid %#x: resetting data_avail\n", + channel->thread_id); + ResetEvent (channel->data_avail_event); + if (channel->debug) + g_print ("buffer_write: tid %#x: waiting for space\n", + channel->thread_id); + UNLOCK (channel->mutex); + WaitForSingleObject (channel->data_avail_event, INFINITE); + LOCK (channel->mutex); + if (channel->debug) + g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d\n", + channel->thread_id, channel->rdp, channel->wrp); + } + + nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE, + BUFFER_SIZE - channel->wrp); + + UNLOCK (channel->mutex); + nbytes = MIN (left, nbytes); + if (channel->debug) + g_print ("buffer_write: tid %#x: writing %d bytes\n", + channel->thread_id, nbytes); + memcpy (channel->buffer + channel->wrp, dest, nbytes); + dest += nbytes; + left -= nbytes; + LOCK (channel->mutex); + + channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE; + if (channel->debug) + g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d, setting space_avail\n", + channel->thread_id, channel->rdp, channel->wrp); + SetEvent (channel->space_avail_event); + + if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp) + { + /* Buffer is full */ + if (channel->debug) + g_print ("buffer_write: tid %#x: resetting data_avail\n", + channel->thread_id); + ResetEvent (channel->data_avail_event); + } + + UNLOCK (channel->mutex); + + /* We have no way to indicate any errors form the actual + * write() call in the writer thread. Should we have? + */ + *bytes_written = count - left; + return (*bytes_written > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF; +} + + static gboolean g_io_win32_prepare (GSource *source, gint *timeout) @@ -601,13 +789,27 @@ g_io_win32_prepare (GSource *source, condition_to_string (channel->revents)); LOCK (channel->mutex); - if (channel->running && channel->wrp == channel->rdp) + if (channel->running) { - if (channel->debug) - g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n", - channel->thread_id); - channel->revents = 0; + if (channel->direction == 0 && channel->wrp == channel->rdp) + { + if (channel->debug) + g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n", + channel->thread_id); + channel->revents = 0; + } } + else + { + if (channel->direction == 1 + && (channel->wrp + 1) % BUFFER_SIZE == channel->rdp) + { + if (channel->debug) + g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = %i\n", + channel->thread_id, 0); + channel->revents = 0; + } + } UNLOCK (channel->mutex); break; @@ -964,6 +1166,11 @@ g_io_win32_fd_write (GIOChannel *channel, { GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel; gint result; + + if (win32_channel->thread_id) + { + return buffer_write (win32_channel, buf, count, bytes_written, err); + } result = write (win32_channel->fd, buf, count); if (win32_channel->debug) @@ -1061,7 +1268,10 @@ g_io_win32_fd_close (GIOChannel *channel, win32_channel->thread_id, win32_channel->fd); win32_channel->running = FALSE; win32_channel->needs_close = TRUE; - SetEvent (win32_channel->data_avail_event); + if (win32_channel->direction == 0) + SetEvent (win32_channel->data_avail_event); + else + SetEvent (win32_channel->space_avail_event); } else { @@ -1105,7 +1315,12 @@ g_io_win32_fd_create_watch (GIOChannel *channel, LOCK (win32_channel->mutex); if (win32_channel->thread_id == 0) - create_thread (win32_channel, condition, read_thread); + { + if (condition & G_IO_IN) + create_thread (win32_channel, condition, read_thread); + else if (condition & G_IO_OUT) + create_thread (win32_channel, condition, write_thread); + } g_source_add_poll (source, &watch->pollfd); UNLOCK (win32_channel->mutex); @@ -1720,7 +1935,12 @@ g_io_channel_win32_make_pollfd (GIOChannel *channel, fd->fd = (gint) win32_channel->data_avail_event; if (win32_channel->thread_id == 0 && (condition & G_IO_IN)) - create_thread (win32_channel, condition, read_thread); + { + if (condition & G_IO_IN) + create_thread (win32_channel, condition, read_thread); + else if (condition & G_IO_OUT) + create_thread (win32_channel, condition, write_thread); + } break; case G_IO_WIN32_SOCKET: -- 2.34.1