Add pollable input/output streams

When interfacing with APIs that expect unix-style async I/O, it is
useful to be able to tell in advance whether a read/write is going to
block. This adds new interfaces GPollableInputStream and
GPollableOutputStream that can be implemented by a GInputStream or
GOutputStream to add _is_readable/_is_writable, _create_source, and
_read_nonblocking/_write_nonblocking methods.

Also, implement for GUnixInput/OutputStream and
GSocketInput/OutputStream

https://bugzilla.gnome.org/show_bug.cgi?id=634241
This commit is contained in:
Dan Winship 2010-09-18 13:05:25 -04:00
parent 6181c7de36
commit c20c2c0abd
19 changed files with 1251 additions and 12 deletions

View File

@ -68,6 +68,8 @@
<xi:include href="xml/gunixoutputstream.xml"/> <xi:include href="xml/gunixoutputstream.xml"/>
<xi:include href="xml/gconverterinputstream.xml"/> <xi:include href="xml/gconverterinputstream.xml"/>
<xi:include href="xml/gconverteroutputstream.xml"/> <xi:include href="xml/gconverteroutputstream.xml"/>
<xi:include href="xml/gpollableinputstream.xml"/>
<xi:include href="xml/gpollableoutputstream.xml"/>
</chapter> </chapter>
<chapter id="types"> <chapter id="types">
<title>File types and applications</title> <title>File types and applications</title>

View File

@ -2934,3 +2934,44 @@ G_IS_PERIODIC
<SUBSECTION Private> <SUBSECTION Private>
g_periodic_get_type g_periodic_get_type
</SECTION> </SECTION>
<SECTION>
<FILE>gpollableinputstream</FILE>
<TITLE>GPollableInputStream</TITLE>
GPollableInputStream
GPollableInputStreamInterface
<SUBSECTION>
g_pollable_input_stream_can_poll
g_pollable_input_stream_is_readable
g_pollable_input_stream_create_source
g_pollable_input_stream_read_nonblocking
<SUBSECTION>
GPollableSourceFunc
g_pollable_source_new
<SUBSECTION Standard>
G_POLLABLE_INPUT_STREAM
G_POLLABLE_INPUT_STREAM_GET_INTERFACE
G_IS_POLLABLE_INPUT_STREAM
G_TYPE_POLLABLE_INPUT_STREAM
<SUBSECTION Private>
g_pollable_input_stream_get_type
</SECTION>
<SECTION>
<FILE>gpollableoutputstream</FILE>
<TITLE>GPollableOutputStream</TITLE>
GPollableOutputStream
GPollableOutputStreamInterface
<SUBSECTION>
g_pollable_output_stream_can_poll
g_pollable_output_stream_is_writable
g_pollable_output_stream_create_source
g_pollable_output_stream_write_nonblocking
<SUBSECTION Standard>
G_POLLABLE_OUTPUT_STREAM
G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE
G_IS_POLLABLE_OUTPUT_STREAM
G_TYPE_POLLABLE_OUTPUT_STREAM
<SUBSECTION Private>
g_pollable_output_stream_get_type
</SECTION>

View File

@ -76,6 +76,9 @@ g_output_stream_splice_flags_get_type
g_password_save_get_type g_password_save_get_type
g_periodic_get_type g_periodic_get_type
g_permission_get_type g_permission_get_type
g_pollable_input_stream_get_type
g_pollable_io_stream_get_type
g_pollable_output_stream_get_type
g_proxy_address_enumerator_get_type g_proxy_address_enumerator_get_type
g_proxy_address_get_type g_proxy_address_get_type
g_proxy_get_type g_proxy_get_type

View File

@ -349,6 +349,8 @@ libgio_2_0_la_SOURCES = \
goutputstream.c \ goutputstream.c \
gperiodic.c \ gperiodic.c \
gpermission.c \ gpermission.c \
gpollableinputstream.c \
gpollableoutputstream.c \
gpollfilemonitor.c \ gpollfilemonitor.c \
gpollfilemonitor.h \ gpollfilemonitor.h \
gproxyresolver.c \ gproxyresolver.c \
@ -505,6 +507,8 @@ gio_headers = \
goutputstream.h \ goutputstream.h \
gperiodic.h \ gperiodic.h \
gpermission.h \ gpermission.h \
gpollableinputstream.h \
gpollableoutputstream.h \
gproxyaddress.h \ gproxyaddress.h \
gproxy.h \ gproxy.h \
gproxyaddressenumerator.h \ gproxyaddressenumerator.h \

View File

@ -95,6 +95,8 @@
#include <gio/goutputstream.h> #include <gio/goutputstream.h>
#include <gio/gperiodic.h> #include <gio/gperiodic.h>
#include <gio/gpermission.h> #include <gio/gpermission.h>
#include <gio/gpollableinputstream.h>
#include <gio/gpollableoutputstream.h>
#include <gio/gproxy.h> #include <gio/gproxy.h>
#include <gio/gproxyaddress.h> #include <gio/gproxyaddress.h>
#include <gio/gproxyaddressenumerator.h> #include <gio/gproxyaddressenumerator.h>

View File

@ -1973,3 +1973,24 @@ g_periodic_remove
g_periodic_unblock g_periodic_unblock
#endif #endif
#endif #endif
#if IN_HEADER(__G_POLLABLE_INPUT_STREAM_H__)
#if IN_FILE(__G_POLLABLE_INPUT_STREAM_C__)
g_pollable_input_stream_get_type G_GNUC_CONST
g_pollable_input_stream_can_poll
g_pollable_input_stream_create_source
g_pollable_input_stream_is_readable
g_pollable_input_stream_read_nonblocking
g_pollable_source_new
#endif
#endif
#if IN_HEADER(__G_POLLABLE_OUTPUT_STREAM_H__)
#if IN_FILE(__G_POLLABLE_OUTPUT_STREAM_C__)
g_pollable_output_stream_get_type G_GNUC_CONST
g_pollable_output_stream_can_poll
g_pollable_output_stream_create_source
g_pollable_output_stream_is_writable
g_pollable_output_stream_write_nonblocking
#endif
#endif

View File

@ -124,6 +124,8 @@ typedef struct _GNetworkAddress GNetworkAddress;
typedef struct _GNetworkService GNetworkService; typedef struct _GNetworkService GNetworkService;
typedef struct _GOutputStream GOutputStream; typedef struct _GOutputStream GOutputStream;
typedef struct _GIOStream GIOStream; typedef struct _GIOStream GIOStream;
typedef struct _GPollableInputStream GPollableInputStream; /* Dummy typedef */
typedef struct _GPollableOutputStream GPollableOutputStream; /* Dummy typedef */
typedef struct _GResolver GResolver; typedef struct _GResolver GResolver;
typedef struct _GSeekable GSeekable; typedef struct _GSeekable GSeekable;
typedef struct _GSimpleAsyncResult GSimpleAsyncResult; typedef struct _GSimpleAsyncResult GSimpleAsyncResult;
@ -391,6 +393,22 @@ typedef struct _GDBusNodeInfo GDBusNodeInfo;
typedef gboolean (*GCancellableSourceFunc) (GCancellable *cancellable, typedef gboolean (*GCancellableSourceFunc) (GCancellable *cancellable,
gpointer user_data); gpointer user_data);
/**
* GPollableSourceFunc:
* @pollable_stream: the #GPollableInputStream or #GPollableOutputStream
* @user_data: data passed in by the user.
*
* This is the function type of the callback used for the #GSource
* returned by g_pollable_input_stream_create_source() and
* g_pollable_output_stream_create_source().
*
* Returns: it should return %FALSE if the source should be removed.
*
* Since: 2.28
*/
typedef gboolean (*GPollableSourceFunc) (GObject *pollable_stream,
gpointer user_data);
G_END_DECLS G_END_DECLS
#endif /* __GIO_TYPES_H__ */ #endif /* __GIO_TYPES_H__ */

304
gio/gpollableinputstream.c Normal file
View File

