commit 25cb6b442cc37b8e6056cc837233523d7572736b Author: Hans Petter Jansson Date: Thu Jan 13 18:04:58 2011 +0100 Patch 8: gvfs-dav-recursive-directory-ops.patch diff --git a/daemon/Makefile.am b/daemon/Makefile.am index 39369b4..767eeb6 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -430,6 +430,7 @@ gvfsd_gphoto2_LDADD = $(libraries) $(GPHOTO2_LIBS) $(HAL_LIBS) endif gvfsd_http_SOURCES = \ + gmempipe.c gmempipe.h \ soup-input-stream.c soup-input-stream.h \ soup-output-stream.c soup-output-stream.h \ gvfsbackendhttp.c gvfsbackendhttp.h \ @@ -474,6 +475,7 @@ gvfsd_nvvfs_LDADD = $(libraries) gvfsd_dav_SOURCES = \ + gmempipe.c gmempipe.h \ soup-input-stream.c soup-input-stream.h \ soup-output-stream.c soup-output-stream.h \ gvfsbackendhttp.c gvfsbackendhttp.h \ diff --git a/daemon/gmempipe.c b/daemon/gmempipe.c new file mode 100644 index 0000000..3ceebd9 --- /dev/null +++ b/daemon/gmempipe.c @@ -0,0 +1,716 @@ +/* GIO - GLib Input, Output and Streaming Library + * + * Copyright (C) Christian Kellner + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. 
+ * + * Author: Christian Kellner + */ + +#include "gmempipe.h" + +#include + + + +G_DEFINE_TYPE (GMemPipe, g_mem_pipe, G_TYPE_OBJECT) + +/* TODO: Real P_() */ +#define P_(_x) (_x) + +static void g_mem_pipe_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec); + +static void g_mem_pipe_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec); + +typedef enum { + + GPD_NONE = 0x00, + GPD_READ = 0x01, /* reading from the pipe */ + GPD_WRITE = 0x02 /* writing to the pipe */ + +} GPipeDirection; + +enum +{ + PROP_0, + PROP_BUFFER_SIZE +}; + +struct _GMemPipePrivate +{ + + struct { + gsize size; + gsize dlen; + + char *data; + + } buffer; + + GMutex *lock; + GCond *cond; + + GPipeDirection closed; + GInputStream *input_stream; + GOutputStream *output_stream; +}; + +/* streaming classes declarations */ +typedef struct _GMemPipeInputStream GMemPipeInputStream; +typedef struct _GMemPipeOutputStream GMemPipeOutputStream; + +static GOutputStream *g_mem_pipe_output_stream_new (GMemPipe *mem_pipe); +static GInputStream *g_mem_pipe_input_stream_new (GMemPipe *mem_pipe); + +/* GMemPipe imlementation starts here*/ +static void +g_mem_pipe_finalize (GObject *object) +{ + GMemPipe *mem_pipe; + GMemPipePrivate *priv; + + mem_pipe = G_MEM_PIPE (object); + + priv = mem_pipe->priv; + + g_mutex_free (priv->lock); + g_cond_free (priv->cond); + + if (priv->buffer.size) + g_free (priv->buffer.data); + + /* FIXME: move those to dispose? 
*/ + g_object_unref (priv->input_stream); + g_object_unref (priv->output_stream); + + if (G_OBJECT_CLASS (g_mem_pipe_parent_class)->finalize) + (*G_OBJECT_CLASS (g_mem_pipe_parent_class)->finalize) (object); +} + +static void +g_mem_pipe_class_init (GMemPipeClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + g_type_class_add_private (klass, sizeof (GMemPipePrivate)); + + gobject_class->finalize = g_mem_pipe_finalize; + gobject_class->set_property = g_mem_pipe_set_property; + gobject_class->get_property = g_mem_pipe_get_property; + + g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE, + g_param_spec_uint ("buffer-size", + P_("Buffer size"), + P_("The size of the internal buffer"), + 0, + G_MAXUINT, + 0, + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + +} + +static void +g_mem_pipe_init (GMemPipe *mem_pipe) +{ + GMemPipePrivate *priv; + mem_pipe->priv = priv = G_TYPE_INSTANCE_GET_PRIVATE (mem_pipe, + G_TYPE_MEM_PIPE, + GMemPipePrivate); + + priv->cond = g_cond_new (); + priv->lock = g_mutex_new (); + priv->input_stream = g_mem_pipe_input_stream_new (mem_pipe); + priv->output_stream = g_mem_pipe_output_stream_new (mem_pipe); +} + +static void +g_mem_pipe_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + GMemPipe *mem_pipe = G_MEM_PIPE (object); + + switch (prop_id) + { + case PROP_BUFFER_SIZE: + g_warn_if_fail (mem_pipe->priv->buffer.dlen == 0); + g_value_set_uint (value, mem_pipe->priv->buffer.size); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } +} + +static void +g_mem_pipe_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GMemPipe *mem_pipe = G_MEM_PIPE (object); + + switch (prop_id) + { + case PROP_BUFFER_SIZE: + mem_pipe->priv->buffer.size = g_value_get_uint (value); + mem_pipe->priv->buffer.data = g_realloc (mem_pipe->priv->buffer.data, + 
mem_pipe->priv->buffer.size); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } +} + +static void +signal_cancellation (GCancellable *cancellable, + gpointer user_data) +{ + GMemPipe *mem_pipe; + GMemPipePrivate *priv; + + mem_pipe = G_MEM_PIPE (user_data); + priv = mem_pipe->priv; + + g_mutex_lock (priv->lock); + g_cond_signal (priv->cond); + g_mutex_unlock (priv->lock); +} + +static gulong +_g_mem_pipe_cancellable_connect (GMemPipe *mem_pipe, + GCancellable *cancellable) +{ + gulong handler_id; /* g_cancellable_connect() returns a gulong; a guint could truncate the id */ + + if (cancellable == NULL) + return 0; + + handler_id = g_cancellable_connect (cancellable, + G_CALLBACK (signal_cancellation), + mem_pipe, + NULL); + + return handler_id; +} + +/* half open means write-end closed and read-end still open; + * NB there is no need to special case the inverse of it + * since writing to a pipe where the read is already closed + * makes no sense at all (this is a SIGPIPE in unix land) + */ +#define _g_mem_pipe_is_half_open(_pipe) (((_pipe)->priv->closed & GPD_WRITE) && \ + (~(_pipe)->priv->closed & GPD_READ)) +static gboolean +_g_mem_pipe_check_is_open (GMemPipe *mem_pipe, + GPipeDirection direction, + GError **error) +{ + gboolean res; + GMemPipePrivate *priv = mem_pipe->priv; + + res = priv->closed == 0; + + if (res == FALSE && direction & GPD_READ) + res = _g_mem_pipe_is_half_open (mem_pipe); + + if (res == FALSE) + g_set_error_literal (error, + G_IO_ERROR, + G_IO_ERROR_CLOSED, + "Pipe closed"); + return res; +} + +/* LOCKED: Must be called with the LOCK held! + * + * Checks if the pipe is ready for IO. Will wait + * if not. + * + * NB: Can only check for GPD_WRITE *or* GPD_READ! + * + * Returns: The number of data one can read/write + * to the pipe; -1 in case of an error and 0 iff + * the pipe write end got closed and there is no + * data left in the pipe to read from. 
+ */ +static gssize +_g_mem_pipe_poll (GMemPipe *mem_pipe, + GPipeDirection direction, + GCancellable *cancellable, + GError **error) +{ + GMemPipePrivate *priv = mem_pipe->priv; + gssize res = -1; + + while (! g_cancellable_set_error_if_cancelled (cancellable, error)) + { + if (! _g_mem_pipe_check_is_open (mem_pipe, direction, error)) + break; + + /* First check if we have to wait at all */ + if (direction & GPD_WRITE) + { + res = priv->buffer.size - priv->buffer.dlen; + g_assert (res >= 0); + } + else /* ergo: direction & GPD_READ */ + { + res = priv->buffer.dlen; + + /* if the pipe is half open we must + never wait even in the case that + res == 0; so we directly return */ + if (_g_mem_pipe_is_half_open (mem_pipe)) + return res; + } + /* if we got some data (or error) there is no need to wait! */ + if (res != 0) + break; + + g_cond_wait (priv->cond, priv->lock); + + res = -1; /* reset */ + } + + return res; +} + +gssize +g_mem_pipe_read (GMemPipe *mem_pipe, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + GMemPipePrivate *priv; + gssize n; + gssize nread; + gboolean is_buffered; + gulong handler_id; + + g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), -1); + priv = mem_pipe->priv; + + if (count == 0) + return 0; + + handler_id = _g_mem_pipe_cancellable_connect (mem_pipe, cancellable); + g_mutex_lock (priv->lock); + + is_buffered = (priv->buffer.size != 0); + if (is_buffered == FALSE) + { + priv->buffer.data = buffer; + priv->buffer.size = count; + + /* if the writer thread is waiting for + a place to write to, signal him that + now there is this very place */ + g_cond_signal (priv->cond); + } + + n = _g_mem_pipe_poll (mem_pipe, GPD_READ, cancellable, error); + + if (n > 0) + { + nread = MIN (count, n); + + if (is_buffered) + { + memcpy (buffer, priv->buffer.data, nread); + g_memmove (priv->buffer.data, priv->buffer.data + nread, priv->buffer.dlen - nread); /* shift the unread tail (dlen - nread bytes) to the front, not nread bytes */ + priv->buffer.dlen -= nread; + + /* There should be room now in the buffer, + maybe get more 
data right now */ + g_cond_signal (priv->cond); + } + } + else + nread = n; + + if (is_buffered == FALSE) /* unbuffered: always detach the caller's buffer, also on error, cancellation and EOF */ + priv->buffer.dlen = priv->buffer.size = 0; + g_mutex_unlock (priv->lock); + g_cancellable_disconnect (cancellable, handler_id); + + return nread; +} + +gssize +g_mem_pipe_write (GMemPipe *mem_pipe, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + GMemPipePrivate *priv; + gssize n; + gssize nwritten; + gulong handler_id; + + g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), -1); + + if (count == 0) + return 0; + + priv = mem_pipe->priv; + handler_id = _g_mem_pipe_cancellable_connect (mem_pipe, cancellable); + + g_mutex_lock (priv->lock); + + n = _g_mem_pipe_poll (mem_pipe, GPD_WRITE, cancellable, error); + + if (n > 0) + { + nwritten = MIN (count, n); + memcpy (priv->buffer.data + priv->buffer.dlen, buffer, nwritten); + priv->buffer.dlen += nwritten; + + /* data is now available, wake up any sleeping thread */ + g_cond_signal (priv->cond); + } + else + nwritten = n; + + g_mutex_unlock (priv->lock); + g_cancellable_disconnect (cancellable, handler_id); + + return nwritten; +} + +gboolean +g_mem_pipe_write_all (GMemPipe *mem_pipe, + const void *buffer, + gsize count, + gsize *bytes_written, + GCancellable *cancellable, + GError **error) +{ + gssize n; + gsize nwritten; + gboolean res; + + g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE); + + nwritten = 0; + res = TRUE; + + do + { + n = g_mem_pipe_write (mem_pipe, + (const char *) buffer + nwritten, /* byte offset; arithmetic on void * is a GNU extension */ + count - nwritten, + cancellable, + error); + + if (n < 0) + break; + + nwritten += n; + + } + while (nwritten != count); + + if (bytes_written != NULL) + *bytes_written = nwritten; + + return nwritten == count; +} + +gboolean +g_mem_pipe_close_write (GMemPipe *mem_pipe, + GCancellable *cancellable, + GError **error) +{ + GMemPipePrivate *priv; + + g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE); + + priv = mem_pipe->priv; + g_mutex_lock (priv->lock); + priv->closed |= GPD_WRITE; + g_cond_signal 
(priv->cond); + g_mutex_unlock (priv->lock); + + return TRUE; +} + +gboolean +g_mem_pipe_close_read (GMemPipe *mem_pipe, + GCancellable *cancellable, + GError **error) +{ + GMemPipePrivate *priv; + + g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), FALSE); + + priv = mem_pipe->priv; + + g_mutex_lock (priv->lock); + priv->closed |= GPD_READ; + g_cond_signal (priv->cond); + g_mutex_unlock (priv->lock); + + return TRUE; +} + +GMemPipe * +g_mem_pipe_new (void) +{ + GMemPipe *mem_pipe; + + mem_pipe = g_object_new (G_TYPE_MEM_PIPE, NULL); + + return mem_pipe; +} + +GMemPipe * +g_mem_pipe_buffered_new (gsize buffer_size) +{ + GMemPipe *mem_pipe; + + mem_pipe = g_object_new (G_TYPE_MEM_PIPE, + "buffer-size", (guint) MIN (buffer_size, G_MAXUINT), /* property is a guint; varargs need the exact width */ + NULL); + return mem_pipe; +} + +GInputStream * +g_mem_pipe_get_input_stream (GMemPipe *mem_pipe) +{ + GMemPipePrivate *priv; + + g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), NULL); + + priv = mem_pipe->priv; + return g_object_ref (priv->input_stream); +} + +GOutputStream * +g_mem_pipe_get_output_stream (GMemPipe *mem_pipe) +{ + GMemPipePrivate *priv; + + g_return_val_if_fail (G_IS_MEM_PIPE (mem_pipe), NULL); + + priv = mem_pipe->priv; + return g_object_ref (priv->output_stream); +} + + +/* *********************************************************************** */ +/* streams */ + +/* ********************************* */ +/* input stream */ +typedef GInputStreamClass GMemPipeInputStreamClass; +struct _GMemPipeInputStream +{ + GInputStream parent_instance; + GMemPipe *mem_pipe; +}; + +#define G_TYPE_MEM_PIPE_INPUT_STREAM (g_mem_pipe_input_stream_get_type ()) +#define G_MEM_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), \ + G_TYPE_MEM_PIPE_INPUT_STREAM, \ + GMemPipeInputStream)) +#define G_IS_MEM_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), \ + G_TYPE_MEM_PIPE_INPUT_STREAM)) + +GType g_mem_pipe_input_stream_get_type (void) G_GNUC_CONST; + +G_DEFINE_TYPE (GMemPipeInputStream, + g_mem_pipe_input_stream, + G_TYPE_INPUT_STREAM) + +static 
GInputStream * +g_mem_pipe_input_stream_new (GMemPipe *mem_pipe) +{ + GMemPipeInputStream *istream; + istream = g_object_new (G_TYPE_MEM_PIPE_INPUT_STREAM, NULL); + + istream->mem_pipe = g_object_ref (mem_pipe); + return G_INPUT_STREAM (istream); +} + +static gssize +g_mem_pipe_input_stream_read (GInputStream *stream, + void *buffer, + gsize length, + GCancellable *cancellable, + GError **error) +{ + GMemPipeInputStream *istream = (GMemPipeInputStream *) stream; + + return g_mem_pipe_read (istream->mem_pipe, + buffer, + length, + cancellable, + error); +} + +static gboolean +g_mem_pipe_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error) +{ + GMemPipeInputStream *istream = (GMemPipeInputStream *) stream; + + return g_mem_pipe_close_read (istream->mem_pipe, + cancellable, + error); +} + +static void +g_mem_pipe_input_stream_init (GMemPipeInputStream *self) +{ + +} + +static void +g_mem_pipe_input_stream_dispose (GObject *object) +{ + GMemPipeInputStream *istream; + + istream = G_MEM_PIPE_INPUT_STREAM (object); + + if (istream->mem_pipe != NULL) + { + g_object_unref (istream->mem_pipe); + istream->mem_pipe = NULL; + } + + G_OBJECT_CLASS (g_mem_pipe_input_stream_parent_class)->dispose (object); +} + +static void +g_mem_pipe_input_stream_class_init (GMemPipeInputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass); + + gobject_class->dispose = g_mem_pipe_input_stream_dispose; + stream_class->read_fn = g_mem_pipe_input_stream_read; + stream_class->close_fn = g_mem_pipe_input_stream_close; +} + + +/* ********************************* */ +/* output stream */ +typedef GOutputStreamClass GMemPipeOutputStreamClass; +struct _GMemPipeOutputStream +{ + GOutputStream parent_instance; + GMemPipe *mem_pipe; +}; + +#define G_TYPE_MEM_PIPE_OUTPUT_STREAM (g_mem_pipe_output_stream_get_type ()) +#define G_MEM_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST 
((o), \ + G_TYPE_MEM_PIPE_OUTPUT_STREAM, \ + GMemPipeOutputStream)) +#define G_IS_MEM_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), \ + G_TYPE_MEM_PIPE_OUTPUT_STREAM)) + +GType g_mem_pipe_output_stream_get_type (void) G_GNUC_CONST; + +G_DEFINE_TYPE (GMemPipeOutputStream, + g_mem_pipe_output_stream, + G_TYPE_OUTPUT_STREAM) + +static GOutputStream * +g_mem_pipe_output_stream_new (GMemPipe *mem_pipe) +{ + GMemPipeOutputStream *ostream; + ostream = g_object_new (G_TYPE_MEM_PIPE_OUTPUT_STREAM, NULL); + + ostream->mem_pipe = g_object_ref (mem_pipe); + return G_OUTPUT_STREAM (ostream); +} + +static gssize +g_mem_pipe_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + GMemPipeOutputStream *ostream = (GMemPipeOutputStream *) stream; + + return g_mem_pipe_write (ostream->mem_pipe, + buffer, + count, + cancellable, + error); +} + +static gboolean +g_mem_pipe_output_stream_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + GMemPipeOutputStream *ostream = (GMemPipeOutputStream *) stream; + + g_return_val_if_fail (G_IS_MEM_PIPE_OUTPUT_STREAM (stream), FALSE); /* check our own type, not GIO's GMemoryOutputStream */ + + return g_mem_pipe_close_write (ostream->mem_pipe, + cancellable, + error); +} + +static void +g_mem_pipe_output_stream_init (GMemPipeOutputStream *self) +{ + +} + +static void +g_mem_pipe_output_stream_dispose (GObject *object) +{ + GMemPipeOutputStream *ostream; + + ostream = G_MEM_PIPE_OUTPUT_STREAM (object); + + if (ostream->mem_pipe != NULL) + { + g_object_unref (ostream->mem_pipe); + ostream->mem_pipe = NULL; + } + + G_OBJECT_CLASS (g_mem_pipe_output_stream_parent_class)->dispose (object); +} + +static void +g_mem_pipe_output_stream_class_init (GMemPipeOutputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass); + + gobject_class->dispose = g_mem_pipe_output_stream_dispose; + stream_class->write_fn = 
g_mem_pipe_output_stream_write; + stream_class->close_fn = g_mem_pipe_output_stream_close; +} diff --git a/daemon/gmempipe.h b/daemon/gmempipe.h new file mode 100644 index 0000000..c9b1cd3 --- /dev/null +++ b/daemon/gmempipe.h @@ -0,0 +1,86 @@ +/* GIO - GLib Input, Output and Streaming Library + * + * Copyright (C) Christian Kellner + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. 
+ * + * Author: Christian Kellner + */ + +#ifndef __G_MEM_PIPE_H__ +#define __G_MEM_PIPE_H__ + +#include +#include + +G_BEGIN_DECLS + +#define G_TYPE_MEM_PIPE (g_mem_pipe_get_type ()) +#define G_MEM_PIPE(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_MEM_PIPE, GMemPipe)) +#define G_MEM_PIPE_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_MEM_PIPE, GMemPipeClass)) +#define G_IS_MEM_PIPE(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_MEM_PIPE)) +#define G_IS_MEM_PIPE_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_MEM_PIPE)) +#define G_MEM_PIPE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_MEM_PIPE, GMemPipeClass)) + +typedef struct _GMemPipe GMemPipe; +typedef struct _GMemPipePrivate GMemPipePrivate; +typedef struct _GMemPipeClass GMemPipeClass; + +struct _GMemPipe +{ + GObject parent_instance; + + GMemPipePrivate *priv; +}; + +struct _GMemPipeClass +{ + GObjectClass parent_class; + +}; + +GType g_mem_pipe_get_type (void) G_GNUC_CONST; +GMemPipe * g_mem_pipe_new (void); +GMemPipe * g_mem_pipe_buffered_new (gsize buffer_size); +gssize g_mem_pipe_write (GMemPipe *mem_pipe, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +gboolean g_mem_pipe_write_all (GMemPipe *mem_pipe, + const void *buffer, + gsize count, + gsize *bytes_written, + GCancellable *cancellable, + GError **error); +gssize g_mem_pipe_read (GMemPipe *mem_pipe, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +gboolean g_mem_pipe_close_write (GMemPipe *mem_pipe, + GCancellable *cancellable, + GError **error); +gboolean g_mem_pipe_close_read (GMemPipe *mem_pipe, + GCancellable *cancellable, + GError **error); +GInputStream *g_mem_pipe_get_input_stream (GMemPipe *mem_pipe); +GOutputStream *g_mem_pipe_get_output_stream (GMemPipe *mem_pipe); + + +G_END_DECLS + +#endif /* __G_MEM_PIPE_H__ */ diff --git a/daemon/gvfsbackenddav.c b/daemon/gvfsbackenddav.c index 95dc428..0862249 100644 --- a/daemon/gvfsbackenddav.c +++ b/daemon/gvfsbackenddav.c @@ 
-2313,6 +2313,47 @@ try_unmount (GVfsBackend *backend, _exit (0); } + +/* *** overwrite http backend method *** */ +static void +do_open_for_read (GVfsBackend *backend, + GVfsJobOpenForRead *job, + const char *filename) +{ + SoupURI *uri; + GFileType file_type; + gboolean res; + guint num_children; + GError *error; + + error = NULL; + + uri = http_backend_uri_for_filename (backend, filename, FALSE); + res = stat_location (backend, uri, &file_type, &num_children, &error); + soup_uri_free (uri); + + if (res == FALSE) + { + g_vfs_job_failed_from_error (G_VFS_JOB (job), error); + g_error_free (error); + return; + } + + if (file_type == G_FILE_TYPE_DIRECTORY) + { + g_vfs_job_failed (G_VFS_JOB (job), + G_IO_ERROR, G_IO_ERROR_IS_DIRECTORY, + _("Can't open directory")); /* message now matches the IS_DIRECTORY code */ + return; + } + + /* the real work happens in the parent class */ + G_VFS_BACKEND_CLASS (g_vfs_backend_dav_parent_class)->open_for_read (backend, + job, + filename); +} + + /* ************************************************************************* */ /* */ static void @@ -2326,6 +2367,8 @@ g_vfs_backend_dav_class_init (GVfsBackendDavClass *klass) backend_class = G_VFS_BACKEND_CLASS (klass); + backend_class->open_for_read = do_open_for_read; + backend_class->try_mount = NULL; backend_class->mount = do_mount; backend_class->try_query_info = NULL; diff --git a/daemon/gvfsbackendhttp.c b/daemon/gvfsbackendhttp.c index 976e0ed..8534d83 100644 --- a/daemon/gvfsbackendhttp.c +++ b/daemon/gvfsbackendhttp.c @@ -239,7 +239,6 @@ http_error_code_from_status (guint status) return G_IO_ERROR_FAILED; } - static void g_vfs_job_failed_from_http_status (GVfsJob *job, guint status_code, const char *message) { @@ -262,6 +261,16 @@ g_vfs_job_failed_from_http_status (GVfsJob *job, guint status_code, const char * } } +static void +g_vfs_job_failed_from_soup_error (GVfsJob *job, GError *error) +{ + if (error->domain == SOUP_HTTP_ERROR) + g_vfs_job_failed_from_http_status (job, error->code, error->message); + else + 
g_vfs_job_failed_literal (job, error->domain, error->code, error->message); +} + + guint http_backend_send_message (GVfsBackend *backend, SoupMessage *msg) @@ -335,53 +344,17 @@ try_mount (GVfsBackend *backend, return TRUE; } -/* *** open_read () *** */ -static void -open_for_read_ready (GObject *source_object, - GAsyncResult *result, - gpointer user_data) -{ - GInputStream *stream; - GVfsJob *job; - gboolean res; - gboolean can_seek; - GError *error; - - stream = G_INPUT_STREAM (source_object); - error = NULL; - job = G_VFS_JOB (user_data); - - res = soup_input_stream_send_finish (stream, - result, - &error); - if (res == FALSE) - { - g_vfs_job_failed_literal (G_VFS_JOB (job), - error->domain, - error->code, - error->message); - - g_error_free (error); - g_object_unref (stream); - return; - } - - can_seek = G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)); - - g_vfs_job_open_for_read_set_can_seek (G_VFS_JOB_OPEN_FOR_READ (job), can_seek); - g_vfs_job_open_for_read_set_handle (G_VFS_JOB_OPEN_FOR_READ (job), stream); - g_vfs_job_succeeded (job); -} - -static gboolean -try_open_for_read (GVfsBackend *backend, - GVfsJobOpenForRead *job, - const char *filename) +static void +do_open_for_read (GVfsBackend *backend, + GVfsJobOpenForRead *job, + const char *filename) { GVfsBackendHttp *op_backend; GInputStream *stream; SoupMessage *msg; SoupURI *uri; + GError *error; + gboolean res; op_backend = G_VFS_BACKEND_HTTP (backend); uri = http_backend_uri_for_filename (backend, filename, FALSE); @@ -390,125 +363,119 @@ try_open_for_read (GVfsBackend *backend, soup_message_body_set_accumulate (msg->response_body, FALSE); - stream = soup_input_stream_new (op_backend->session_async, msg); + stream = soup_input_stream_new (op_backend->session, msg); g_object_unref (msg); - soup_input_stream_send_async (stream, - G_PRIORITY_DEFAULT, - G_VFS_JOB (job)->cancellable, - open_for_read_ready, - job); - return TRUE; + error = NULL; + res = soup_input_stream_send (stream, + 
G_VFS_JOB (job)->cancellable, + &error); + + if (res == FALSE) + { + g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error); + g_error_free (error); + g_object_unref (stream); + } + else + { + gboolean can_seek; + can_seek = G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)); + + g_vfs_job_open_for_read_set_can_seek (G_VFS_JOB_OPEN_FOR_READ (job), can_seek); + g_vfs_job_open_for_read_set_handle (G_VFS_JOB_OPEN_FOR_READ (job), stream); + + g_vfs_job_succeeded (G_VFS_JOB (job)); + } } -/* *** read () *** */ static void -read_ready (GObject *source_object, - GAsyncResult *result, - gpointer user_data) +do_read (GVfsBackend * backend, + GVfsJobRead * job, + GVfsBackendHandle handle, + char * buffer, + gsize bytes_requested) { GInputStream *stream; - GVfsJob *job; GError *error; - gssize nread; - - stream = G_INPUT_STREAM (source_object); - error = NULL; - job = G_VFS_JOB (user_data); - - nread = g_input_stream_read_finish (stream, result, &error); - - if (nread < 0) - { - g_vfs_job_failed_literal (G_VFS_JOB (job), - error->domain, - error->code, - error->message); + gssize n_bytes; - g_error_free (error); - return; - } + stream = G_INPUT_STREAM (handle); - g_vfs_job_read_set_size (G_VFS_JOB_READ (job), nread); - g_vfs_job_succeeded (job); + error = NULL; + n_bytes = g_input_stream_read (stream, + buffer, + bytes_requested, + G_VFS_JOB (job)->cancellable, + &error); + if (n_bytes >= 0) + { + g_vfs_job_read_set_size (job, n_bytes); + g_vfs_job_succeeded (G_VFS_JOB (job)); + } + else + { + g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error); + g_error_free (error); + } } -static gboolean -try_read (GVfsBackend *backend, - GVfsJobRead *job, - GVfsBackendHandle handle, - char *buffer, - gsize bytes_requested) -{ - GInputStream *stream; - stream = G_INPUT_STREAM (handle); - - g_input_stream_read_async (stream, - buffer, - bytes_requested, - G_PRIORITY_DEFAULT, - G_VFS_JOB (job)->cancellable, - read_ready, - job); - return TRUE; -} - -static gboolean 
-try_seek_on_read (GVfsBackend *backend, - GVfsJobSeekRead *job, - GVfsBackendHandle handle, - goffset offset, - GSeekType type) +static void +do_seek_on_read (GVfsBackend *backend, + GVfsJobSeekRead *job, + GVfsBackendHandle handle, + goffset offset, + GSeekType type) { - GInputStream *stream; - GError *error = NULL; + GInputStream *stream; + GError *error = NULL; + gboolean res; stream = G_INPUT_STREAM (handle); - if (!g_seekable_seek (G_SEEKABLE (stream), offset, type, - G_VFS_JOB (job)->cancellable, &error)) + + res = g_seekable_seek (G_SEEKABLE (stream), + offset, + type, + G_VFS_JOB (job)->cancellable, + &error); + + if (res == FALSE) { - g_vfs_job_failed_literal (G_VFS_JOB (job), - error->domain, - error->code, - error->message); + g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error); g_error_free (error); - return FALSE; } else { g_vfs_job_seek_read_set_offset (job, g_seekable_tell (G_SEEKABLE (stream))); g_vfs_job_succeeded (G_VFS_JOB (job)); } - - return TRUE; } /* *** read_close () *** */ static void -close_read_ready (GObject *source_object, - GAsyncResult *result, - gpointer user_data) +do_close_read (GVfsBackend *backend, + GVfsJobCloseRead *read_job, + GVfsBackendHandle handle) { GInputStream *stream; GVfsJob *job; GError *error; gboolean res; - job = G_VFS_JOB (user_data); - stream = G_INPUT_STREAM (source_object); - res = g_input_stream_close_finish (stream, - result, - &error); + error = NULL; + job = G_VFS_JOB (read_job); + stream = G_INPUT_STREAM (handle); + + + res = g_input_stream_close (stream, + job->cancellable, + &error); if (res == FALSE) { - g_vfs_job_failed_literal (G_VFS_JOB (job), - error->domain, - error->code, - error->message); - + g_vfs_job_failed_from_soup_error (G_VFS_JOB (job), error); g_error_free (error); } else @@ -517,24 +484,6 @@ close_read_ready (GObject *source_object, g_object_unref (stream); } -static gboolean -try_close_read (GVfsBackend *backend, - GVfsJobCloseRead *job, - GVfsBackendHandle handle) -{ - 
GInputStream *stream; - - stream = G_INPUT_STREAM (handle); - - g_input_stream_close_async (stream, - G_PRIORITY_DEFAULT, - G_VFS_JOB (job)->cancellable, - close_read_ready, - job); - return TRUE; -} - - /* *** query_info () *** */ static void @@ -688,10 +637,12 @@ g_vfs_backend_http_class_init (GVfsBackendHttpClass *klass) backend_class = G_VFS_BACKEND_CLASS (klass); backend_class->try_mount = try_mount; - backend_class->try_open_for_read = try_open_for_read; - backend_class->try_read = try_read; - backend_class->try_seek_on_read = try_seek_on_read; - backend_class->try_close_read = try_close_read; + + backend_class->open_for_read = do_open_for_read; + backend_class->close_read = do_close_read; + backend_class->read = do_read; + backend_class->seek_on_read = do_seek_on_read; + backend_class->try_query_info = try_query_info; backend_class->try_query_info_on_read = try_query_info_on_read; } diff --git a/daemon/soup-input-stream.c b/daemon/soup-input-stream.c index e1928af..10af4dd 100644 --- a/daemon/soup-input-stream.c +++ b/daemon/soup-input-stream.c @@ -18,7 +18,7 @@ * Boston, MA 02111-1307, USA. 
*/ -#include +//#include #include @@ -28,6 +28,7 @@ #include #include "soup-input-stream.h" +#include "gmempipe.h" static void soup_input_stream_seekable_iface_init (GSeekableIface *seekable_iface); @@ -37,29 +38,23 @@ G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_INPUT_STREAM typedef void (*SoupInputStreamCallback) (GInputStream *); -typedef struct { +struct SoupInputStreamPrivate { SoupSession *session; - GMainContext *async_context; SoupMessage *msg; + gboolean got_headers, finished; goffset offset; - GCancellable *cancellable; - GSource *cancel_watch; - SoupInputStreamCallback got_headers_cb; - SoupInputStreamCallback got_chunk_cb; - SoupInputStreamCallback finished_cb; - SoupInputStreamCallback cancelled_cb; + GMutex *lock; + GCond *cond; + + GMemPipe *mem_pipe; - guchar *leftover_buffer; - gsize leftover_bufsize, leftover_offset; + GInputStream *in; + GOutputStream *out; - guchar *caller_buffer; - gsize caller_bufsize, caller_nread; - GAsyncReadyCallback outstanding_callback; - GSimpleAsyncResult *result; +}; -} SoupInputStreamPrivate; #define SOUP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_INPUT_STREAM, SoupInputStreamPrivate)) @@ -71,24 +66,6 @@ static gssize soup_input_stream_read (GInputStream *stream, static gboolean soup_input_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error); -static void soup_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize soup_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); -static void soup_input_stream_close_async (GInputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gboolean soup_input_stream_close_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); static goffset 
soup_input_stream_tell (GSeekable *seekable); @@ -115,13 +92,25 @@ soup_input_stream_finalize (GObject *object) SoupInputStream *stream = SOUP_INPUT_STREAM (object); SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - g_object_unref (priv->session); + g_debug ("SoupInputStream: finalize"); /* debug log instead of unconditional stdout output */ - g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_got_headers), stream); - g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_got_chunk), stream); - g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_input_stream_finished), stream); - g_object_unref (priv->msg); - g_free (priv->leftover_buffer); + if (priv->session) + g_object_unref (priv->session); + + if (priv->msg) + g_object_unref (priv->msg); + + if (priv->mem_pipe) + { + g_object_unref (priv->mem_pipe); + g_object_unref (priv->in); + g_object_unref (priv->out); + } + + g_mutex_free (priv->lock); + g_cond_free (priv->cond); + + g_debug ("SoupInputStream: finalize done"); if (G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize) (*G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize) (object); @@ -136,41 +125,77 @@ soup_input_stream_class_init (SoupInputStreamClass *klass) g_type_class_add_private (klass, sizeof (SoupInputStreamPrivate)); gobject_class->finalize = soup_input_stream_finalize; - - stream_class->read_fn = soup_input_stream_read; + stream_class->read_fn = soup_input_stream_read; stream_class->close_fn = soup_input_stream_close; - stream_class->read_async = soup_input_stream_read_async; - stream_class->read_finish = soup_input_stream_read_finish; - stream_class->close_async = soup_input_stream_close_async; - stream_class->close_finish = soup_input_stream_close_finish; } static void soup_input_stream_seekable_iface_init (GSeekableIface *seekable_iface) { - seekable_iface->tell = soup_input_stream_tell; - seekable_iface->can_seek = soup_input_stream_can_seek; - seekable_iface->seek = soup_input_stream_seek; + seekable_iface->tell = 
soup_input_stream_tell; + seekable_iface->can_seek = soup_input_stream_can_seek; + seekable_iface->seek = soup_input_stream_seek; seekable_iface->can_truncate = soup_input_stream_can_truncate; - seekable_iface->truncate_fn = soup_input_stream_truncate; + seekable_iface->truncate_fn = soup_input_stream_truncate; } static void soup_input_stream_init (SoupInputStream *stream) { - ; + SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + stream->priv = priv; + + priv->lock = g_mutex_new (); + priv->cond = g_cond_new (); } static void -soup_input_stream_queue_message (SoupInputStream *stream) +_soup_input_stream_queue_msg_and_wait (SoupInputStream *stream) { - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + SoupInputStreamPrivate *priv = stream->priv; + + g_mutex_lock (priv->lock); + + if (priv->mem_pipe) + { + g_object_unref (priv->mem_pipe); + g_object_unref (priv->in); + g_object_unref (priv->out); + } + + priv->mem_pipe = g_mem_pipe_new (); + priv->in = g_mem_pipe_get_input_stream (priv->mem_pipe); + priv->out = g_mem_pipe_get_output_stream (priv->mem_pipe); priv->got_headers = priv->finished = FALSE; /* Add an extra ref since soup_session_queue_message steals one */ g_object_ref (priv->msg); soup_session_queue_message (priv->session, priv->msg, NULL, NULL); + + /* wait until we got the headers (or fail?) */ + while (!priv->got_headers && !priv->finished) + g_cond_wait (priv->cond, priv->lock); + + g_mutex_unlock (priv->lock); +} + +extern void soup_message_io_cleanup (SoupMessage *msg); + +static void +_soup_input_stream_cancel_msg_and_wait (SoupInputStream *stream) +{ + SoupInputStreamPrivate *priv = stream->priv; + + g_mutex_lock (priv->lock); + soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED); + soup_message_io_cleanup (priv->msg); + + /* FIXME: is this at all necessary? 
*/ + while (!priv->got_headers && !priv->finished) + g_cond_wait (priv->cond, priv->lock); + + g_mutex_unlock (priv->lock); } /** @@ -181,21 +206,8 @@ soup_input_stream_queue_message (SoupInputStream *stream) * Prepares to send @msg over @session, and returns a #GInputStream * that can be used to read the response. * - * @msg may not be sent until the first read call; if you need to look - * at the status code or response headers before reading the body, you - * can use soup_input_stream_send() or soup_input_stream_send_async() - * to force the message to be sent and the response headers read. + * FIXME * - * If @msg gets a non-2xx result, the first read (or send) will return - * an error with type %SOUP_INPUT_STREAM_HTTP_ERROR. - * - * Internally, #SoupInputStream is implemented using asynchronous I/O, - * so if you are using the synchronous API (eg, - * g_input_stream_read()), you should create a new #GMainContext and - * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If - * you don't, then synchronous #GInputStream calls will cause the main - * loop to be run recursively.) The async #GInputStream API works fine - * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset. * * Returns: a new #GInputStream. 
**/ @@ -211,17 +223,15 @@ soup_input_stream_new (SoupSession *session, SoupMessage *msg) priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); priv->session = g_object_ref (session); - priv->async_context = soup_session_get_async_context (session); priv->msg = g_object_ref (msg); - + g_signal_connect (msg, "got_headers", - G_CALLBACK (soup_input_stream_got_headers), stream); + G_CALLBACK (soup_input_stream_got_headers), stream); g_signal_connect (msg, "got_chunk", - G_CALLBACK (soup_input_stream_got_chunk), stream); + G_CALLBACK (soup_input_stream_got_chunk), stream); g_signal_connect (msg, "finished", - G_CALLBACK (soup_input_stream_finished), stream); + G_CALLBACK (soup_input_stream_finished), stream); - soup_input_stream_queue_message (stream); return G_INPUT_STREAM (stream); } @@ -230,6 +240,8 @@ soup_input_stream_got_headers (SoupMessage *msg, gpointer stream) { SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + g_mutex_lock (priv->lock); + /* If the status is unsuccessful, we just ignore the signal and let * libsoup keep going (eventually either it will requeue the request * (after handling authentication/redirection), or else the @@ -239,19 +251,14 @@ soup_input_stream_got_headers (SoupMessage *msg, gpointer stream) return; priv->got_headers = TRUE; - if (!priv->caller_buffer) - { - /* Not ready to read the body yet */ - soup_session_pause_message (priv->session, msg); - } - - if (priv->got_headers_cb) - priv->got_headers_cb (stream); + g_cond_signal (priv->cond); + g_mutex_unlock (priv->lock); } static void -soup_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer, - gpointer stream) +soup_input_stream_got_chunk (SoupMessage *msg, + SoupBuffer *chunk_buffer, + gpointer stream) { SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); const gchar *chunk = chunk_buffer->data; @@ -263,46 +270,14 @@ soup_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer, if (!SOUP_STATUS_IS_SUCCESSFUL 
(msg->status_code)) return; - /* Sanity check */ - if (priv->caller_bufsize == 0 || priv->leftover_bufsize != 0) - g_warning ("soup_input_stream_got_chunk called again before previous chunk was processed"); - - /* Copy what we can into priv->caller_buffer */ - if (priv->caller_bufsize - priv->caller_nread > 0) - { - gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread); - - memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread); - priv->caller_nread += nread; - priv->offset += nread; - chunk += nread; - chunk_size -= nread; - } - - if (chunk_size > 0) - { - /* Copy the rest into priv->leftover_buffer. If there's already - * some data there, realloc and append. Otherwise just copy. - */ - if (priv->leftover_bufsize) - { - priv->leftover_buffer = g_realloc (priv->leftover_buffer, - priv->leftover_bufsize + chunk_size); - memcpy (priv->leftover_buffer + priv->leftover_bufsize, - chunk, chunk_size); - priv->leftover_bufsize += chunk_size; - } - else - { - priv->leftover_bufsize = chunk_size; - priv->leftover_buffer = g_memdup (chunk, chunk_size); - priv->leftover_offset = 0; - } - } + g_output_stream_write_all (priv->out, + chunk, + chunk_size, + NULL, + NULL, + NULL); - soup_session_pause_message (priv->session, msg); - if (priv->got_chunk_cb) - priv->got_chunk_cb (stream); + /* FIXME: handle errors */ } static void @@ -310,71 +285,12 @@ soup_input_stream_finished (SoupMessage *msg, gpointer stream) { SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); + g_mutex_lock (priv->lock); priv->finished = TRUE; - - if (priv->finished_cb) - priv->finished_cb (stream); -} - -static gboolean -soup_input_stream_cancelled (GIOChannel *chan, GIOCondition condition, - gpointer stream) -{ - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - - priv->cancel_watch = NULL; - - soup_session_pause_message (priv->session, priv->msg); - if (priv->cancelled_cb) - priv->cancelled_cb (stream); - - return FALSE; -} - -static 
void -soup_input_stream_prepare_for_io (GInputStream *stream, - GCancellable *cancellable, - guchar *buffer, - gsize count) -{ - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - int cancel_fd; - - priv->cancellable = cancellable; - cancel_fd = g_cancellable_get_fd (cancellable); - if (cancel_fd != -1) - { - GIOChannel *chan = g_io_channel_unix_new (cancel_fd); - priv->cancel_watch = soup_add_io_watch (priv->async_context, chan, - G_IO_IN | G_IO_ERR | G_IO_HUP, - soup_input_stream_cancelled, - stream); - g_io_channel_unref (chan); - } - - priv->caller_buffer = buffer; - priv->caller_bufsize = count; - priv->caller_nread = 0; - - if (priv->got_headers) - soup_session_unpause_message (priv->session, priv->msg); -} - -static void -soup_input_stream_done_io (GInputStream *stream) -{ - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - - if (priv->cancel_watch) - { - g_source_destroy (priv->cancel_watch); - priv->cancel_watch = NULL; - g_cancellable_release_fd (priv->cancellable); - } - priv->cancellable = NULL; - - priv->caller_buffer = NULL; - priv->caller_bufsize = 0; + //g_print ("Finished!\n"); + g_mem_pipe_close_write (priv->mem_pipe, NULL, NULL); + g_cond_signal (priv->cond); + g_mutex_unlock (priv->lock); } static gboolean @@ -386,58 +302,10 @@ set_error_if_http_failed (SoupMessage *msg, GError **error) msg->status_code, msg->reason_phrase); return TRUE; } + return FALSE; } -static gsize -read_from_leftover (SoupInputStreamPrivate *priv, - gpointer buffer, gsize bufsize) -{ - gsize nread; - - if (priv->leftover_bufsize - priv->leftover_offset <= bufsize) - { - nread = priv->leftover_bufsize - priv->leftover_offset; - memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread); - - g_free (priv->leftover_buffer); - priv->leftover_buffer = NULL; - priv->leftover_bufsize = priv->leftover_offset = 0; - } - else - { - nread = bufsize; - memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread); - 
priv->leftover_offset += nread; - } - - priv->offset += nread; - return nread; -} - -/* This does the work of soup_input_stream_send(), assuming that the - * GInputStream pending flag has already been set. It is also used by - * soup_input_stream_send_async() in some circumstances. - */ -static gboolean -soup_input_stream_send_internal (GInputStream *stream, - GCancellable *cancellable, - GError **error) -{ - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - - soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0); - while (!priv->finished && !priv->got_headers && - !g_cancellable_is_cancelled (cancellable)) - g_main_context_iteration (priv->async_context, TRUE); - soup_input_stream_done_io (stream); - - if (g_cancellable_set_error_if_cancelled (cancellable, error)) - return FALSE; - else if (set_error_if_http_failed (priv->msg, error)) - return FALSE; - return TRUE; -} /** * soup_input_stream_send: @@ -454,54 +322,59 @@ soup_input_stream_send_internal (GInputStream *stream, * not. **/ gboolean -soup_input_stream_send (GInputStream *stream, - GCancellable *cancellable, - GError **error) +soup_input_stream_send (GInputStream *input_stream, + GCancellable *cancellable, + GError **error) { + SoupInputStream *stream; + SoupInputStreamPrivate *priv; gboolean result; - g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), FALSE); + g_return_val_if_fail (SOUP_IS_INPUT_STREAM (input_stream), FALSE); + + stream = SOUP_INPUT_STREAM (input_stream); + priv = stream->priv; - if (!g_input_stream_set_pending (stream, error)) + if (!g_input_stream_set_pending (input_stream, error)) return FALSE; - result = soup_input_stream_send_internal (stream, cancellable, error); - g_input_stream_clear_pending (stream); + + _soup_input_stream_queue_msg_and_wait (stream); + + g_input_stream_clear_pending (input_stream); + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return FALSE; + + result = ! 
set_error_if_http_failed (priv->msg, error); return result; } static gssize -soup_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GCancellable *cancellable, - GError **error) +soup_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) { - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - - if (priv->finished) - return 0; + SoupInputStream *istream; + SoupInputStreamPrivate *priv; + gssize nread; - /* If there is data leftover from a previous read, return it. */ - if (priv->leftover_bufsize) - return read_from_leftover (priv, buffer, count); + g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), -1); + + istream = SOUP_INPUT_STREAM (stream); + priv = istream->priv; - /* No leftover data, accept one chunk from the network */ - soup_input_stream_prepare_for_io (stream, cancellable, buffer, count); - while (!priv->finished && priv->caller_nread == 0 && - !g_cancellable_is_cancelled (cancellable)) - g_main_context_iteration (priv->async_context, TRUE); - soup_input_stream_done_io (stream); + nread = g_input_stream_read (priv->in, + buffer, + count, + cancellable, + error); - if (priv->caller_nread > 0) - return priv->caller_nread; + priv->offset += nread; - if (g_cancellable_set_error_if_cancelled (cancellable, error)) - return -1; - else if (set_error_if_http_failed (priv->msg, error)) - return -1; - else - return 0; + return nread; } static gboolean @@ -509,314 +382,25 @@ soup_input_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error) { - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - - if (!priv->finished) - soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED); - - return TRUE; -} - -static void -wrapper_callback (GObject *source_object, GAsyncResult *res, - gpointer user_data) -{ - GInputStream *stream = G_INPUT_STREAM (source_object); - SoupInputStreamPrivate *priv = 
SOUP_INPUT_STREAM_GET_PRIVATE (stream); - - g_input_stream_clear_pending (stream); - if (priv->outstanding_callback) - (*priv->outstanding_callback) (source_object, res, user_data); - priv->outstanding_callback = NULL; - g_object_unref (stream); -} - -static void -send_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) -{ - GError *error = NULL; - gboolean success; - - success = soup_input_stream_send_internal (G_INPUT_STREAM (object), - cancellable, &error); - g_simple_async_result_set_op_res_gboolean (res, success); - if (error) - { - g_simple_async_result_set_from_error (res, error); - g_error_free (error); - } -} - -static void -soup_input_stream_send_async_in_thread (GInputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GSimpleAsyncResult *res; - - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, - soup_input_stream_send_async_in_thread); - g_simple_async_result_run_in_thread (res, send_async_thread, - io_priority, cancellable); - g_object_unref (res); -} - -static void -send_async_finished (GInputStream *stream) -{ - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - GSimpleAsyncResult *result; - GError *error = NULL; - - if (!g_cancellable_set_error_if_cancelled (priv->cancellable, &error)) - set_error_if_http_failed (priv->msg, &error); - - priv->got_headers_cb = NULL; - priv->finished_cb = NULL; - soup_input_stream_done_io (stream); - - result = priv->result; - priv->result = NULL; - - g_simple_async_result_set_op_res_gboolean (result, error == NULL); - if (error) - { - g_simple_async_result_set_from_error (result, error); - g_error_free (error); - } - g_simple_async_result_complete (result); -} - -static void -soup_input_stream_send_async_internal (GInputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - SoupInputStreamPrivate 
*priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - - g_object_ref (stream); - priv->outstanding_callback = callback; - - /* If the session uses the default GMainContext, then we can do - * async I/O directly. But if it has its own main context, it's - * easier to just run it in another thread. - */ - if (soup_session_get_async_context (priv->session)) - { - soup_input_stream_send_async_in_thread (stream, io_priority, cancellable, - wrapper_callback, user_data); - return; - } - - priv->got_headers_cb = send_async_finished; - priv->finished_cb = send_async_finished; - - soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0); - priv->result = g_simple_async_result_new (G_OBJECT (stream), - wrapper_callback, user_data, - soup_input_stream_send_async); -} - -/** - * soup_input_stream_send_async: - * @stream: a #SoupInputStream - * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: callback to call when the request is satisfied - * @user_data: the data to pass to callback function - * - * Asynchronously sends the HTTP request associated with @stream, and - * reads the response headers. Call this after soup_input_stream_new() - * and before the first g_input_stream_read_async() if you want to - * check the HTTP status code before you start reading. - **/ -void -soup_input_stream_send_async (GInputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GError *error = NULL; - - g_return_if_fail (SOUP_IS_INPUT_STREAM (stream)); - - if (!g_input_stream_set_pending (stream, &error)) - { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); - return; - } - soup_input_stream_send_async_internal (stream, io_priority, cancellable, - callback, user_data); -} - -/** - * soup_input_stream_send_finish: - * @stream: a #SoupInputStream - * @result: a #GAsyncResult. 
- * @error: a #GError location to store the error occuring, or %NULL to - * ignore. - * - * Finishes a soup_input_stream_send_async() operation. - * - * Return value: %TRUE if the message was sent successfully and - * received a successful status code, %FALSE if not. - **/ -gboolean -soup_input_stream_send_finish (GInputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; - - g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE); - simple = G_SIMPLE_ASYNC_RESULT (result); - - g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_send_async, FALSE); - - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; - - return g_simple_async_result_get_op_res_gboolean (simple); -} - -static void -read_async_done (GInputStream *stream) -{ - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - GSimpleAsyncResult *result; - GError *error = NULL; - - result = priv->result; - priv->result = NULL; - - if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) || - set_error_if_http_failed (priv->msg, &error)) - { - g_simple_async_result_set_from_error (result, error); - g_error_free (error); - } - else - g_simple_async_result_set_op_res_gssize (result, priv->caller_nread); - - priv->got_chunk_cb = NULL; - priv->finished_cb = NULL; - priv->cancelled_cb = NULL; - soup_input_stream_done_io (stream); - - g_simple_async_result_complete (result); - g_object_unref (result); -} - -static void -soup_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream); - GSimpleAsyncResult *result; - - /* If the session uses the default GMainContext, then we can do - * async I/O directly. 
But if it has its own main context, we fall - * back to the async-via-sync-in-another-thread implementation. - */ - if (soup_session_get_async_context (priv->session)) - { - G_INPUT_STREAM_CLASS (soup_input_stream_parent_class)-> - read_async (stream, buffer, count, io_priority, - cancellable, callback, user_data); - return; - } - - result = g_simple_async_result_new (G_OBJECT (stream), - callback, user_data, - soup_input_stream_read_async); - - if (priv->finished) - { - g_simple_async_result_set_op_res_gssize (result, 0); - g_simple_async_result_complete_in_idle (result); - g_object_unref (result); - return; - } - - if (priv->leftover_bufsize) - { - gsize nread = read_from_leftover (priv, buffer, count); - g_simple_async_result_set_op_res_gssize (result, nread); - g_simple_async_result_complete_in_idle (result); - g_object_unref (result); - return; - } - - priv->result = result; - - priv->got_chunk_cb = read_async_done; - priv->finished_cb = read_async_done; - priv->cancelled_cb = read_async_done; - soup_input_stream_prepare_for_io (stream, cancellable, buffer, count); -} - -static gssize -soup_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; + SoupInputStream *istream; + SoupInputStreamPrivate *priv; - g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1); - simple = G_SIMPLE_ASYNC_RESULT (result); - g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_read_async, -1); + g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), -1); - return g_simple_async_result_get_op_res_gssize (simple); -} + istream = SOUP_INPUT_STREAM (stream); + priv = istream->priv; -static void -soup_input_stream_close_async (GInputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GSimpleAsyncResult *result; - gboolean success; - GError *error = NULL; - - result = g_simple_async_result_new (G_OBJECT 
(stream), - callback, user_data, - soup_input_stream_close_async); - success = soup_input_stream_close (stream, cancellable, &error); - g_simple_async_result_set_op_res_gboolean (result, success); - if (error) + g_mutex_lock (priv->lock); + if (!priv->finished) { - g_simple_async_result_set_from_error (result, error); - g_error_free (error); + soup_session_cancel_message (priv->session, + priv->msg, + SOUP_STATUS_CANCELLED); + g_print ("Cancelling message\n"); } - g_simple_async_result_complete_in_idle (result); - g_object_unref (result); -} - -static gboolean -soup_input_stream_close_finish (GInputStream *stream, - GAsyncResult *result, - GError **error) -{ - /* Failures handled in generic close_finish code */ + g_mem_pipe_close_read (priv->mem_pipe, cancellable, error); + g_mutex_unlock (priv->lock); return TRUE; } @@ -834,17 +418,17 @@ soup_input_stream_can_seek (GSeekable *seekable) return TRUE; } -extern void soup_message_io_cleanup (SoupMessage *msg); static gboolean soup_input_stream_seek (GSeekable *seekable, - goffset offset, - GSeekType type, - GCancellable *cancellable, - GError **error) + goffset offset, + GSeekType type, + GCancellable *cancellable, + GError **error) { GInputStream *stream = G_INPUT_STREAM (seekable); - SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (seekable); + SoupInputStream *istream = SOUP_INPUT_STREAM (stream); + SoupInputStreamPrivate *priv = istream->priv; char *range; if (type == G_SEEK_END) @@ -862,8 +446,7 @@ soup_input_stream_seek (GSeekable *seekable, if (!g_input_stream_set_pending (stream, error)) return FALSE; - soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED); - soup_message_io_cleanup (priv->msg); + _soup_input_stream_cancel_msg_and_wait (istream); switch (type) { @@ -889,7 +472,7 @@ soup_input_stream_seek (GSeekable *seekable, soup_message_headers_append (priv->msg->request_headers, "Range", range); g_free (range); - soup_input_stream_queue_message (SOUP_INPUT_STREAM 
(stream)); + _soup_input_stream_queue_msg_and_wait (istream); g_input_stream_clear_pending (stream); return TRUE; @@ -922,8 +505,8 @@ soup_input_stream_get_message (GInputStream *stream) GQuark soup_http_error_quark (void) { - static GQuark error; - if (!error) - error = g_quark_from_static_string ("soup_http_error_quark"); - return error; + static GQuark error; + if (!error) + error = g_quark_from_static_string ("soup_http_error_quark"); + return error; } diff --git a/daemon/soup-input-stream.h b/daemon/soup-input-stream.h index f425291..2d47c7c 100644 --- a/daemon/soup-input-stream.h +++ b/daemon/soup-input-stream.h @@ -33,17 +33,22 @@ G_BEGIN_DECLS typedef struct SoupInputStream SoupInputStream; typedef struct SoupInputStreamClass SoupInputStreamClass; +typedef struct SoupInputStreamPrivate SoupInputStreamPrivate; struct SoupInputStream { GInputStream parent; + SoupInputStreamPrivate *priv; + }; struct SoupInputStreamClass { GInputStreamClass parent_class; + + /* Padding for future expansion */ void (*_g_reserved1) (void); void (*_g_reserved2) (void); @@ -61,15 +66,6 @@ gboolean soup_input_stream_send (GInputStream *stream, GCancellable *cancellable, GError **error); -void soup_input_stream_send_async (GInputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -gboolean soup_input_stream_send_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); - SoupMessage *soup_input_stream_get_message (GInputStream *stream); #define SOUP_HTTP_ERROR soup_http_error_quark()