From 3baf1f1a73952277abd05ac82b0b356c6f5ab725 Mon Sep 17 00:00:00 2001 From: Yash Trivedi Date: Thu, 26 Sep 2024 10:27:19 +0000 Subject: [PATCH] gpoll windows: use a threadpool when polling large number of fds --- glib/gpoll.c | 103 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 32 deletions(-) diff --git a/glib/gpoll.c b/glib/gpoll.c index 5afc7596d..4ec0bfdd3 100644 --- a/glib/gpoll.c +++ b/glib/gpoll.c @@ -259,6 +259,7 @@ typedef struct GPollFD *handle_to_fd[MAXIMUM_WAIT_OBJECTS]; GPollFD *msg_fd; GPollFD *stop_fd; + gint retval; gint nhandles; DWORD timeout_ms; } GWin32PollThreadData; @@ -290,8 +291,30 @@ poll_single_thread (GWin32PollThreadData *data) */ retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, data->timeout_ms); } + data->retval = retval; - return retval; + return data->retval; +} + +static VOID CALLBACK +poll_single_worker_wrapper (PTP_CALLBACK_INSTANCE instance, + PVOID context, + PTP_WORK work) +{ + UNREFERENCED_PARAMETER (instance); + UNREFERENCED_PARAMETER (work); + + GWin32PollThreadData *data = context; + + poll_single_thread (data); + + /* Signal the stop in case any of the workers did not stop yet */ + if (!SetEvent ((HANDLE) data->stop_fd->fd)) + { + gchar *emsg = g_win32_error_message (GetLastError ()); + g_error ("gpoll: failed to signal the stop event: %s", emsg); + g_free (emsg); + } } static void @@ -344,19 +367,15 @@ fill_poll_thread_data (GPollFD *fds, } } -static guint __stdcall -poll_thread_run (gpointer user_data) +static void +cleanup_workers (guint nworkers, + PTP_WORK *work_handles) { - GWin32PollThreadData *data = user_data; - - /* Docs say that it is safer to call _endthreadex by our own: - * https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/endthread-endthreadex - */ - _endthreadex (poll_single_thread (data)); - - g_assert_not_reached (); - - return 0; + for (guint i = 0; i < nworkers; i++) + { + if (work_handles[i] != NULL) + CloseThreadpoolWork (work_handles[i]); + } } /* One slot for a possible msg object or the stop event */ @@ -368,7 +387,7 @@ g_poll (GPollFD *fds, gint timeout) { guint nthreads, threads_remain; - HANDLE thread_handles[MAXIMUM_WAIT_OBJECTS]; + HANDLE worker_completed_handles[1] = { NULL, }; GWin32PollThreadData *threads_data; GPollFD stop_event = { 0, }; GPollFD *f; @@ -377,6 +396,7 @@ g_poll (GPollFD *fds, DWORD thread_retval; int retval; GPollFD *msg_fd = NULL; + PTP_WORK work_handles[MAXIMUM_WAIT_OBJECTS] = { NULL, }; if (timeout == -1) timeout = INFINITE; @@ -422,8 +442,10 @@ g_poll (GPollFD *fds, stop_event.fd = (gint)CreateEventW (NULL, TRUE, FALSE, NULL); #endif stop_event.events = G_IO_IN; + worker_completed_handles[0] = (HANDLE) stop_event.fd; threads_data = g_new0 (GWin32PollThreadData, nthreads); + for (i = 0; i < nthreads; i++) { guint thread_fds; @@ -444,30 +466,46 @@ g_poll (GPollFD *fds, threads_data[i].msg_fd = NULL; } - thread_handles[i] = (HANDLE) _beginthreadex (NULL, 0, poll_thread_run, &threads_data[i], 0, &ignore); + work_handles[i] = CreateThreadpoolWork (poll_single_worker_wrapper, &threads_data[i], + NULL); + if (work_handles[i] == NULL) + { + gchar *emsg = g_win32_error_message (GetLastError ()); + g_error ("CreateThreadpoolWork failed: %s", emsg); + g_free (emsg); + retval = -1; + goto cleanup; + } + + SubmitThreadpoolWork (work_handles[i]); } - /* Wait for at least one thread to return */ + /* Wait for at least one worker to return */ if (msg_fd != NULL) - ready = MsgWaitForMultipleObjectsEx (nthreads, thread_handles, timeout, + ready = MsgWaitForMultipleObjectsEx (1, worker_completed_handles, timeout, QS_ALLINPUT, MWMO_ALERTABLE); else - ready = WaitForMultipleObjects (nthreads, thread_handles, FALSE, timeout); + ready = WaitForMultipleObjects (1, worker_completed_handles, FALSE, timeout); - /* Signal the stop in case any of the threads did not stop yet */ - if (!SetEvent ((HANDLE)stop_event.fd)) + /* Signal the stop in case any of the workers did not stop yet */ + if (!SetEvent ((HANDLE) stop_event.fd)) { gchar *emsg = g_win32_error_message (GetLastError ()); - g_warning ("gpoll: failed to signal the stop event: %s", emsg); + g_error ("gpoll: failed to signal the stop event: %s", emsg); g_free (emsg); + retval = -1; + goto cleanup; } - /* Wait for the rest of the threads to finish */ - WaitForMultipleObjects (nthreads, thread_handles, TRUE, INFINITE); + /* Wait for the all workers to finish individually, since we're not using a cleanup group. + We disable fCancelPendingCallbacks since we share the default process threadpool. + */ + for (i = 0; i < nthreads; i++) + WaitForThreadpoolWorkCallbacks (work_handles[i], FALSE); /* The return value of all the threads give us all the fds that changed state */ retval = 0; - if (msg_fd != NULL && ready == WAIT_OBJECT_0 + nthreads) + if (msg_fd != NULL && ready == WAIT_OBJECT_0 + 1) { msg_fd->revents |= G_IO_IN; retval = 1; @@ -475,18 +513,19 @@ g_poll (GPollFD *fds, for (i = 0; i < nthreads; i++) { - if (GetExitCodeThread (thread_handles[i], &thread_retval)) - retval = (retval == -1) ? -1 : ((thread_retval == (DWORD) -1) ? -1 : (int) (retval + thread_retval)); - - CloseHandle (thread_handles[i]); + thread_retval = threads_data[i].retval; + retval = (retval == -1) ? -1 : ((thread_retval == (DWORD) -1) ? -1 : (int) (retval + thread_retval)); } +cleanup: if (retval == -1) - for (f = fds; f < &fds[nfds]; ++f) - f->revents = 0; - + { + for (f = fds; f < &fds[nfds]; ++f) + f->revents = 0; + } + cleanup_workers (nthreads, work_handles); g_free (threads_data); - CloseHandle ((HANDLE)stop_event.fd); + CloseHandle ((HANDLE) stop_event.fd); return retval; }