@ -0,0 +1,304 @@
/* GIO - GLib Input, Output and Streaming Library
*
* Copyright (C) 2010 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "config.h"
#include <errno.h>
#include "gpollableinputstream.h"
#include "gasynchelper.h"
#include "gio-marshal.h"
#include "glibintl.h"
/**
* SECTION:gpollableinputstream
* @short_description: Interface for pollable input streams
* @include: gio/gio.h
* @see_also: #GInputStream, #GPollableOutputStream, #GFileDescriptorBased
*
* #GPollableInputStream is implemented by #GInputStream<!-- -->s that
* can be polled for readiness to read. This can be used when
* interfacing with a non-gio API that expects
* unix-file-descriptor-style asynchronous I/O rather than gio-style.
*
* Since: 2.28
*/
G_DEFINE_INTERFACE (GPollableInputStream, g_pollable_input_stream, G_TYPE_INPUT_STREAM)
static gboolean g_pollable_input_stream_default_can_poll (GPollableInputStream *stream);
static gssize g_pollable_input_stream_default_read_nonblocking (GPollableInputStream *stream,
void *buffer,
gsize size,
GError **error);
static void
g_pollable_input_stream_default_init (GPollableInputStreamInterface *iface)
{
iface->can_poll = g_pollable_input_stream_default_can_poll;
iface->read_nonblocking = g_pollable_input_stream_default_read_nonblocking;
}
static gboolean
g_pollable_input_stream_default_can_poll (GPollableInputStream *stream)
{
return TRUE;
}
/**
* g_pollable_input_stream_can_poll:
* @stream: a #GPollableInputStream.
*
* Checks if @stream is actually pollable. Some classes may implement
* #GPollableInputStream but have only certain instances of that class
* be pollable. If this method returns %FALSE, then the behavior of
* other #GPollableInputStream methods is undefined.
*
* For any given stream, the value returned by this method is constant;
* a stream cannot switch from pollable to non-pollable or vice versa.
*
* Returns: %TRUE if @stream is pollable, %FALSE if not.
*
* Since: 2.28
*/
gboolean
g_pollable_input_stream_can_poll (GPollableInputStream *stream)
{
g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE);
return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream);
}
/**
* g_pollable_input_stream_is_readable:
* @stream: a #GPollableInputStream.
*
* Checks if @stream can be read.
*
* Note that some stream types may not be able to implement this 100%
* reliably, and it is possible that a call to g_input_stream_read()
* after this returns %TRUE would still block. To guarantee
* non-blocking behavior, you should always use
* g_pollable_input_stream_read_nonblocking(), which will return a
* %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
*
* Returns: %TRUE if @stream is readable, %FALSE if not. If an error
* has occurred on @stream, this will result in
* g_pollable_input_stream_is_readable() returning %TRUE, and the
* next attempt to read will return the error.
*
* Since: 2.28
*/
gboolean
g_pollable_input_stream_is_readable (GPollableInputStream *stream)
{
g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE);
return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->is_readable (stream);
}
/**
* g_pollable_input_stream_create_source:
* @stream: a #GPollableInputStream.
* @cancellable: a #GCancellable, or %NULL
*
* Creates a #GSource that triggers when @stream can be read, or
* @cancellable is triggered or an error occurs. The callback on the
* source is of the #GPollableSourceFunc type.
*
* As with g_pollable_input_stream_is_readable(), it is possible that
* the stream may not actually be readable even after the source
* triggers, so you should use
* g_pollable_input_stream_read_nonblocking() rather than
* g_input_stream_read() from the callback.
*
* Returns: a new #GSource
*
* Since: 2.28
*/
GSource *
g_pollable_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable)
{
g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), NULL);
return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
create_source (stream, cancellable);
}
static gssize
g_pollable_input_stream_default_read_nonblocking (GPollableInputStream *stream,
void *buffer,
gsize size,
GError **error)
{
if (!g_pollable_input_stream_is_readable (stream))
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
return -1;
}
return g_input_stream_read (G_INPUT_STREAM (stream), buffer, size,
NULL, error);
}
/**
* g_pollable_input_stream_read_nonblocking:
* @stream: a #GPollableInputStream
* @buffer: a buffer to read data into (which should be at least @size
* bytes long).
* @size: the number of bytes you want to read
* @cancellable: a #GCancellable, or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
* Attempts to read up to @size bytes from @stream into @buffer, as
* with g_input_stream_read(). If @stream is not currently readable,
* this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
* use g_pollable_input_stream_create_source() to create a #GSource
* that will be triggered when @stream is readable.
*
* Note that since this method never blocks, you cannot actually
* use @cancellable to cancel it. However, it will return an error
* if @cancellable has already been cancelled when you call, which
* may happen if you call this method after a source triggers due
* to having been cancelled.
*
* Return value: the number of bytes read, or -1 on error (including
* %G_IO_ERROR_WOULD_BLOCK).
*/
gssize
g_pollable_input_stream_read_nonblocking (GPollableInputStream *stream,
void *buffer,
gsize size,
GCancellable *cancellable,
GError **error)
{
g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), -1);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
read_nonblocking (stream, buffer, size, error);
}
/* GPollableSource */
typedef struct {
GSource source;
GObject *stream;
} GPollableSource;
static gboolean
pollable_source_prepare (GSource *source,
gint *timeout)
{
*timeout = -1;
return FALSE;
}
static gboolean
pollable_source_check (GSource *source)
{
return FALSE;
}
static gboolean
pollable_source_dispatch (GSource *source,
GSourceFunc callback,
gpointer user_data)
{
GPollableSourceFunc func = (GPollableSourceFunc)callback;
GPollableSource *pollable_source = (GPollableSource *)source;
return (*func) (pollable_source->stream, user_data);
}
static void
pollable_source_finalize (GSource *source)
{
GPollableSource *pollable_source = (GPollableSource *)source;
g_object_unref (pollable_source->stream);
}
static gboolean
pollable_source_closure_callback (GObject *stream,
gpointer data)
{
GClosure *closure = data;
GValue param = { 0, };
GValue result_value = { 0, };
gboolean result;
g_value_init (&result_value, G_TYPE_BOOLEAN);
g_value_init (&param, G_TYPE_OBJECT);
g_value_set_object (&param, stream);
g_closure_invoke (closure, &result_value, 1, &param, NULL);
result = g_value_get_boolean (&result_value);
g_value_unset (&result_value);
g_value_unset (&param);
return result;
}
static GSourceFuncs pollable_source_funcs =
{
pollable_source_prepare,
pollable_source_check,
pollable_source_dispatch,
pollable_source_finalize,
(GSourceFunc)pollable_source_closure_callback,
(GSourceDummyMarshal)_gio_marshal_BOOLEAN__VOID,
};
/**
* g_pollable_source_new:
* @pollable_stream: the stream associated with the new source
*
* Utility method for #GPollableInputStream and #GPollableOutputStream
* implementations. Creates a new #GSource that expects a callback of
* type #GPollableSourceFunc. The new source does not actually do
* anything on its own; use g_source_add_child_source() to add other
* sources to it to cause it to trigger.
*
* Return value: the new #GSource.
*
* Since: 2.28
*/
GSource *
g_pollable_source_new (GObject *pollable_stream)
{
GSource *source;
GPollableSource *pollable_source;
source = g_source_new (&pollable_source_funcs, sizeof (GPollableSource));
g_source_set_name (source, "GPollableSource");
pollable_source = (GPollableSource *)source;
pollable_source->stream = g_object_ref (pollable_stream);
return source;
}

