[Pkg-telepathy-commits] [libnice] 100/265: outputstream: Make the blocking send thread safe

Simon McVittie smcv at debian.org
Wed May 14 12:04:57 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit 3da6767e230e55006e7ece6b3b241057d36cfdcf
Author: Olivier Crête <olivier.crete at collabora.com>
Date:   Thu Jan 23 21:03:46 2014 -0500

    outputstream: Make the blocking send thread safe
    
    There was a possible race between disconnection and freeing of the WriteData
    structure; it is now ref-counted, so this can never happen. Also set len to -1
    if the cancellable was cancelled.
---
 agent/outputstream.c | 76 ++++++++++++++++++++++++++++++++++------------------
 1 file changed, 50 insertions(+), 26 deletions(-)

diff --git a/agent/outputstream.c b/agent/outputstream.c
index 839c36c..2025247 100644
--- a/agent/outputstream.c
+++ b/agent/outputstream.c
@@ -300,12 +300,25 @@ nice_output_stream_new (NiceAgent *agent, guint stream_id, guint component_id)
 }
 
 typedef struct {
+  volatile gint ref_count;
+
   GCond cond;
   GMutex mutex;
-  GError **error;
+  GError *error;
 } WriteData;
 
 static void
+write_data_unref (WriteData *write_data)
+{
+  if (g_atomic_int_dec_and_test (&write_data->ref_count)) {
+    g_cond_clear (&write_data->cond);
+    g_mutex_clear (&write_data->mutex);
+    g_clear_error (&write_data->error);
+    g_slice_free (WriteData, write_data);
+  }
+}
+
+static void
 write_cancelled_cb (GCancellable *cancellable, gpointer user_data)
 {
   WriteData *write_data = user_data;
@@ -314,7 +327,7 @@ write_cancelled_cb (GCancellable *cancellable, gpointer user_data)
   g_cond_broadcast (&write_data->cond);
   g_mutex_unlock (&write_data->mutex);
 
-  g_cancellable_set_error_if_cancelled (cancellable, write_data->error);
+  g_cancellable_set_error_if_cancelled (cancellable, &write_data->error);
 }
 
 static void
@@ -337,13 +350,13 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
   GError *child_error = NULL;
   NiceAgent *agent = NULL;  /* owned */
   gulong cancel_id = 0, writeable_id;
-  WriteData write_data;
+  WriteData *write_data;
 
   /* Closed streams are not writeable. */
   if (g_output_stream_is_closed (stream)) {
     g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_CLOSED,
         "Stream is closed.");
-    goto done;
+    return -1;
   }
 
   /* Has the agent disappeared? */
@@ -351,32 +364,39 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
   if (agent == NULL) {
     g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_CLOSED,
         "Stream is closed due to the NiceAgent being finalised.");
-    goto done;
+    return -1;
   }
 
-  if (count == 0)
+  if (count == 0) {
+    g_object_unref (agent);
     return 0;
+  }
 
   /* FIXME: nice_agent_send_full() is non-blocking, which is a bit unexpected
    * since nice_agent_recv() is blocking. Currently this uses a fairly dodgy
    * GCond solution; would be much better for nice_agent_send() to block
    * properly in the main loop. */
   len = 0;
-  write_data.error = &child_error;
+  write_data = g_slice_new0 (WriteData);
+  g_atomic_int_set (&write_data->ref_count, 3);
+  write_data->error = NULL;
 
-  g_mutex_init (&write_data.mutex);
-  g_cond_init (&write_data.cond);
+  g_mutex_init (&write_data->mutex);
+  g_cond_init (&write_data->cond);
 
   if (cancellable != NULL) {
     cancel_id = g_cancellable_connect (cancellable,
-        (GCallback) write_cancelled_cb, &write_data, NULL);
+        (GCallback) write_cancelled_cb, write_data,
+        (GDestroyNotify) write_data_unref);
   }
 
-  writeable_id = g_signal_connect (G_OBJECT (agent),
+  g_mutex_lock (&write_data->mutex);
+
+  writeable_id = g_signal_connect_data (G_OBJECT (agent),
       "reliable-transport-writable",
-      (GCallback) reliable_transport_writeable_cb, &write_data);
+      (GCallback) reliable_transport_writeable_cb, write_data,
+      (GClosureNotify) write_data_unref, 0);
 
-  g_mutex_lock (&write_data.mutex);
 
   do {
     _len = nice_agent_send_full (agent, self->priv->stream_id,
@@ -387,7 +407,7 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
         g_error_matches (child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
       /* EWOULDBLOCK. */
       g_clear_error (&child_error);
-      g_cond_wait (&write_data.cond, &write_data.mutex);
+      g_cond_wait (&write_data->cond, &write_data->mutex);
     } else if (_len > 0) {
       /* Success. */
       len += _len;
@@ -398,26 +418,30 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
     }
   } while ((gsize) len < count);
 
-  g_mutex_unlock (&write_data.mutex);
-
   g_signal_handler_disconnect (G_OBJECT (agent), writeable_id);
+  g_mutex_unlock (&write_data->mutex);
 
-  if (cancellable != NULL)
+  if (cancellable != NULL) {
     g_cancellable_disconnect (cancellable, cancel_id);
+    /* If we were cancelled, but we have no other errors and couldn't write
+     * anything, return the cancellation error. If we could write
+     * something partial, there is no error.
+     */
+    if (write_data->error && !child_error && len == 0) {
+      g_propagate_error (&child_error, write_data->error);
+      len = -1;
+    }
+  }
 
-  g_cond_clear (&write_data.cond);
-  g_mutex_clear (&write_data.mutex);
-
-done:
-  if (agent != NULL)
-    g_object_unref (agent);
+  write_data_unref (write_data);
 
   g_assert ((child_error != NULL) == (len == -1));
-  g_assert (len != 0);
-
-  if (child_error != NULL)
+  if (child_error)
     g_propagate_error (error, child_error);
 
+  g_object_unref (agent);
+  g_assert (len != 0);
+
   return len;
 }
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list