Merge branch 'main' into 'main'

gpoll windows: use a threadpool when polling large number of fds

See merge request GNOME/glib!4300
This commit is contained in:
Philip Withnall 2024-09-26 10:27:20 +00:00
commit 2258c08769

View File

@ -259,6 +259,7 @@ typedef struct
GPollFD *handle_to_fd[MAXIMUM_WAIT_OBJECTS]; GPollFD *handle_to_fd[MAXIMUM_WAIT_OBJECTS];
GPollFD *msg_fd; GPollFD *msg_fd;
GPollFD *stop_fd; GPollFD *stop_fd;
gint retval;
gint nhandles; gint nhandles;
DWORD timeout_ms; DWORD timeout_ms;
} GWin32PollThreadData; } GWin32PollThreadData;
@ -290,8 +291,30 @@ poll_single_thread (GWin32PollThreadData *data)
*/ */
retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, data->timeout_ms); retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, data->timeout_ms);
} }
data->retval = retval;
return retval; return data->retval;
}
static VOID CALLBACK
poll_single_worker_wrapper (PTP_CALLBACK_INSTANCE instance,
PVOID context,
PTP_WORK work)
{
UNREFERENCED_PARAMETER (instance);
UNREFERENCED_PARAMETER (work);
GWin32PollThreadData *data = context;
poll_single_thread (data);
/* Signal the stop in case any of the workers did not stop yet */
if (!SetEvent ((HANDLE) data->stop_fd->fd))
{
gchar *emsg = g_win32_error_message (GetLastError ());
g_error ("gpoll: failed to signal the stop event: %s", emsg);
g_free (emsg);
}
} }
static void static void
@ -344,19 +367,15 @@ fill_poll_thread_data (GPollFD *fds,
} }
} }
static guint __stdcall static void
poll_thread_run (gpointer user_data) cleanup_workers (guint nworkers,
PTP_WORK *work_handles)
{ {
GWin32PollThreadData *data = user_data; for (guint i = 0; i < nworkers; i++)
{
/* Docs say that it is safer to call _endthreadex by our own: if (work_handles[i] != NULL)
* https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/endthread-endthreadex CloseThreadpoolWork (work_handles[i]);
*/ }
_endthreadex (poll_single_thread (data));
g_assert_not_reached ();
return 0;
} }
/* One slot for a possible msg object or the stop event */ /* One slot for a possible msg object or the stop event */
@ -368,7 +387,7 @@ g_poll (GPollFD *fds,
gint timeout) gint timeout)
{ {
guint nthreads, threads_remain; guint nthreads, threads_remain;
HANDLE thread_handles[MAXIMUM_WAIT_OBJECTS]; HANDLE worker_completed_handles[1] = { NULL, };
GWin32PollThreadData *threads_data; GWin32PollThreadData *threads_data;
GPollFD stop_event = { 0, }; GPollFD stop_event = { 0, };
GPollFD *f; GPollFD *f;
@ -377,6 +396,7 @@ g_poll (GPollFD *fds,
DWORD thread_retval; DWORD thread_retval;
int retval; int retval;
GPollFD *msg_fd = NULL; GPollFD *msg_fd = NULL;
PTP_WORK work_handles[MAXIMUM_WAIT_OBJECTS] = { NULL, };
if (timeout == -1) if (timeout == -1)
timeout = INFINITE; timeout = INFINITE;
@ -422,8 +442,10 @@ g_poll (GPollFD *fds,
stop_event.fd = (gint)CreateEventW (NULL, TRUE, FALSE, NULL); stop_event.fd = (gint)CreateEventW (NULL, TRUE, FALSE, NULL);
#endif #endif
stop_event.events = G_IO_IN; stop_event.events = G_IO_IN;
worker_completed_handles[0] = (HANDLE) stop_event.fd;
threads_data = g_new0 (GWin32PollThreadData, nthreads); threads_data = g_new0 (GWin32PollThreadData, nthreads);
for (i = 0; i < nthreads; i++) for (i = 0; i < nthreads; i++)
{ {
guint thread_fds; guint thread_fds;
@ -444,30 +466,46 @@ g_poll (GPollFD *fds,
threads_data[i].msg_fd = NULL; threads_data[i].msg_fd = NULL;
} }
thread_handles[i] = (HANDLE) _beginthreadex (NULL, 0, poll_thread_run, &threads_data[i], 0, &ignore); work_handles[i] = CreateThreadpoolWork (poll_single_worker_wrapper, &threads_data[i],
NULL);
if (work_handles[i] == NULL)
{
gchar *emsg = g_win32_error_message (GetLastError ());
g_error ("CreateThreadpoolWork failed: %s", emsg);
g_free (emsg);
retval = -1;
goto cleanup;
}
SubmitThreadpoolWork (work_handles[i]);
} }
/* Wait for at least one thread to return */ /* Wait for at least one worker to return */
if (msg_fd != NULL) if (msg_fd != NULL)
ready = MsgWaitForMultipleObjectsEx (nthreads, thread_handles, timeout, ready = MsgWaitForMultipleObjectsEx (1, worker_completed_handles, timeout,
QS_ALLINPUT, MWMO_ALERTABLE); QS_ALLINPUT, MWMO_ALERTABLE);
else else
ready = WaitForMultipleObjects (nthreads, thread_handles, FALSE, timeout); ready = WaitForMultipleObjects (1, worker_completed_handles, FALSE, timeout);
/* Signal the stop in case any of the threads did not stop yet */ /* Signal the stop in case any of the workers did not stop yet */
if (!SetEvent ((HANDLE)stop_event.fd)) if (!SetEvent ((HANDLE) stop_event.fd))
{ {
gchar *emsg = g_win32_error_message (GetLastError ()); gchar *emsg = g_win32_error_message (GetLastError ());
g_warning ("gpoll: failed to signal the stop event: %s", emsg); g_error ("gpoll: failed to signal the stop event: %s", emsg);
g_free (emsg); g_free (emsg);
retval = -1;
goto cleanup;
} }
/* Wait for the rest of the threads to finish */ /* Wait for the all workers to finish individually, since we're not using a cleanup group.
WaitForMultipleObjects (nthreads, thread_handles, TRUE, INFINITE); We disable fCancelPendingCallbacks since we share the default process threadpool.
*/
for (i = 0; i < nthreads; i++)
WaitForThreadpoolWorkCallbacks (work_handles[i], FALSE);
/* The return value of all the threads give us all the fds that changed state */ /* The return value of all the threads give us all the fds that changed state */
retval = 0; retval = 0;
if (msg_fd != NULL && ready == WAIT_OBJECT_0 + nthreads) if (msg_fd != NULL && ready == WAIT_OBJECT_0 + 1)
{ {
msg_fd->revents |= G_IO_IN; msg_fd->revents |= G_IO_IN;
retval = 1; retval = 1;
@ -475,18 +513,19 @@ g_poll (GPollFD *fds,
for (i = 0; i < nthreads; i++) for (i = 0; i < nthreads; i++)
{ {
if (GetExitCodeThread (thread_handles[i], &thread_retval)) thread_retval = threads_data[i].retval;
retval = (retval == -1) ? -1 : ((thread_retval == (DWORD) -1) ? -1 : (int) (retval + thread_retval)); retval = (retval == -1) ? -1 : ((thread_retval == (DWORD) -1) ? -1 : (int) (retval + thread_retval));
CloseHandle (thread_handles[i]);
} }
cleanup:
if (retval == -1) if (retval == -1)
for (f = fds; f < &fds[nfds]; ++f) {
f->revents = 0; for (f = fds; f < &fds[nfds]; ++f)
f->revents = 0;
}
cleanup_workers (nthreads, work_handles);
g_free (threads_data); g_free (threads_data);
CloseHandle ((HANDLE)stop_event.fd); CloseHandle ((HANDLE) stop_event.fd);
return retval; return retval;
} }