101
gio/gpollableinputstream.h Normal file
View File

@ -0,0 +1,101 @@
/* GIO - GLib Input, Output and Streaming Library
*
* Copyright (C) 2010 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __G_POLLABLE_INPUT_STREAM_H__
#define __G_POLLABLE_INPUT_STREAM_H__
#include <gio/gio.h>
G_BEGIN_DECLS
#define G_TYPE_POLLABLE_INPUT_STREAM (g_pollable_input_stream_get_type ())
#define G_POLLABLE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStream))
#define G_IS_POLLABLE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_INPUT_STREAM))
#define G_POLLABLE_INPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStreamInterface))
/**
* GPollableInputStream:
*
* An interface for a #GInputStream that can be polled for readability.
*
* Since: 2.28
*/
typedef struct _GPollableInputStreamInterface GPollableInputStreamInterface;
/**
* GPollableInputStreamInterface:
* @g_iface: The parent interface.
* @can_poll: Checks if the #GPollableInputStream instance is actually pollable
* @is_readable: Checks if the stream is readable
* @create_source: Creates a #GSource to poll the stream
* @read_nonblocking: Does a non-blocking read or returns
* %G_IO_ERROR_WOULD_BLOCK
*
* The interface for pollable input streams.
*
* The default implementation of @can_poll always returns %TRUE.
*
* The default implementation of @read_nonblocking calls
* g_pollable_input_stream_is_readable(), and then calls
* g_input_stream_read() if it returns %TRUE. This means you only need
* to override it if it is possible that your @is_readable
* implementation may return %TRUE when the stream is not actually
* readable.
*
* Since: 2.28
*/
struct _GPollableInputStreamInterface
{
GTypeInterface g_iface;
/* Virtual Table */
gboolean (*can_poll) (GPollableInputStream *stream);
gboolean (*is_readable) (GPollableInputStream *stream);
GSource * (*create_source) (GPollableInputStream *stream,
GCancellable *cancellable);
gssize (*read_nonblocking) (GPollableInputStream *stream,
void *buffer,
gsize size,
GError **error);
};
GType g_pollable_input_stream_get_type (void) G_GNUC_CONST;
gboolean g_pollable_input_stream_can_poll (GPollableInputStream *stream);
gboolean g_pollable_input_stream_is_readable (GPollableInputStream *stream);
GSource *g_pollable_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
gssize g_pollable_input_stream_read_nonblocking (GPollableInputStream *stream,
void *buffer,
gsize size,
GCancellable *cancellable,
GError **error);
/* Helper method for stream implementations */
GSource *g_pollable_source_new (GObject *stream);
G_END_DECLS
#endif /* __G_POLLABLE_INPUT_STREAM_H__ */

201
gio/gpollableoutputstream.c Normal file
View File

@ -0,0 +1,201 @@
/* GIO - GLib Input, Output and Streaming Library
*
* Copyright (C) 2010 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "config.h"
#include <errno.h>
#include "gpollableoutputstream.h"
#include "gasynchelper.h"
#include "gfiledescriptorbased.h"
#include "gio-marshal.h"
#include "glibintl.h"
/**
* SECTION:gpollableoutputstream
* @short_description: Interface for pollable output streams
* @include: gio/gio.h
* @see_also: #GOutputStream, #GFileDescriptorBased, #GPollableInputStream
*
* #GPollableOutputStream is implemented by #GOutputStream<!-- -->s that
* can be polled for readiness to write. This can be used when
* interfacing with a non-gio API that expects
* unix-file-descriptor-style asynchronous I/O rather than gio-style.
*
* Since: 2.28
*/
G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize size,
GError **error);
static void
g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
{
iface->can_poll = g_pollable_output_stream_default_can_poll;
iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
}
static gboolean
g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream)
{
return TRUE;
}
/**
* g_pollable_output_stream_can_poll:
* @stream: a #GPollableOutputStream.
*
* Checks if @stream is actually pollable. Some classes may implement
* #GPollableOutputStream but have only certain instances of that
* class be pollable. If this method returns %FALSE, then the behavior
* of other #GPollableOutputStream methods is undefined.
*
* For any given stream, the value returned by this method is constant;
* a stream cannot switch from pollable to non-pollable or vice versa.
*
* Returns: %TRUE if @stream is pollable, %FALSE if not.
*
* Since: 2.28
*/
gboolean
g_pollable_output_stream_can_poll (GPollableOutputStream *stream)
{
g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE);
return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream);
}
/**
* g_pollable_output_stream_is_writable:
* @stream: a #GPollableOutputStream.
*
* Checks if @stream can be written.
*
* Note that some stream types may not be able to implement this 100%
* reliably, and it is possible that a call to g_output_stream_write()
* after this returns %TRUE would still block. To guarantee
* non-blocking behavior, you should always use
* g_pollable_output_stream_write_nonblocking(), which will return a
* %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
*
* Returns: %TRUE if @stream is writable, %FALSE if not. If an error
* has occurred on @stream, this will result in
* g_pollable_output_stream_is_writable() returning %TRUE, and the
* next attempt to write will return the error.
*
* Since: 2.28
*/
gboolean
g_pollable_output_stream_is_writable (GPollableOutputStream *stream)
{
g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE);
return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->is_writable (stream);
}
/**
* g_pollable_output_stream_create_source:
* @stream: a #GPollableOutputStream.
* @cancellable: a #GCancellable, or %NULL
*
* Creates a #GSource that triggers when @stream can be written, or
* @cancellable is triggered or an error occurs. The callback on the
* source is of the #GPollableSourceFunc type.
*
* As with g_pollable_output_stream_is_writable(), it is possible that
* the stream may not actually be writable even after the source
* triggers, so you should use
* g_pollable_output_stream_write_nonblocking() rather than
* g_output_stream_write() from the callback.
*
* Returns: a new #GSource
*
* Since: 2.28
*/
GSource *
g_pollable_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable)
{
g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), NULL);
return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
create_source (stream, cancellable);
}
static gssize
g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize size,
GError **error)
{
if (!g_pollable_output_stream_is_writable (stream))
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
return -1;
}
return g_output_stream_write (G_OUTPUT_STREAM (stream), buffer, size,
NULL, error);
}
/**
* g_pollable_output_stream_write_nonblocking:
* @stream: a #GPollableOutputStream
* @buffer: a buffer to write data from
* @size: the number of bytes you want to write
* @cancellable: a #GCancellable, or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
* Attempts to write up to @size bytes from @buffer to @stream, as
* with g_output_stream_write(). If @stream is not currently writable,
* this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
* use g_pollable_output_stream_create_source() to create a #GSource
* that will be triggered when @stream is writable.
*
* Note that since this method never blocks, you cannot actually
* use @cancellable to cancel it. However, it will return an error
* if @cancellable has already been cancelled when you call, which
* may happen if you call this method after a source triggers due
* to having been cancelled.
*
* Return value: the number of bytes written, or -1 on error (including
* %G_IO_ERROR_WOULD_BLOCK).
*/
gssize
g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize size,
GCancellable *cancellable,
GError **error)
{
g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), -1);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
write_nonblocking (stream, buffer, size, error);
}

View File

@ -0,0 +1,98 @@
/* GIO - GLib Input, Output and Streaming Library
*
* Copyright (C) 2010 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __G_POLLABLE_OUTPUT_STREAM_H__
#define __G_POLLABLE_OUTPUT_STREAM_H__
#include <gio/gio.h>
G_BEGIN_DECLS
#define G_TYPE_POLLABLE_OUTPUT_STREAM (g_pollable_output_stream_get_type ())
#define G_POLLABLE_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStream))
#define G_IS_POLLABLE_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM))
#define G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStreamInterface))
/**
* GPollableOutputStream:
*
* An interface for a #GOutputStream that can be polled for readability.
*
* Since: 2.28
*/
typedef struct _GPollableOutputStreamInterface GPollableOutputStreamInterface;
/**
* GPollableOutputStreamInterface:
* @g_iface: The parent interface.
* @can_poll: Checks if the #GPollableOutputStream instance is actually pollable
* @is_writable: Checks if the stream is writable
* @create_source: Creates a #GSource to poll the stream
* @write_nonblocking: Does a non-blocking write or returns
* %G_IO_ERROR_WOULD_BLOCK
*
* The interface for pollable output streams.
*
* The default implementation of @can_poll always returns %TRUE.
*
* The default implementation of @write_nonblocking calls
* g_pollable_output_stream_is_writable(), and then calls
* g_output_stream_write() if it returns %TRUE. This means you only
* need to override it if it is possible that your @is_writable
* implementation may return %TRUE when the stream is not actually
* writable.
*
* Since: 2.28
*/
struct _GPollableOutputStreamInterface
{
GTypeInterface g_iface;
/* Virtual Table */
gboolean (*can_poll) (GPollableOutputStream *stream);
gboolean (*is_writable) (GPollableOutputStream *stream);
GSource * (*create_source) (GPollableOutputStream *stream,
GCancellable *cancellable);
gssize (*write_nonblocking) (GPollableOutputStream *stream,
const void *buffer,
gsize size,
GError **error);
};
GType g_pollable_output_stream_get_type (void) G_GNUC_CONST;
gboolean g_pollable_output_stream_can_poll (GPollableOutputStream *stream);
gboolean g_pollable_output_stream_is_writable (GPollableOutputStream *stream);
GSource *g_pollable_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable);
gssize g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize size,
GCancellable *cancellable,
GError **error);
G_END_DECLS
#endif /* __G_POLLABLE_OUTPUT_STREAM_H__ */

View File

@ -60,8 +60,7 @@
* Since: 2.22 * Since: 2.22
*/ */
G_DEFINE_TYPE (GSocketConnection, G_DEFINE_TYPE (GSocketConnection, g_socket_connection, G_TYPE_IO_STREAM);
g_socket_connection, G_TYPE_IO_STREAM);
enum enum
{ {

View File

@ -27,13 +27,18 @@
#include "gsocketinputstream.h" #include "gsocketinputstream.h"
#include "glibintl.h" #include "glibintl.h"
#include <gio/gsimpleasyncresult.h> #include "gsimpleasyncresult.h"
#include <gio/gcancellable.h> #include "gcancellable.h"
#include <gio/gioerror.h> #include "gpollableinputstream.h"
#include "gioerror.h"
static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
#define g_socket_input_stream_get_type _g_socket_input_stream_get_type #define g_socket_input_stream_get_type _g_socket_input_stream_get_type
G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM); G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init)
)
enum enum
{ {
@ -205,6 +210,44 @@ g_socket_input_stream_read_finish (GInputStream *stream,
return count; return count;
} }
static gboolean
g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
{
GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
return g_socket_condition_check (input_stream->priv->socket, G_IO_IN);
}
static GSource *
g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable,
GCancellable *cancellable)
{
GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
GSource *socket_source, *pollable_source;
pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
socket_source = g_socket_create_source (input_stream->priv->socket,
G_IO_IN, cancellable);
g_source_set_dummy_callback (socket_source);
g_source_add_child_source (pollable_source, socket_source);
g_source_unref (socket_source);
return pollable_source;
}
static gssize
g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream *pollable,
void *buffer,
gsize size,
GError **error)
{
GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
return g_socket_receive_with_blocking (input_stream->priv->socket,
buffer, size, FALSE,
NULL, error);
}
static void static void
g_socket_input_stream_class_init (GSocketInputStreamClass *klass) g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
{ {
@ -229,6 +272,14 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
} }
static void
g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
iface->is_readable = g_socket_input_stream_pollable_is_readable;
iface->create_source = g_socket_input_stream_pollable_create_source;
iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking;
}
static void static void
g_socket_input_stream_init (GSocketInputStream *stream) g_socket_input_stream_init (GSocketInputStream *stream)
{ {

View File

@ -28,14 +28,20 @@
#include "gsocketoutputstream.h" #include "gsocketoutputstream.h"
#include "gsocket.h" #include "gsocket.h"
#include <gio/gsimpleasyncresult.h> #include "gsimpleasyncresult.h"
#include <gio/gcancellable.h> #include "gcancellable.h"
#include <gio/gioerror.h> #include "gpollableinputstream.h"
#include "gpollableoutputstream.h"
#include "gioerror.h"
#include "glibintl.h" #include "glibintl.h"
static void g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
#define g_socket_output_stream_get_type _g_socket_output_stream_get_type #define g_socket_output_stream_get_type _g_socket_output_stream_get_type
G_DEFINE_TYPE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM); G_DEFINE_TYPE_WITH_CODE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, g_socket_output_stream_pollable_iface_init)
)
enum enum
{ {
@ -207,6 +213,44 @@ g_socket_output_stream_write_finish (GOutputStream *stream,
return count; return count;
} }
static gboolean
g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
{
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
}
static GSource *
g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable,
GCancellable *cancellable)
{
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
GSource *socket_source, *pollable_source;
pollable_source = g_pollable_source_new (G_OBJECT (output_stream));
socket_source = g_socket_create_source (output_stream->priv->socket,
G_IO_OUT, cancellable);
g_source_set_dummy_callback (socket_source);
g_source_add_child_source (pollable_source, socket_source);
g_source_unref (socket_source);
return pollable_source;
}
static gssize
g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable,
const void *buffer,
gsize size,
GError **error)
{
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
return g_socket_send_with_blocking (output_stream->priv->socket,
buffer, size, FALSE,
NULL, error);
}
static void static void
g_socket_output_stream_class_init (GSocketOutputStreamClass *klass) g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
{ {
@ -231,6 +275,14 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
} }
static void
g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
{
iface->is_writable = g_socket_output_stream_pollable_is_writable;
iface->create_source = g_socket_output_stream_pollable_create_source;
iface->write_nonblocking = g_socket_output_stream_pollable_write_nonblocking;
}
static void static void
g_socket_output_stream_init (GSocketOutputStream *stream) g_socket_output_stream_init (GSocketOutputStream *stream)
{ {

View File

@ -60,7 +60,12 @@ enum {
PROP_CLOSE_FD PROP_CLOSE_FD
}; };
G_DEFINE_TYPE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM); static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
g_unix_input_stream_pollable_iface_init)
);
struct _GUnixInputStreamPrivate { struct _GUnixInputStreamPrivate {
int fd; int fd;
@ -111,6 +116,9 @@ static gboolean g_unix_input_stream_close_finish (GInputStream *stream,
GAsyncResult *result, GAsyncResult *result,
GError **error); GError **error);
static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream);
static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
static void static void
g_unix_input_stream_finalize (GObject *object) g_unix_input_stream_finalize (GObject *object)
@ -174,6 +182,13 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB)); G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
} }
static void
g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
iface->is_readable = g_unix_input_stream_pollable_is_readable;
iface->create_source = g_unix_input_stream_pollable_create_source;
}
static void static void
g_unix_input_stream_set_property (GObject *object, g_unix_input_stream_set_property (GObject *object,
guint prop_id, guint prop_id,
@ -637,3 +652,37 @@ g_unix_input_stream_close_finish (GInputStream *stream,
/* Failures handled in generic close_finish code */ /* Failures handled in generic close_finish code */
return TRUE; return TRUE;
} }
static gboolean
g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
{
GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
GPollFD poll_fd;
gint result;
poll_fd.fd = unix_stream->priv->fd;
poll_fd.events = G_IO_IN;
do
result = g_poll (&poll_fd, 1, 0);
while (result == -1 && errno == EINTR);
return poll_fd.revents != 0;
}
static GSource *
g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
GCancellable *cancellable)
{
GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
GSource *inner_source, *pollable_source;
pollable_source = g_pollable_source_new (G_OBJECT (stream));
inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_IN, cancellable);
g_source_set_dummy_callback (inner_source);
g_source_add_child_source (pollable_source, inner_source);
g_source_unref (inner_source);
return pollable_source;
}

