Implement watches for GIOChannels for write file descriptors on Win32

2006-03-02  Marcus Brinkmann  <mb@g10code.de>

	Implement watches for GIOChannels for write file descriptors on
	Win32 (#333098).

	* glib/giowin32.c (GIOWin32Channel): Add a new direction field.
	(read_thread): Initialize direction.
	(write_thread): New function.
	(buffer_write): New function.
	(g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
	write direction.
	(g_io_win32_fd_write): Call buffer_write() if there is a writer
	thread.
	(g_io_win32_fd_close): Set space_avail_event for writer threads.
	(g_io_win32_fd_create_watch): Create the writer thread if
	condition is G_IO_OUT.
	(g_io_channel_win32_make_pollfd): Likewise here.
This commit is contained in:
Marcus Brinkmann 2006-03-11 21:03:00 +00:00 committed by Tor Lillqvist
parent 4c27a10ad3
commit 6f0ef1bae2
4 changed files with 283 additions and 9 deletions

View File

@ -1,3 +1,21 @@
2006-03-02 Marcus Brinkmann <mb@g10code.de>
Implement watches for GIOChannels for write file descriptors on
Win32 (#333098).
* glib/giowin32.c (GIOWin32Channel): Add a new direction field.
(read_thread): Initialize direction.
(write_thread): New function.
(buffer_write): New function.
(g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
write direction.
(g_io_win32_fd_write): Call buffer_write() if there is a writer
thread.
(g_io_win32_fd_close): Set space_avail_event for writer threads.
(g_io_win32_fd_create_watch): Create the writer thread if
condition is G_IO_OUT.
(g_io_channel_win32_make_pollfd): Likewise here.
2006-03-09 Matthias Clasen <mclasen@redhat.com>
* Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST.

View File

@ -1,3 +1,21 @@
2006-03-02 Marcus Brinkmann <mb@g10code.de>
Implement watches for GIOChannels for write file descriptors on
Win32 (#333098).
* glib/giowin32.c (GIOWin32Channel): Add a new direction field.
(read_thread): Initialize direction.
(write_thread): New function.
(buffer_write): New function.
(g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
write direction.
(g_io_win32_fd_write): Call buffer_write() if there is a writer
thread.
(g_io_win32_fd_close): Set space_avail_event for writer threads.
(g_io_win32_fd_create_watch): Create the writer thread if
condition is G_IO_OUT.
(g_io_channel_win32_make_pollfd): Likewise here.
2006-03-09 Matthias Clasen <mclasen@redhat.com>
* Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST.

View File

@ -1,3 +1,21 @@
2006-03-02 Marcus Brinkmann <mb@g10code.de>
Implement watches for GIOChannels for write file descriptors on
Win32 (#333098).
* glib/giowin32.c (GIOWin32Channel): Add a new direction field.
(read_thread): Initialize direction.
(write_thread): New function.
(buffer_write): New function.
(g_io_win32_prepare): Handle the G_IO_WIN32_FILE_DESC case for the
write direction.
(g_io_win32_fd_write): Call buffer_write() if there is a writer
thread.
(g_io_win32_fd_close): Set space_avail_event for writer threads.
(g_io_win32_fd_create_watch): Create the writer thread if
condition is G_IO_OUT.
(g_io_channel_win32_make_pollfd): Likewise here.
2006-03-09 Matthias Clasen <mclasen@redhat.com>
* Makefile.am: Add ChangeLog.pre-2.8 to EXTRA_DIST.

View File

@ -77,6 +77,10 @@ struct _GIOWin32Channel {
/* Following fields are used by fd channels. */
CRITICAL_SECTION mutex;
int direction; /* 0 means we read from it,
* 1 means we write to it.
*/
gboolean running; /* Is reader thread running. FALSE if
* EOF has been reached.
*/
@ -392,6 +396,7 @@ read_thread (void *parameter)
(guint) channel->data_avail_event,
(guint) channel->space_avail_event);
channel->direction = 0;
channel->buffer = g_malloc (BUFFER_SIZE);
channel->rdp = channel->wrp = 0;
channel->running = TRUE;
@ -486,6 +491,117 @@ read_thread (void *parameter)
return 0;
}
static unsigned __stdcall
write_thread (void *parameter)
{
GIOWin32Channel *channel = parameter;
guchar *buffer;
guint nbytes;
g_io_channel_ref ((GIOChannel *)channel);
if (channel->debug)
g_print ("write_thread %#x: start fd=%d, data_avail=%#x space_avail=%#x\n",
channel->thread_id,
channel->fd,
(guint) channel->data_avail_event,
(guint) channel->space_avail_event);
channel->direction = 1;
channel->buffer = g_malloc (BUFFER_SIZE);
channel->rdp = channel->wrp = 0;
channel->running = TRUE;
SetEvent (channel->space_avail_event);
/* We use the same event objects as for a reader thread, but with
* reversed meaning. So, space_avail is used if data is available
* for writing, and data_avail is used if space is available in the
* write buffer.
*/
LOCK (channel->mutex);
while (channel->running || channel->rdp != channel->wrp)
{
if (channel->debug)
g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
channel->thread_id, channel->rdp, channel->wrp);
if (channel->wrp == channel->rdp)
{
/* Buffer is empty. */
if (channel->debug)
g_print ("write_thread %#x: resetting space_avail\n",
channel->thread_id);
ResetEvent (channel->space_avail_event);
if (channel->debug)
g_print ("write_thread %#x: waiting for data\n",
channel->thread_id);
channel->revents = G_IO_OUT;
SetEvent (channel->data_avail_event);
UNLOCK (channel->mutex);
WaitForSingleObject (channel->space_avail_event, INFINITE);
LOCK (channel->mutex);
if (channel->rdp == channel->wrp)
break;
if (channel->debug)
g_print ("write_thread %#x: rdp=%d, wrp=%d\n",
channel->thread_id, channel->rdp, channel->wrp);
}
buffer = channel->buffer + channel->rdp;
if (channel->rdp < channel->wrp)
nbytes = channel->wrp - channel->rdp;
else
nbytes = BUFFER_SIZE - channel->rdp;
if (channel->debug)
g_print ("write_thread %#x: calling write() for %d bytes\n",
channel->thread_id, nbytes);
UNLOCK (channel->mutex);
nbytes = write (channel->fd, buffer, nbytes);
LOCK (channel->mutex);
if (channel->debug)
g_print ("write_thread %#x: write(%i) returned %d, rdp=%d, wrp=%d\n",
channel->thread_id, channel->fd, nbytes, channel->rdp, channel->wrp);
channel->revents = 0;
if (nbytes > 0)
channel->revents |= G_IO_OUT;
else if (nbytes <= 0)
channel->revents |= G_IO_ERR;
channel->rdp = (channel->rdp + nbytes) % BUFFER_SIZE;
if (nbytes <= 0)
break;
if (channel->debug)
g_print ("write_thread: setting data_avail for thread %#x\n",
channel->thread_id);
SetEvent (channel->data_avail_event);
}
channel->running = FALSE;
if (channel->needs_close)
{
if (channel->debug)
g_print ("write_thread %#x: channel fd %d needs closing\n",
channel->thread_id, channel->fd);
close (channel->fd);
channel->fd = -1;
}
UNLOCK (channel->mutex);
g_io_channel_unref ((GIOChannel *)channel);
return 0;
}
static void
create_thread (GIOWin32Channel *channel,
GIOCondition condition,
@ -575,6 +691,78 @@ buffer_read (GIOWin32Channel *channel,
return (*bytes_read > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
}
static GIOStatus
buffer_write (GIOWin32Channel *channel,
const guchar *dest,
gsize count,
gsize *bytes_written,
GError **err)
{
guint nbytes;
guint left = count;
LOCK (channel->mutex);
if (channel->debug)
g_print ("buffer_write: writing to thread %#x %d bytes, rdp=%d, wrp=%d\n",
channel->thread_id, count, channel->rdp, channel->wrp);
if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
{
/* Buffer is full */
if (channel->debug)
g_print ("buffer_write: tid %#x: resetting data_avail\n",
channel->thread_id);
ResetEvent (channel->data_avail_event);
if (channel->debug)
g_print ("buffer_write: tid %#x: waiting for space\n",
channel->thread_id);
UNLOCK (channel->mutex);
WaitForSingleObject (channel->data_avail_event, INFINITE);
LOCK (channel->mutex);
if (channel->debug)
g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d\n",
channel->thread_id, channel->rdp, channel->wrp);
}
nbytes = MIN ((channel->rdp + BUFFER_SIZE - channel->wrp - 1) % BUFFER_SIZE,
BUFFER_SIZE - channel->wrp);
UNLOCK (channel->mutex);
nbytes = MIN (left, nbytes);
if (channel->debug)
g_print ("buffer_write: tid %#x: writing %d bytes\n",
channel->thread_id, nbytes);
memcpy (channel->buffer + channel->wrp, dest, nbytes);
dest += nbytes;
left -= nbytes;
LOCK (channel->mutex);
channel->wrp = (channel->wrp + nbytes) % BUFFER_SIZE;
if (channel->debug)
g_print ("buffer_write: tid %#x: rdp=%d, wrp=%d, setting space_avail\n",
channel->thread_id, channel->rdp, channel->wrp);
SetEvent (channel->space_avail_event);
if ((channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
{
/* Buffer is full */
if (channel->debug)
g_print ("buffer_write: tid %#x: resetting data_avail\n",
channel->thread_id);
ResetEvent (channel->data_avail_event);
}
UNLOCK (channel->mutex);
/* We have no way to indicate any errors form the actual
* write() call in the writer thread. Should we have?
*/
*bytes_written = count - left;
return (*bytes_written > 0) ? G_IO_STATUS_NORMAL : G_IO_STATUS_EOF;
}
static gboolean
g_io_win32_prepare (GSource *source,
gint *timeout)
@ -601,12 +789,26 @@ g_io_win32_prepare (GSource *source,
condition_to_string (channel->revents));
LOCK (channel->mutex);
if (channel->running && channel->wrp == channel->rdp)
if (channel->running)
{
if (channel->debug)
g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n",
channel->thread_id);
channel->revents = 0;
if (channel->direction == 0 && channel->wrp == channel->rdp)
{
if (channel->debug)
g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = 0\n",
channel->thread_id);
channel->revents = 0;
}
}
else
{
if (channel->direction == 1
&& (channel->wrp + 1) % BUFFER_SIZE == channel->rdp)
{
if (channel->debug)
g_print ("g_io_win32_prepare: for thread %#x, setting channel->revents = %i\n",
channel->thread_id, 0);
channel->revents = 0;
}
}
UNLOCK (channel->mutex);
break;
@ -965,6 +1167,11 @@ g_io_win32_fd_write (GIOChannel *channel,
GIOWin32Channel *win32_channel = (GIOWin32Channel *)channel;
gint result;
if (win32_channel->thread_id)
{
return buffer_write (win32_channel, buf, count, bytes_written, err);
}
result = write (win32_channel->fd, buf, count);
if (win32_channel->debug)
g_print ("g_io_win32_fd_write: fd=%d count=%d => %d\n",
@ -1061,7 +1268,10 @@ g_io_win32_fd_close (GIOChannel *channel,
win32_channel->thread_id, win32_channel->fd);
win32_channel->running = FALSE;
win32_channel->needs_close = TRUE;
SetEvent (win32_channel->data_avail_event);
if (win32_channel->direction == 0)
SetEvent (win32_channel->data_avail_event);
else
SetEvent (win32_channel->space_avail_event);
}
else
{
@ -1105,7 +1315,12 @@ g_io_win32_fd_create_watch (GIOChannel *channel,
LOCK (win32_channel->mutex);
if (win32_channel->thread_id == 0)
create_thread (win32_channel, condition, read_thread);
{
if (condition & G_IO_IN)
create_thread (win32_channel, condition, read_thread);
else if (condition & G_IO_OUT)
create_thread (win32_channel, condition, write_thread);
}
g_source_add_poll (source, &watch->pollfd);
UNLOCK (win32_channel->mutex);
@ -1720,7 +1935,12 @@ g_io_channel_win32_make_pollfd (GIOChannel *channel,
fd->fd = (gint) win32_channel->data_avail_event;
if (win32_channel->thread_id == 0 && (condition & G_IO_IN))
create_thread (win32_channel, condition, read_thread);
{
if (condition & G_IO_IN)
create_thread (win32_channel, condition, read_thread);
else if (condition & G_IO_OUT)
create_thread (win32_channel, condition, write_thread);
}
break;
case G_IO_WIN32_SOCKET: