From 7f4864e58d4ee438a83cbf15f76e7d7aa171016c Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Fri, 26 Sep 2008 16:19:35 +0000 Subject: [PATCH] Bug 505361 - gunixinputstream.c assumes poll() available Bug 509446 - portable blocking gio cancellation * gcancellable.c (g_cancellable_make_pollfd): New method to make a GPollFD for a cancellable (which is slightly more complicated on Windows than Unix). * gunixinputstream.c (g_unix_input_stream_read): * gunixoutputstream.c (g_unix_output_stream_write): Use g_cancellable_make_pollfd() and g_poll() rather than using poll() directly. * tests/unix-streams.c: test of GUnixInputStream, GUnixOutputStream, and GCancellable. svn path=/trunk/; revision=7553 --- docs/reference/gio/gio-sections.txt | 1 + gio/ChangeLog | 17 ++ gio/gcancellable.c | 57 ++++++- gio/gcancellable.h | 4 + gio/gio.symbols | 1 + gio/gunixinputstream.c | 20 +-- gio/gunixoutputstream.c | 20 +-- gio/pltcheck.sh | 2 +- gio/tests/Makefile.am | 7 +- gio/tests/unix-streams.c | 256 ++++++++++++++++++++++++++++ 10 files changed, 356 insertions(+), 29 deletions(-) create mode 100644 gio/tests/unix-streams.c diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index 9c8a9431..6d78499e 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -932,6 +932,7 @@ g_cancellable_new g_cancellable_is_cancelled g_cancellable_set_error_if_cancelled g_cancellable_get_fd +g_cancellable_make_pollfd g_cancellable_get_current g_cancellable_pop_current g_cancellable_push_current diff --git a/gio/ChangeLog b/gio/ChangeLog index 29a9558b..2f6aa064 100644 --- a/gio/ChangeLog +++ b/gio/ChangeLog @@ -1,3 +1,20 @@ +2008-09-26 Dan Winship + + Bug 505361 – gunixinputstream.c assumes poll() available + Bug 509446 – portable blocking gio cancellation + + * gcancellable.c (g_cancellable_make_pollfd): New method to make a + GPollFD for a cancellable (which is slightly more complicated on + Windows than Unix). + + * gunixinputstream.c (g_unix_input_stream_read): + * gunixoutputstream.c (g_unix_output_stream_write): Use + g_cancellable_make_pollfd() and g_poll() rather than using poll() + directly. + + * tests/unix-streams.c: test of GUnixInputStream, + GUnixOutputStream, and GCancellable. + 2008-09-26 Dan Winship * gdesktopappinfo.c (get_all_desktop_entries_for_mime_type): add a diff --git a/gio/gcancellable.c b/gio/gcancellable.c index 86d00925..dedc3bd8 100644 --- a/gio/gcancellable.c +++ b/gio/gcancellable.c @@ -59,6 +59,10 @@ struct _GCancellable guint cancelled : 1; guint allocated_pipe : 1; int cancel_pipe[2]; + +#ifdef G_OS_WIN32 + GIOChannel *read_channel; +#endif }; static guint signals[LAST_SIGNAL] = { 0 }; @@ -79,6 +83,11 @@ g_cancellable_finalize (GObject *object) if (cancellable->cancel_pipe[1] != -1) close (cancellable->cancel_pipe[1]); +#ifdef G_OS_WIN32 + if (cancellable->read_channel) + g_io_channel_unref (cancellable->read_channel); +#endif + G_OBJECT_CLASS (g_cancellable_parent_class)->finalize (object); } @@ -304,6 +313,15 @@ g_cancellable_reset (GCancellable *cancellable) if (cancellable->cancelled) { char ch; +#ifdef G_OS_WIN32 + if (cancellable->read_channel) + { + gsize bytes_read; + g_io_channel_read_chars (cancellable->read_channel, &ch, 1, + &bytes_read, NULL); + } + else +#endif if (cancellable->cancel_pipe[0] != -1) read (cancellable->cancel_pipe[0], &ch, 1); cancellable->cancelled = FALSE; @@ -359,7 +377,9 @@ g_cancellable_set_error_if_cancelled (GCancellable *cancellable, * Gets the file descriptor for a cancellable job. This can be used to * implement cancellable operations on Unix systems. The returned fd will * turn readable when @cancellable is cancelled. - * + * + * See also g_cancellable_make_pollfd(). + * * Returns: A valid file descriptor. %-1 if the file descriptor * is not supported, or on errors. **/ @@ -383,6 +403,41 @@ g_cancellable_get_fd (GCancellable *cancellable) return fd; } +/** + * g_cancellable_make_pollfd: + * @cancellable: a #GCancellable. + * @pollfd: a pointer to a #GPollFD + * + * Creates a #GPollFD corresponding to @cancellable; this can be passed + * to g_poll() and used to poll for cancellation. + **/ +void +g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd) +{ + g_return_if_fail (G_IS_CANCELLABLE (cancellable)); + g_return_if_fail (pollfd != NULL); + +#ifdef G_OS_WIN32 + if (!cancellable->read_channel) + { + int fd = g_cancellable_get_fd (cancellable); + cancellable->read_channel = g_io_channel_win32_new_fd (fd); + g_io_channel_set_buffered (cancellable->read_channel, FALSE); + g_io_channel_set_flags (cancellable->read_channel, + G_IO_FLAG_NONBLOCK, NULL); + g_io_channel_set_encoding (cancellable->read_channel, NULL, NULL); + } + g_io_channel_win32_make_pollfd (cancellable->read_channel, G_IO_IN, pollfd); + /* (We need to keep cancellable->read_channel around, because it's + * keeping track of state related to the pollfd.) + */ +#else /* !G_OS_WIN32 */ + pollfd->fd = g_cancellable_get_fd (cancellable); + pollfd->events = G_IO_IN; +#endif /* G_OS_WIN32 */ + pollfd->revents = 0; +} + /** * g_cancellable_cancel: * @cancellable: a #GCancellable object. diff --git a/gio/gcancellable.h b/gio/gcancellable.h index 364311f2..c327fe7e 100644 --- a/gio/gcancellable.h +++ b/gio/gcancellable.h @@ -68,7 +68,11 @@ GCancellable *g_cancellable_new (void); gboolean g_cancellable_is_cancelled (GCancellable *cancellable); gboolean g_cancellable_set_error_if_cancelled (GCancellable *cancellable, GError **error); + int g_cancellable_get_fd (GCancellable *cancellable); +void g_cancellable_make_pollfd (GCancellable *cancellable, + GPollFD *pollfd); + GCancellable *g_cancellable_get_current (void); void g_cancellable_push_current (GCancellable *cancellable); void g_cancellable_pop_current (GCancellable *cancellable); diff --git a/gio/gio.symbols b/gio/gio.symbols index ca20327b..85a30b1c 100644 --- a/gio/gio.symbols +++ b/gio/gio.symbols @@ -124,6 +124,7 @@ g_cancellable_new g_cancellable_is_cancelled g_cancellable_set_error_if_cancelled g_cancellable_get_fd +g_cancellable_make_pollfd g_cancellable_get_current g_cancellable_push_current g_cancellable_pop_current diff --git a/gio/gunixinputstream.c b/gio/gunixinputstream.c index 98280cdf..155b9e68 100644 --- a/gio/gunixinputstream.c +++ b/gio/gunixinputstream.c @@ -28,7 +28,6 @@ #include #include #include -#include #include #include @@ -173,23 +172,18 @@ g_unix_input_stream_read (GInputStream *stream, { GUnixInputStream *unix_stream; gssize res; - struct pollfd poll_fds[2]; + GPollFD poll_fds[2]; int poll_ret; - int cancel_fd; unix_stream = G_UNIX_INPUT_STREAM (stream); - cancel_fd = g_cancellable_get_fd (cancellable); - if (cancel_fd != -1) + if (cancellable) { + poll_fds[0].fd = unix_stream->priv->fd; + poll_fds[0].events = G_IO_IN; + g_cancellable_make_pollfd (cancellable, &poll_fds[1]); do - { - poll_fds[0].events = POLLIN; - poll_fds[0].fd = unix_stream->priv->fd; - poll_fds[1].events = POLLIN; - poll_fds[1].fd = cancel_fd; - poll_ret = poll (poll_fds, 2, -1); - } + poll_ret = g_poll (poll_fds, 2, -1); while (poll_ret == -1 && errno == EINTR); if (poll_ret == -1) @@ -346,7 +340,7 @@ g_unix_input_stream_read_async (GInputStream *stream, data->stream = unix_stream; source = _g_fd_source_new (unix_stream->priv->fd, - POLLIN, + G_IO_IN, cancellable); g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free); diff --git a/gio/gunixoutputstream.c b/gio/gunixoutputstream.c index 8a26b0e1..ebf17ded 100644 --- a/gio/gunixoutputstream.c +++ b/gio/gunixoutputstream.c @@ -28,7 +28,6 @@ #include #include #include -#include #include #include @@ -161,23 +160,18 @@ g_unix_output_stream_write (GOutputStream *stream, { GUnixOutputStream *unix_stream; gssize res; - struct pollfd poll_fds[2]; + GPollFD poll_fds[2]; int poll_ret; - int cancel_fd; unix_stream = G_UNIX_OUTPUT_STREAM (stream); - cancel_fd = g_cancellable_get_fd (cancellable); - if (cancel_fd != -1) + if (cancellable) { + poll_fds[0].fd = unix_stream->priv->fd; + poll_fds[0].events = G_IO_OUT; + g_cancellable_make_pollfd (cancellable, &poll_fds[1]); do - { - poll_fds[0].events = POLLOUT; - poll_fds[0].fd = unix_stream->priv->fd; - poll_fds[1].events = POLLIN; - poll_fds[1].fd = cancel_fd; - poll_ret = poll (poll_fds, 2, -1); - } + poll_ret = g_poll (poll_fds, 2, -1); while (poll_ret == -1 && errno == EINTR); if (poll_ret == -1) @@ -335,7 +329,7 @@ g_unix_output_stream_write_async (GOutputStream *stream, data->stream = unix_stream; source = _g_fd_source_new (unix_stream->priv->fd, - POLLOUT, + G_IO_OUT, cancellable); g_source_set_callback (source, (GSourceFunc)write_async_cb, data, g_free); diff --git a/gio/pltcheck.sh b/gio/pltcheck.sh index 181d129b..ff3323bb 100755 --- a/gio/pltcheck.sh +++ b/gio/pltcheck.sh @@ -9,7 +9,7 @@ if ! which readelf 2>/dev/null >/dev/null; then exit 0 fi -SKIP='\ +#include +#include +#include +#include +#include +#include +#include + +#define DATA "abcdefghijklmnopqrstuvwxyz" + +int writer_pipe[2], reader_pipe[2]; +GCancellable *writer_cancel, *reader_cancel, *main_cancel; +GMainLoop *loop; + +static gpointer +writer_thread (gpointer user_data) +{ + GOutputStream *out; + gssize nwrote, offset; + GError *err = NULL; + + out = g_unix_output_stream_new (writer_pipe[1], TRUE); + + do + { + g_usleep (10); + + offset = 0; + while (offset < sizeof (DATA)) + { + nwrote = g_output_stream_write (out, DATA + offset, + sizeof (DATA) - offset, + writer_cancel, &err); + if (nwrote <= 0 || err != NULL) + break; + offset += nwrote; + } + + g_assert (nwrote > 0 || err != NULL); + } + while (err == NULL); + + if (g_cancellable_is_cancelled (writer_cancel)) + { + g_cancellable_cancel (main_cancel); + g_object_unref (out); + return NULL; + } + + g_warning ("writer: %s", err->message); + g_assert_not_reached (); +} + +static gpointer +reader_thread (gpointer user_data) +{ + GInputStream *in; + gssize nread, total; + GError *err = NULL; + char buf[sizeof (DATA)]; + + in = g_unix_input_stream_new (reader_pipe[0], TRUE); + + do + { + total = 0; + while (total < sizeof (DATA)) + { + nread = g_input_stream_read (in, buf + total, sizeof (buf) - total, + reader_cancel, &err); + if (nread <= 0 || err != NULL) + break; + total += nread; + } + + if (err) + break; + + if (nread == 0) + { + g_assert (err == NULL); + /* pipe closed */ + g_object_unref (in); + return NULL; + } + + g_assert_cmpstr (buf, ==, DATA); + g_assert (!g_cancellable_is_cancelled (reader_cancel)); + } + while (err == NULL); + + g_warning ("reader: %s", err->message); + g_assert_not_reached (); +} + +char main_buf[sizeof (DATA)]; +gssize main_len, main_offset; + +static void readable (GObject *source, GAsyncResult *res, gpointer user_data); +static void writable (GObject *source, GAsyncResult *res, gpointer user_data); + +static void +do_main_cancel (GOutputStream *out) +{ + g_output_stream_close (out, NULL, NULL); + g_main_loop_quit (loop); +} + +static void +readable (GObject *source, GAsyncResult *res, gpointer user_data) +{ + GInputStream *in = G_INPUT_STREAM (source); + GOutputStream *out = user_data; + GError *err = NULL; + + main_len = g_input_stream_read_finish (in, res, &err); + + if (g_cancellable_is_cancelled (main_cancel)) + { + do_main_cancel (out); + return; + } + + g_assert (err == NULL); + + main_offset = 0; + g_output_stream_write_async (out, main_buf, main_len, + G_PRIORITY_DEFAULT, main_cancel, + writable, in); +} + +static void +writable (GObject *source, GAsyncResult *res, gpointer user_data) +{ + GOutputStream *out = G_OUTPUT_STREAM (source); + GInputStream *in = user_data; + GError *err = NULL; + gssize nwrote; + + nwrote = g_output_stream_write_finish (out, res, &err); + + if (g_cancellable_is_cancelled (main_cancel)) + { + do_main_cancel (out); + return; + } + + g_assert (err == NULL); + g_assert_cmpint (nwrote, <=, main_len - main_offset); + + main_offset += nwrote; + if (main_offset == main_len) + { + g_input_stream_read_async (in, main_buf, sizeof (main_buf), + G_PRIORITY_DEFAULT, main_cancel, + readable, out); + } + else + { + g_output_stream_write_async (out, main_buf + main_offset, + main_len - main_offset, + G_PRIORITY_DEFAULT, main_cancel, + writable, in); + } +} + +static gboolean +timeout (gpointer cancellable) +{ + g_cancellable_cancel (cancellable); + return FALSE; +} + +static void +test_pipe_io (void) +{ + GThread *writer, *reader; + GInputStream *in; + GOutputStream *out; + + /* Split off two (additional) threads, a reader and a writer. From + * the writer thread, write data synchronously in small chunks, + * which gets read asynchronously by the main thread and then + * written asynchronously to the reader thread, which reads it + * synchronously. Eventually a timeout in the main thread will cause + * it to cancel the writer thread, which will in turn cancel the + * read op in the main thread, which will then close the pipe to + * the reader thread, causing the read op to fail. + */ + + g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0); + + writer_cancel = g_cancellable_new (); + reader_cancel = g_cancellable_new (); + main_cancel = g_cancellable_new (); + + writer = g_thread_create (writer_thread, NULL, TRUE, NULL); + reader = g_thread_create (reader_thread, NULL, TRUE, NULL); + + in = g_unix_input_stream_new (writer_pipe[0], TRUE); + out = g_unix_output_stream_new (reader_pipe[1], TRUE); + + g_input_stream_read_async (in, main_buf, sizeof (main_buf), + G_PRIORITY_DEFAULT, main_cancel, + readable, out); + + g_timeout_add (500, timeout, writer_cancel); + + loop = g_main_loop_new (NULL, TRUE); + g_main_loop_run (loop); + g_main_loop_unref (loop); + + g_thread_join (reader); + g_thread_join (writer); + + g_object_unref (main_cancel); + g_object_unref (reader_cancel); + g_object_unref (writer_cancel); + g_object_unref (in); + g_object_unref (out); +} + +int +main (int argc, + char *argv[]) +{ + g_thread_init (NULL); + g_type_init (); + g_test_init (&argc, &argv, NULL); + + g_test_add_func ("/unix-streams/pipe-io-test", test_pipe_io); + + return g_test_run(); +} -- 2.34.1