View File

@ -60,8 +60,12 @@ enum {
PROP_CLOSE_FD PROP_CLOSE_FD
}; };
G_DEFINE_TYPE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM); static void g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
G_DEFINE_TYPE_WITH_CODE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
g_unix_output_stream_pollable_iface_init)
);
struct _GUnixOutputStreamPrivate { struct _GUnixOutputStreamPrivate {
int fd; int fd;
@ -103,6 +107,9 @@ static gboolean g_unix_output_stream_close_finish (GOutputStream *stream,
GAsyncResult *result, GAsyncResult *result,
GError **error); GError **error);
static gboolean g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream);
static GSource *g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
GCancellable *cancellable);
static void static void
g_unix_output_stream_finalize (GObject *object) g_unix_output_stream_finalize (GObject *object)
@ -160,6 +167,13 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB)); G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
} }
static void
g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
{
iface->is_writable = g_unix_output_stream_pollable_is_writable;
iface->create_source = g_unix_output_stream_pollable_create_source;
}
static void static void
g_unix_output_stream_set_property (GObject *object, g_unix_output_stream_set_property (GObject *object,
guint prop_id, guint prop_id,
@ -593,3 +607,37 @@ g_unix_output_stream_close_finish (GOutputStream *stream,
/* Failures handled in generic close_finish code */ /* Failures handled in generic close_finish code */
return TRUE; return TRUE;
} }
static gboolean
g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream)
{
GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
GPollFD poll_fd;
gint result;
poll_fd.fd = unix_stream->priv->fd;
poll_fd.events = G_IO_OUT;
do
result = g_poll (&poll_fd, 1, 0);
while (result == -1 && errno == EINTR);
return poll_fd.revents != 0;
}
static GSource *
g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
GCancellable *cancellable)
{
GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
GSource *inner_source, *pollable_source;
pollable_source = g_pollable_source_new (G_OBJECT (stream));
inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_OUT, cancellable);
g_source_set_dummy_callback (inner_source);
g_source_add_child_source (pollable_source, inner_source);
g_source_unref (inner_source);
return pollable_source;
}

View File

@ -60,6 +60,7 @@ memory-input-stream
memory-output-stream memory-output-stream
network-address network-address
org.gtk.test.enums.xml org.gtk.test.enums.xml
pollable
proxy proxy
readwrite readwrite
resolver resolver

View File

@ -43,6 +43,7 @@ TEST_PROGS += \
network-address \ network-address \
gdbus-message \ gdbus-message \
socket \ socket \
pollable \
$(NULL) $(NULL)
if OS_UNIX if OS_UNIX
@ -204,6 +205,9 @@ network_address_LDADD = $(progs_ldadd)
socket_SOURCE = socket.c socket_SOURCE = socket.c
socket_LDADD = $(progs_ldadd) socket_LDADD = $(progs_ldadd)
pollable_SOURCE = pollable.c
pollable_LDADD = $(progs_ldadd)
contexts_SOURCES = contexts.c contexts_SOURCES = contexts.c
contexts_LDADD = $(progs_ldadd) \ contexts_LDADD = $(progs_ldadd) \
$(top_builddir)/gthread/libgthread-2.0.la $(top_builddir)/gthread/libgthread-2.0.la

240
gio/tests/pollable.c Normal file
View File

