GUnixInputStream, GUnixOutputStream: support ordinary files better

If the fd is not a pipe or socket, fall back to using threads to do
async I/O rather than poll, since poll doesn't work the way you want
for ordinary files.

https://bugzilla.gnome.org/show_bug.cgi?id=606913
This commit is contained in:
Dan Winship 2010-04-27 16:54:18 -04:00
parent e60846dc78
commit 9b4cc6edf4
2 changed files with 58 additions and 10 deletions

View File

@ -46,9 +46,12 @@
* @include: gio/gunixinputstream.h * @include: gio/gunixinputstream.h
* @see_also: #GInputStream * @see_also: #GInputStream
* *
* #GUnixInputStream implements #GInputStream for reading from a * #GUnixInputStream implements #GInputStream for reading from a UNIX
* UNIX file descriptor, including asynchronous operations. The file * file descriptor, including asynchronous operations. (If the file
* descriptor must be selectable, so it doesn't work with opened files. * descriptor refers to a socket or pipe, this will use poll() to do
* asynchronous I/O. If it refers to a regular file, it will fall back
* to doing asynchronous I/O in another thread like
* #GLocalFileInputStream.)
* *
* Note that <filename>&lt;gio/gunixinputstream.h&gt;</filename> belongs * Note that <filename>&lt;gio/gunixinputstream.h&gt;</filename> belongs
* to the UNIX-specific GIO interfaces, thus you have to use the * to the UNIX-specific GIO interfaces, thus you have to use the
@ -73,7 +76,8 @@ G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STR
struct _GUnixInputStreamPrivate { struct _GUnixInputStreamPrivate {
int fd; int fd;
gboolean close_fd; guint close_fd : 1;
guint is_pipe_or_socket : 1;
}; };
static void g_unix_input_stream_set_property (GObject *object, static void g_unix_input_stream_set_property (GObject *object,
@ -213,6 +217,10 @@ g_unix_input_stream_set_property (GObject *object,
{ {
case PROP_FD: case PROP_FD:
unix_stream->priv->fd = g_value_get_int (value); unix_stream->priv->fd = g_value_get_int (value);
if (lseek (unix_stream->priv->fd, 0, SEEK_CUR) == -1 && errno == ESPIPE)
unix_stream->priv->is_pipe_or_socket = TRUE;
else
unix_stream->priv->is_pipe_or_socket = FALSE;
break; break;
case PROP_CLOSE_FD: case PROP_CLOSE_FD:
unix_stream->priv->close_fd = g_value_get_boolean (value); unix_stream->priv->close_fd = g_value_get_boolean (value);
@ -360,7 +368,8 @@ g_unix_input_stream_read (GInputStream *stream,
unix_stream = G_UNIX_INPUT_STREAM (stream); unix_stream = G_UNIX_INPUT_STREAM (stream);
if (g_cancellable_make_pollfd (cancellable, &poll_fds[1])) if (unix_stream->priv->is_pipe_or_socket &&
g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
{ {
poll_fds[0].fd = unix_stream->priv->fd; poll_fds[0].fd = unix_stream->priv->fd;
poll_fds[0].events = G_IO_IN; poll_fds[0].events = G_IO_IN;
@ -511,6 +520,14 @@ g_unix_input_stream_read_async (GInputStream *stream,
unix_stream = G_UNIX_INPUT_STREAM (stream); unix_stream = G_UNIX_INPUT_STREAM (stream);
if (!unix_stream->priv->is_pipe_or_socket)
{
G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
read_async (stream, buffer, count, io_priority,
cancellable, callback, user_data);
return;
}
data = g_new0 (ReadAsyncData, 1); data = g_new0 (ReadAsyncData, 1);
data->count = count; data->count = count;
data->buffer = buffer; data->buffer = buffer;
@ -535,9 +552,16 @@ g_unix_input_stream_read_finish (GInputStream *stream,
GAsyncResult *result, GAsyncResult *result,
GError **error) GError **error)
{ {
GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
GSimpleAsyncResult *simple; GSimpleAsyncResult *simple;
gssize nread; gssize nread;
if (!unix_stream->priv->is_pipe_or_socket)
{
return G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
read_finish (stream, result, error);
}
simple = G_SIMPLE_ASYNC_RESULT (result); simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async); g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async);

View File

@ -46,9 +46,12 @@
* @include: gio/gunixoutputstream.h * @include: gio/gunixoutputstream.h
* @see_also: #GOutputStream * @see_also: #GOutputStream
* *
* #GUnixOutputStream implements #GOutputStream for writing to a * #GUnixOutputStream implements #GOutputStream for writing to a UNIX
* UNIX file descriptor, including asynchronous operations. The file * file descriptor, including asynchronous operations. (If the file
* descriptor must be selectable, so it doesn't work with opened files. * descriptor refers to a socket or pipe, this will use poll() to do
* asynchronous I/O. If it refers to a regular file, it will fall back
* to doing asynchronous I/O in another thread like
* #GLocalFileOutputStream.)
* *
* Note that <filename>&lt;gio/gunixoutputstream.h&gt;</filename> belongs * Note that <filename>&lt;gio/gunixoutputstream.h&gt;</filename> belongs
* to the UNIX-specific GIO interfaces, thus you have to use the * to the UNIX-specific GIO interfaces, thus you have to use the
@ -73,7 +76,8 @@ G_DEFINE_TYPE_WITH_CODE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_
struct _GUnixOutputStreamPrivate { struct _GUnixOutputStreamPrivate {
int fd; int fd;
gboolean close_fd; guint close_fd : 1;
guint is_pipe_or_socket : 1;
}; };
static void g_unix_output_stream_set_property (GObject *object, static void g_unix_output_stream_set_property (GObject *object,
@ -198,6 +202,10 @@ g_unix_output_stream_set_property (GObject *object,
{ {
case PROP_FD: case PROP_FD:
unix_stream->priv->fd = g_value_get_int (value); unix_stream->priv->fd = g_value_get_int (value);
if (lseek (unix_stream->priv->fd, 0, SEEK_CUR) == -1 && errno == ESPIPE)
unix_stream->priv->is_pipe_or_socket = TRUE;
else
unix_stream->priv->is_pipe_or_socket = FALSE;
break; break;
case PROP_CLOSE_FD: case PROP_CLOSE_FD:
unix_stream->priv->close_fd = g_value_get_boolean (value); unix_stream->priv->close_fd = g_value_get_boolean (value);
@ -345,7 +353,8 @@ g_unix_output_stream_write (GOutputStream *stream,
unix_stream = G_UNIX_OUTPUT_STREAM (stream); unix_stream = G_UNIX_OUTPUT_STREAM (stream);
if (g_cancellable_make_pollfd (cancellable, &poll_fds[1])) if (unix_stream->priv->is_pipe_or_socket &&
g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
{ {
poll_fds[0].fd = unix_stream->priv->fd; poll_fds[0].fd = unix_stream->priv->fd;
poll_fds[0].events = G_IO_OUT; poll_fds[0].events = G_IO_OUT;
@ -497,6 +506,14 @@ g_unix_output_stream_write_async (GOutputStream *stream,
unix_stream = G_UNIX_OUTPUT_STREAM (stream); unix_stream = G_UNIX_OUTPUT_STREAM (stream);
if (!unix_stream->priv->is_pipe_or_socket)
{
G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
write_async (stream, buffer, count, io_priority,
cancellable, callback, user_data);
return;
}
data = g_new0 (WriteAsyncData, 1); data = g_new0 (WriteAsyncData, 1);
data->count = count; data->count = count;
data->buffer = buffer; data->buffer = buffer;
@ -521,9 +538,16 @@ g_unix_output_stream_write_finish (GOutputStream *stream,
GAsyncResult *result, GAsyncResult *result,
GError **error) GError **error)
{ {
GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
GSimpleAsyncResult *simple; GSimpleAsyncResult *simple;
gssize nwritten; gssize nwritten;
if (!unix_stream->priv->is_pipe_or_socket)
{
return G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
write_finish (stream, result, error);
}
simple = G_SIMPLE_ASYNC_RESULT (result); simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_output_stream_write_async); g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_output_stream_write_async);