From 053f9e72b12b9b5ab5571da9f4cd0b9b13f41e62 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 20 May 2009 11:19:47 +0200 Subject: [PATCH] Add support for graceful disconnect to GTcpConnection --- docs/reference/gio/gio-sections.txt | 2 + gio/gio.symbols | 2 + gio/gtcpconnection.c | 331 ++++++++++++++++++++++++++++ gio/gtcpconnection.h | 6 +- gio/tests/send-data.c | 49 +++- 5 files changed, 386 insertions(+), 4 deletions(-) diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index 581eda6b..e2fce8fb 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -1722,6 +1722,8 @@ g_socket_connection_get_remote_address g_socket_connection_get_socket GTcpConnection +g_tcp_connection_set_graceful_disconnect +g_tcp_connection_get_graceful_disconnect GUnixConnection g_unix_connection_receive_fd diff --git a/gio/gio.symbols b/gio/gio.symbols index 6cf24930..7db9b5c2 100644 --- a/gio/gio.symbols +++ b/gio/gio.symbols @@ -1182,6 +1182,8 @@ g_threaded_socket_service_new #if IN_HEADER(__G_TCP_CONNECTION_H__) #if IN_FILE(__G_TCP_CONNECTION_C__) g_tcp_connection_get_type G_GNUC_CONST +g_tcp_connection_set_graceful_disconnect +g_tcp_connection_get_graceful_disconnect #endif #endif diff --git a/gio/gtcpconnection.c b/gio/gtcpconnection.c index bd1c4110..6c4e320b 100644 --- a/gio/gtcpconnection.c +++ b/gio/gtcpconnection.c @@ -29,6 +29,9 @@ #include "config.h" #include "gtcpconnection.h" +#include "gasyncresult.h" +#include "gsimpleasyncresult.h" +#include "giostream.h" #include "glibintl.h" #include "gioalias.h" @@ -53,15 +56,343 @@ G_DEFINE_TYPE_WITH_CODE (GTcpConnection, g_tcp_connection, g_socket_protocol_id_lookup_by_name ("tcp")); ); +static gboolean g_tcp_connection_close (GIOStream *stream, + GCancellable *cancellable, + GError **error); +static void g_tcp_connection_close_async (GIOStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +struct _GTcpConnectionPrivate +{ + guint graceful_disconnect : 1; +}; + + +enum +{ + PROP_0, + PROP_GRACEFUL_DISCONNECT +}; + static void g_tcp_connection_init (GTcpConnection *connection) { + connection->priv = G_TYPE_INSTANCE_GET_PRIVATE (connection, + G_TYPE_TCP_CONNECTION, + GTcpConnectionPrivate); + connection->priv->graceful_disconnect = FALSE; +} + +static void +g_tcp_connection_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + GTcpConnection *connection = G_TCP_CONNECTION (object); + + switch (prop_id) + { + case PROP_GRACEFUL_DISCONNECT: + g_value_set_boolean (value, connection->priv->graceful_disconnect); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } +} + +static void +g_tcp_connection_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GTcpConnection *connection = G_TCP_CONNECTION (object); + + switch (prop_id) + { + case PROP_GRACEFUL_DISCONNECT: + g_tcp_connection_set_graceful_disconnect (connection, + g_value_get_boolean (value)); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } } static void g_tcp_connection_class_init (GTcpConnectionClass *class) { + GObjectClass *gobject_class = G_OBJECT_CLASS (class); + GIOStreamClass *stream_class = G_IO_STREAM_CLASS (class); + + g_type_class_add_private (class, sizeof (GTcpConnectionPrivate)); + + gobject_class->set_property = g_tcp_connection_set_property; + gobject_class->get_property = g_tcp_connection_get_property; + + stream_class->close_fn = g_tcp_connection_close; + stream_class->close_async = g_tcp_connection_close_async; + + g_object_class_install_property (gobject_class, PROP_GRACEFUL_DISCONNECT, + g_param_spec_boolean ("graceful-disconnect", + P_("Graceful Disconnect"), + P_("Whether or not close does a graceful disconnect"), + FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + +} + +static gboolean +g_tcp_connection_close (GIOStream *stream, + GCancellable *cancellable, + GError **error) +{ + GTcpConnection *connection = G_TCP_CONNECTION (stream); + GSocket *socket; + char buffer[1024]; + gssize ret; + GError *my_error; + gboolean had_error; + + socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (stream)); + had_error = FALSE; + + if (connection->priv->graceful_disconnect && + !g_cancellable_is_cancelled (cancellable) /* Cancelled -> close fast */) + { + if (!g_socket_shutdown (socket, FALSE, TRUE, error)) + { + error = NULL; /* Ignore further errors */ + had_error = TRUE; + } + else + { + while (TRUE) + { + if (!g_socket_condition_wait (socket, + G_IO_IN, cancellable, error)) + { + had_error = TRUE; + error = NULL; + break; + } + + my_error = NULL; + ret = g_socket_receive (socket, buffer, sizeof (buffer), + &my_error); + if (ret < 0) + { + if (g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + g_error_free (my_error); + else + { + had_error = TRUE; + g_propagate_error (error, my_error); + error = NULL; + break; + } + } + if (ret == 0) + break; + } + } + } + + return G_IO_STREAM_CLASS (g_tcp_connection_parent_class) + ->close_fn (stream, cancellable, error) && !had_error; +} + +typedef struct { + GSimpleAsyncResult *res; + GCancellable *cancellable; +} CloseAsyncData; + +static void +close_async_data_free (CloseAsyncData *data) +{ + g_object_unref (data->res); + if (data->cancellable) + g_object_unref (data->cancellable); + g_free (data); } +static void +async_close_finish (CloseAsyncData *data, GError *error, gboolean in_mainloop) +{ + GIOStreamClass *parent = G_IO_STREAM_CLASS (g_tcp_connection_parent_class); + GIOStream *stream; + GError *my_error; + + stream = G_IO_STREAM (g_async_result_get_source_object (G_ASYNC_RESULT (data->res))); + + /* Doesn't block, ignore error */ + if (error) + { + parent->close_fn (stream, data->cancellable, NULL); + g_simple_async_result_set_from_error (data->res, error); + } + else + { + my_error = NULL; + parent->close_fn (stream, data->cancellable, &my_error); + if (my_error) + { + g_simple_async_result_set_from_error (data->res, my_error); + g_error_free (my_error); + } + } + + if (in_mainloop) + g_simple_async_result_complete (data->res); + else + g_simple_async_result_complete_in_idle (data->res); +} + +static gboolean +close_read_ready (GSocket *socket, + GIOCondition condition, + CloseAsyncData *data) +{ + GError *error = NULL; + char buffer[1024]; + gssize ret; + + if (g_cancellable_set_error_if_cancelled (data->cancellable, + &error)) + { + async_close_finish (data, error, TRUE); + g_error_free (error); + return FALSE; + } + + ret = g_socket_receive (socket, buffer, sizeof (buffer), &error); + if (ret < 0) + { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + g_error_free (error); + else + { + async_close_finish (data, error, TRUE); + g_error_free (error); + return FALSE; + } + } + + if (ret == 0) + { + async_close_finish (data, NULL, TRUE); + return FALSE; + } + + return TRUE; +} + + +static void +g_tcp_connection_close_async (GIOStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTcpConnection *connection = G_TCP_CONNECTION (stream); + CloseAsyncData *data; + GSocket *socket; + GSource *source; + GError *error; + + if (connection->priv->graceful_disconnect && + !g_cancellable_is_cancelled (cancellable) /* Cancelled -> close fast */) + { + data = g_new (CloseAsyncData, 1); + data->res = + g_simple_async_result_new (G_OBJECT (stream), callback, user_data, + g_tcp_connection_close_async); + if (cancellable) + data->cancellable = g_object_ref (cancellable); + else + data->cancellable = NULL; + + socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (stream)); + + error = NULL; + if (!g_socket_shutdown (socket, FALSE, TRUE, &error)) + { + async_close_finish (data, error, FALSE); + g_error_free (error); + close_async_data_free (data); + return; + } + + source = g_socket_create_source (socket, G_IO_IN, cancellable); + g_source_set_callback (source, + (GSourceFunc) close_read_ready, + data, (GDestroyNotify)close_async_data_free); + g_source_attach (source, NULL); + g_source_unref (source); + + return; + } + + out: + return G_IO_STREAM_CLASS (g_tcp_connection_parent_class) + ->close_async (stream, io_priority, cancellable, callback, user_data); + + +} + +/** + * g_tcp_connection_set_graceful_disconnect: + * @connection: a #GTcpConnection + * @graceful_disconnect: Whether to do graceful disconnects or not + * + * This enabled graceful disconnects on close. A graceful disconnect + * means that we signal the recieving end that the connection is terminated + * and wait for it to close the connection before closing the connection. + * + * A graceful disconnect means that we can be sure that we successfully sent + * all the outstanding data to the other end, or get an error reported. + * However, it also means we have to wait for all the data to reach the + * other side and for it to acknowledge this by closing the socket, which may + * take a while. For this reason it is disabled by default. + * + * Since: 2.22 + **/ +void +g_tcp_connection_set_graceful_disconnect (GTcpConnection *connection, + gboolean graceful_disconnect) +{ + graceful_disconnect = !!graceful_disconnect; + if (graceful_disconnect != connection->priv->graceful_disconnect) + { + connection->priv->graceful_disconnect = graceful_disconnect; + g_object_notify (G_OBJECT (connection), "graceful-disconnect"); + } +} + +/** + * g_tcp_connection_get_graceful_disconnect: + * @connection: a #GTcpConnection + * + * Checks if graceful disconnects are used. See + * g_tcp_connection_set_graceful_disconnect(). + * + * Returns: %TRUE if graceful disconnect is used on close, %FALSE otherwise + * + * Since: 2.22 + **/ +gboolean +g_tcp_connection_get_graceful_disconnect (GTcpConnection *connection) +{ + return connection->priv->graceful_disconnect; +} + + #define __G_TCP_CONNECTION_C__ #include "gioaliasdef.c" diff --git a/gio/gtcpconnection.h b/gio/gtcpconnection.h index a9eed5d1..3928825a 100644 --- a/gio/gtcpconnection.h +++ b/gio/gtcpconnection.h @@ -57,7 +57,11 @@ struct _GTcpConnection GTcpConnectionPrivate *priv; }; -GType g_tcp_connection_get_type (void); +GType g_tcp_connection_get_type (void) G_GNUC_CONST; + +void g_tcp_connection_set_graceful_disconnect (GTcpConnection *connection, + gboolean graceful_disconnect); +gboolean g_tcp_connection_get_graceful_disconnect (GTcpConnection *connection); G_END_DECLS diff --git a/gio/tests/send-data.c b/gio/tests/send-data.c index 41eed5fb..d2a702b4 100644 --- a/gio/tests/send-data.c +++ b/gio/tests/send-data.c @@ -2,10 +2,18 @@ #include #include +GMainLoop *loop; + int cancel_timeout = 0; +gboolean async = FALSE; +gboolean graceful = FALSE; static GOptionEntry cmd_entries[] = { {"cancel", 'c', 0, G_OPTION_ARG_INT, &cancel_timeout, "Cancel any op after the specified amount of seconds", NULL}, + {"async", 'a', 0, G_OPTION_ARG_NONE, &async, + "Use async ops", NULL}, + {"graceful-disconnect", 'g', 0, G_OPTION_ARG_NONE, &graceful, + "Use graceful disconnect", NULL}, {NULL} }; @@ -35,6 +43,17 @@ socket_address_to_string (GSocketAddress *address) return res; } +static void +async_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GAsyncResult **resp = user_data; + *resp = g_object_ref (res); + g_main_loop_quit (loop); +} + + int main (int argc, char *argv[]) { @@ -64,6 +83,9 @@ main (int argc, char *argv[]) return 1; } + if (async) + loop = g_main_loop_new (NULL, FALSE); + if (cancel_timeout) { cancellable = g_cancellable_new (); @@ -96,6 +118,9 @@ main (int argc, char *argv[]) socket_address_to_string (address)); g_object_unref (address); + if (graceful) + g_tcp_connection_set_graceful_disconnect (G_TCP_CONNECTION (connection), TRUE); + out = g_io_stream_get_output_stream (G_IO_STREAM (connection)); while (fgets(buffer, sizeof (buffer), stdin) != NULL) @@ -110,10 +135,28 @@ main (int argc, char *argv[]) } g_print ("closing stream\n"); - if (!g_io_stream_close (G_IO_STREAM (connection), cancellable, &error)) + if (async) { - g_warning ("close error: %s\n", error->message); - return 1; + GAsyncResult *res; + g_io_stream_close_async (G_IO_STREAM (connection), + 0, cancellable, async_cb, &res); + g_main_loop_run (loop); + if (!g_io_stream_close_finish (G_IO_STREAM (connection), + res, &error)) + { + g_object_unref (res); + g_warning ("close error: %s\n", error->message); + return 1; + } + g_object_unref (res); + } + else + { + if (!g_io_stream_close (G_IO_STREAM (connection), cancellable, &error)) + { + g_warning ("close error: %s\n", error->message); + return 1; + } } return 0; -- 2.34.1