@ -0,0 +1,240 @@
/* GLib testing framework examples and tests
*
* Copyright (C) 2010 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include <gio/gio.h>
#ifdef G_OS_UNIX
#include <gio/gunixinputstream.h>
#include <gio/gunixoutputstream.h>
#endif
GMainLoop *loop;
GPollableInputStream *in;
GOutputStream *out;
static gboolean
poll_source_callback (GPollableInputStream *in,
gpointer user_data)
{
GError *error = NULL;
char buf[2];
gssize nread;
gboolean *success = user_data;
nread = g_pollable_input_stream_read_nonblocking (in, buf, 2, NULL, &error);
g_assert_no_error (error);
g_assert_cmpint (nread, ==, 2);
g_assert_cmpstr (buf, ==, "x");
*success = TRUE;
return FALSE;
}
static gboolean
check_source_readability_callback (gpointer user_data)
{
gboolean expected = GPOINTER_TO_INT (user_data);
gboolean readable;
readable = g_pollable_input_stream_is_readable (in);
g_assert_cmpint (readable, ==, expected);
return FALSE;
}
static gboolean
write_callback (gpointer user_data)
{
char *buf = "x";
gssize nwrote;
GError *error = NULL;
nwrote = g_output_stream_write (out, buf, 2, NULL, &error);
g_assert_no_error (error);
g_assert_cmpint (nwrote, ==, 2);
check_source_readability_callback (GINT_TO_POINTER (TRUE));
return FALSE;
}
static gboolean
check_source_and_quit_callback (gpointer user_data)
{
check_source_readability_callback (user_data);
g_main_loop_quit (loop);
return FALSE;
}
static void
test_streams (void)
{
gboolean readable;
GError *error = NULL;
char buf[1];
gssize nread;
GSource *poll_source;
gboolean success = FALSE;
readable = g_pollable_input_stream_is_readable (in);
g_assert (!readable);
nread = g_pollable_input_stream_read_nonblocking (in, buf, 1, NULL, &error);
g_assert_cmpint (nread, ==, -1);
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
g_clear_error (&error);
/* Create 4 sources, in decreasing order of priority:
* 1. poll source on @in
* 2. idle source that checks if @in is readable once
* (it won't be) and then removes itself
* 3. idle source that writes a byte to @out, checks that
* @in is now readable, and removes itself
* 4. idle source that checks if @in is readable once
* (it won't be, since the poll source will fire before
* this one does) and then quits the loop.
*
* If the poll source triggers before it should, then it will get a
* %G_IO_ERROR_WOULD_BLOCK, and if check() fails in either
* direction, we will catch it at some point.
*/
poll_source = g_pollable_input_stream_create_source (in, NULL);
g_source_set_priority (poll_source, 1);
g_source_set_callback (poll_source, (GSourceFunc) poll_source_callback, &success, NULL);
g_source_attach (poll_source, NULL);
g_source_unref (poll_source);
g_idle_add_full (2, check_source_readability_callback, GINT_TO_POINTER (FALSE), NULL);
g_idle_add_full (3, write_callback, NULL, NULL);
g_idle_add_full (4, check_source_and_quit_callback, GINT_TO_POINTER (FALSE), NULL);
loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (loop);
g_main_loop_unref (loop);
g_assert_cmpint (success, ==, TRUE);
}
#ifdef G_OS_UNIX
static void
test_pollable_unix (void)
{
int pipefds[2], status;
status = pipe (pipefds);
g_assert_cmpint (status, ==, 0);
in = G_POLLABLE_INPUT_STREAM (g_unix_input_stream_new (pipefds[0], TRUE));
out = g_unix_output_stream_new (pipefds[1], TRUE);
test_streams ();
g_object_unref (in);
g_object_unref (out);
}
#endif
static void
client_connected (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GSocketClient *client = G_SOCKET_CLIENT (source);
GSocketConnection **conn = user_data;
GError *error = NULL;
*conn = g_socket_client_connect_finish (client, result, &error);
g_assert_no_error (error);
}
static void
server_connected (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GSocketListener *listener = G_SOCKET_LISTENER (source);
GSocketConnection **conn = user_data;
GError *error = NULL;
*conn = g_socket_listener_accept_finish (listener, result, NULL, &error);
g_assert_no_error (error);
}
static void
test_pollable_socket (void)
{
GInetAddress *iaddr;
GSocketAddress *saddr, *effective_address;
GSocketListener *listener;
GSocketClient *client;
GError *error = NULL;
GSocketConnection *client_conn = NULL, *server_conn = NULL;
iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
saddr = g_inet_socket_address_new (iaddr, 0);
g_object_unref (iaddr);
listener = g_socket_listener_new ();
g_socket_listener_add_address (listener, saddr,
G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_TCP,
NULL,
&effective_address,
&error);
g_assert_no_error (error);
g_object_unref (saddr);
client = g_socket_client_new ();
g_socket_client_connect_async (client,
G_SOCKET_CONNECTABLE (effective_address),
NULL, client_connected, &client_conn);
g_socket_listener_accept_async (listener, NULL,
server_connected, &server_conn);
while (!client_conn || !server_conn)
g_main_context_iteration (NULL, TRUE);
in = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (G_IO_STREAM (client_conn)));
out = g_io_stream_get_output_stream (G_IO_STREAM (server_conn));
test_streams ();
g_object_unref (client_conn);
g_object_unref (server_conn);
g_object_unref (client);
g_object_unref (listener);
}
int
main (int argc,
char *argv[])
{
g_type_init ();
g_test_init (&argc, &argv, NULL);
#ifdef G_OS_UNIX
g_test_add_func ("/pollable/unix", test_pollable_unix);
#endif
g_test_add_func ("/pollable/socket", test_pollable_socket);
return g_test_run();
}