thread-pool: replace semaphore with condition variable
Since commit f9fc8932b1 ("thread-posix: remove the posix semaphore
support", 2022-04-06) QemuSemaphore has its own mutex and condition
variable; this adds unnecessary overhead on I/O with small block sizes.
Check the QTAILQ directly instead of adding the indirection of a
semaphore's count.  Using a semaphore has not been necessary since
qemu_cond_timedwait was introduced; the new code has to be careful about
spurious wakeups, but it is simpler: for example, thread_pool_cancel does
not have to worry about keeping the semaphore count in sync with the
number of elements in pool->request_list.
Note that the return value of qemu_cond_timedwait (0 for timeout, 1 for
signal or spurious wakeup) is different from that of qemu_sem_timedwait
(-1 for timeout, 0 for success).
Reported-by: Lukáš Doktor <ldoktor@redhat.com>
Suggested-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
Message-Id: <20220514065012.1149539-3-pbonzini@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
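The sign flip described in the message also exists in the plain POSIX
primitives that QEMU's wrappers are modeled on. A minimal sketch of the two
return conventions, in standard pthreads/POSIX rather than QEMU's wrappers
(which, unlike these, take a relative timeout in milliseconds):

#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <time.h>

/* Absolute deadline "ms" milliseconds from now (CLOCK_REALTIME, which is
 * what both sem_timedwait and the default pthread_cond_timedwait use). */
static struct timespec deadline_ms(int ms)
{
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += ms / 1000;
    ts.tv_nsec += (long)(ms % 1000) * 1000000;
    if (ts.tv_nsec >= 1000000000) {
        ts.tv_sec++;
        ts.tv_nsec -= 1000000000;
    }
    return ts;
}

/* Semaphore convention: 0 on success, -1 (errno == ETIMEDOUT) on timeout. */
static bool sem_wait_ms(sem_t *sem, int ms)
{
    struct timespec ts = deadline_ms(ms);
    return sem_timedwait(sem, &ts) == 0;
}

/* Condvar convention: 0 on a (possibly spurious) wakeup, ETIMEDOUT on
 * timeout. Success and failure are inverted relative to the semaphore,
 * and a 0 return does not guarantee the predicate became true. */
static bool cond_wait_ms(pthread_cond_t *cond, pthread_mutex_t *lock, int ms)
{
    struct timespec ts = deadline_ms(ms);
    return pthread_cond_timedwait(cond, lock, &ts) != ETIMEDOUT;
}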
@@ -57,7 +57,7 @@ struct ThreadPool {
     QEMUBH *completion_bh;
     QemuMutex lock;
     QemuCond worker_stopped;
-    QemuSemaphore sem;
+    QemuCond request_cond;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -74,23 +74,6 @@ struct ThreadPool {
     int max_threads;
 };
 
-static inline bool back_to_sleep(ThreadPool *pool, int ret)
-{
-    /*
-     * The semaphore timed out, we should exit the loop except when:
-     *  - There is work to do, we raced with the signal.
-     *  - The max threads threshold just changed, we raced with the signal.
-     *  - The thread pool forces a minimum number of readily available threads.
-     */
-    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
-            pool->cur_threads > pool->max_threads ||
-            pool->cur_threads <= pool->min_threads)) {
-            return true;
-    }
-
-    return false;
-}
-
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -99,20 +82,25 @@ static void *worker_thread(void *opaque)
     pool->pending_threads--;
     do_spawn_thread(pool);
 
-    while (!pool->stopping) {
+    while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
         ThreadPoolElement *req;
         int ret;
 
-        do {
+        if (QTAILQ_EMPTY(&pool->request_list)) {
             pool->idle_threads++;
-            qemu_mutex_unlock(&pool->lock);
-            ret = qemu_sem_timedwait(&pool->sem, 10000);
-            qemu_mutex_lock(&pool->lock);
+            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
             pool->idle_threads--;
-        } while (back_to_sleep(pool, ret));
-        if (ret == -1 || pool->stopping ||
-            pool->cur_threads > pool->max_threads) {
-            break;
+            if (ret == 0 &&
+                QTAILQ_EMPTY(&pool->request_list) &&
+                pool->cur_threads > pool->min_threads) {
+                /* Timed out + no work to do + no need for warm threads = exit.  */
+                break;
+            }
+            /*
+             * Even if there was some work to do, check if there aren't
+             * too many worker threads before picking it up.
+             */
+            continue;
         }
 
         req = QTAILQ_FIRST(&pool->request_list);
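The new loop is the classic condition-variable consumer pattern: sleep only
while the predicate (a non-empty request list) is false, and re-check it
after every wakeup, so spurious wakeups and stolen work are harmless. A
self-contained sketch of the same shape in plain pthreads; the worker_ctx
and work_item names are illustrative, not QEMU code:

#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

struct work_item {
    struct work_item *next;
    void (*fn)(void *arg);
    void *arg;
};

struct worker_ctx {
    pthread_mutex_t lock;
    pthread_cond_t request_cond;   /* signaled when work arrives */
    pthread_cond_t worker_stopped; /* signaled when a worker exits */
    struct work_item *head;        /* FIFO of pending requests */
    int cur_threads;               /* incremented by whoever spawns us */
    int min_threads, max_threads;
    bool stopping;
};

static void *worker(void *opaque)
{
    struct worker_ctx *ctx = opaque;

    pthread_mutex_lock(&ctx->lock);
    while (!ctx->stopping && ctx->cur_threads <= ctx->max_threads) {
        if (ctx->head == NULL) {
            struct timespec ts;
            clock_gettime(CLOCK_REALTIME, &ts);
            ts.tv_sec += 10;                    /* 10 s idle timeout */
            int ret = pthread_cond_timedwait(&ctx->request_cond,
                                             &ctx->lock, &ts);
            if (ret == ETIMEDOUT && ctx->head == NULL &&
                ctx->cur_threads > ctx->min_threads) {
                /* Timed out, still idle, not needed as a warm thread. */
                break;
            }
            /* Woken (perhaps spuriously): re-check stopping/max_threads
             * at the top of the loop before touching the queue. */
            continue;
        }

        struct work_item *req = ctx->head;
        ctx->head = req->next;

        pthread_mutex_unlock(&ctx->lock);
        req->fn(req->arg);                      /* run outside the lock */
        pthread_mutex_lock(&ctx->lock);
    }
    ctx->cur_threads--;
    pthread_cond_signal(&ctx->worker_stopped);
    pthread_mutex_unlock(&ctx->lock);

    /* Pass a possibly-consumed wakeup on, mirroring the hunk below: this
     * thread may have been signaled for work but chose to exit because
     * max_threads shrank. */
    pthread_cond_signal(&ctx->request_cond);
    return NULL;
}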
@@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
     pool->cur_threads--;
     qemu_cond_signal(&pool->worker_stopped);
     qemu_mutex_unlock(&pool->lock);
+
+    /*
+     * Wake up another thread, in case we got a wakeup but decided
+     * to exit due to pool->cur_threads > pool->max_threads.
+     */
+    qemu_cond_signal(&pool->request_cond);
     return NULL;
 }
 
@@ -229,13 +223,7 @@ static void thread_pool_cancel(BlockAIOCB *acb)
     trace_thread_pool_cancel(elem, elem->common.opaque);
 
     QEMU_LOCK_GUARD(&pool->lock);
-    if (elem->state == THREAD_QUEUED &&
-        /* No thread has yet started working on elem. we can try to "steal"
-         * the item from the worker if we can get a signal from the
-         * semaphore.  Because this is non-blocking, we can do it with
-         * the lock taken and ensure that elem will remain THREAD_QUEUED.
-         */
-        qemu_sem_timedwait(&pool->sem, 0) == 0) {
+    if (elem->state == THREAD_QUEUED) {
         QTAILQ_REMOVE(&pool->request_list, elem, reqs);
         qemu_bh_schedule(pool->completion_bh);
 
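Because the queue itself is now the only source of truth, cancellation
reduces to unlinking the element under the lock; there is no semaphore
count to steal a token from first. The same simplification in terms of the
hypothetical worker_ctx sketch above:

/* Cancel a request that has not been picked up yet; returns true on
 * success.  Assumes the illustrative worker_ctx/work_item types above. */
static bool cancel_request(struct worker_ctx *ctx, struct work_item *req)
{
    bool cancelled = false;

    pthread_mutex_lock(&ctx->lock);
    for (struct work_item **p = &ctx->head; *p != NULL; p = &(*p)->next) {
        if (*p == req) {
            *p = req->next;   /* unlink; no counter to keep in sync */
            cancelled = true;
            break;
        }
    }
    pthread_mutex_unlock(&ctx->lock);
    return cancelled;
}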
@@ -280,7 +268,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
     }
     QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
     qemu_mutex_unlock(&pool->lock);
-    qemu_sem_post(&pool->sem);
+    qemu_cond_signal(&pool->request_cond);
     return &req->common;
 }
 
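Submission keeps the same shape as before: publish the request under the
lock, drop the lock, then wake one waiter. Signaling after the unlock means
the woken worker can usually acquire the mutex immediately instead of
bouncing off it, and the worker's re-check of the queue makes this ordering
safe. Sketch, same assumed types:

/* Queue a request and wake at most one idle worker.  Assumes the
 * illustrative worker_ctx/work_item types above. */
static void submit_request(struct worker_ctx *ctx, struct work_item *req)
{
    pthread_mutex_lock(&ctx->lock);
    req->next = NULL;
    /* O(n) tail append for brevity; QEMU itself uses a QTAILQ. */
    struct work_item **p = &ctx->head;
    while (*p != NULL) {
        p = &(*p)->next;
    }
    *p = req;
    pthread_mutex_unlock(&ctx->lock);

    /* Outside the lock, as in the hunk above: the woken thread does not
     * immediately block on a mutex we still hold. */
    pthread_cond_signal(&ctx->request_cond);
}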
@@ -323,7 +311,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
      * We either have to:
      *  - Increase the number available of threads until over the min_threads
      *    threshold.
-     *  - Decrease the number of available threads until under the max_threads
+     *  - Bump the worker threads so that they exit, until under the max_threads
      *    threshold.
      *  - Do nothing. The current number of threads fall in between the min and
      *    max thresholds. We'll let the pool manage itself.
@@ -333,7 +321,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
     }
 
     for (int i = pool->cur_threads; i > pool->max_threads; i--) {
-        qemu_sem_post(&pool->sem);
+        qemu_cond_signal(&pool->request_cond);
     }
 
     qemu_mutex_unlock(&pool->lock);
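Shrinking the pool reuses the worker loop's exit check: each signal wakes
at most one idle worker, which sees cur_threads > max_threads and leaves,
while busy workers notice the new limit when they finish their current
request. In sketch form:

/* Lower (or raise) the thread limit.  Assumes the illustrative
 * worker_ctx types above. */
static void set_max_threads(struct worker_ctx *ctx, int max_threads)
{
    pthread_mutex_lock(&ctx->lock);
    ctx->max_threads = max_threads;
    /* One wakeup per thread over the new limit; each woken worker
     * re-evaluates cur_threads <= max_threads and exits if needed,
     * re-signaling on its way out so the wakeups are not lost. */
    for (int i = ctx->cur_threads; i > ctx->max_threads; i--) {
        pthread_cond_signal(&ctx->request_cond);
    }
    pthread_mutex_unlock(&ctx->lock);
}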
@@ -350,7 +338,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
-    qemu_sem_init(&pool->sem, 0);
+    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
@@ -383,15 +371,15 @@ void thread_pool_free(ThreadPool *pool)
 
     /* Wait for worker threads to terminate */
     pool->stopping = true;
+    qemu_cond_broadcast(&pool->request_cond);
     while (pool->cur_threads > 0) {
-        qemu_sem_post(&pool->sem);
         qemu_cond_wait(&pool->worker_stopped, &pool->lock);
     }
 
     qemu_mutex_unlock(&pool->lock);
 
     qemu_bh_delete(pool->completion_bh);
-    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->request_cond);
     qemu_cond_destroy(&pool->worker_stopped);
     qemu_mutex_destroy(&pool->lock);
     g_free(pool);
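Tear-down becomes a textbook shutdown broadcast: set the flag once, wake
every waiter at once, then sleep until the worker count drains to zero,
instead of posting the semaphore once per loop iteration. Sketch, same
assumed types:

/* Stop all workers and wait for them to exit.  Assumes the illustrative
 * worker_ctx types above. */
static void stop_pool(struct worker_ctx *ctx)
{
    pthread_mutex_lock(&ctx->lock);
    ctx->stopping = true;
    /* A single broadcast reaches every idle worker; busy workers see
     * ctx->stopping on their next trip around the loop. */
    pthread_cond_broadcast(&ctx->request_cond);
    while (ctx->cur_threads > 0) {
        pthread_cond_wait(&ctx->worker_stopped, &ctx->lock);
    }
    pthread_mutex_unlock(&ctx->lock);
}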