migration/multifd: Separate SYNC request with normal jobs
Multifd provide a threaded model for processing jobs.  On sender side,
there can be two kinds of job: (1) a list of pages to send, or (2) a sync
request.
The sync request is a very special kind of job.  It never contains a page
array, but only a multifd packet telling the dest side to synchronize with
sent pages.
Before this patch, both requests use the pending_job field, no matter what
the request is, it will boost pending_job, while multifd sender thread will
decrement it after it finishes one job.
However this should be racy, because SYNC is special in that it needs to
set p->flags with MULTIFD_FLAG_SYNC, showing that this is a sync request.
Consider a sequence of operations where:
  - migration thread enqueue a job to send some pages, pending_job++ (0->1)
  - [...before the selected multifd sender thread wakes up...]
  - migration thread enqueue another job to sync, pending_job++ (1->2),
    setup p->flags=MULTIFD_FLAG_SYNC
  - multifd sender thread wakes up, found pending_job==2
    - send the 1st packet with MULTIFD_FLAG_SYNC and list of pages
    - send the 2nd packet with flags==0 and no pages
This is not expected, because MULTIFD_FLAG_SYNC should hopefully be done
after all the pages are received.  Meanwhile, the 2nd packet will be
completely useless, which contains zero information.
I didn't verify above, but I think this issue is still benign in that at
least on the recv side we always receive pages before handling
MULTIFD_FLAG_SYNC.  However that's not always guaranteed and just tricky.
One other reason I want to separate it is using p->flags to communicate
between the two threads is also not clearly defined, it's very hard to read
and understand why accessing p->flags is always safe; see the current impl
of multifd_send_thread() where we tried to cache only p->flags.  It doesn't
need to be that complicated.
This patch introduces pending_sync, a separate flag just to show that the
requester needs a sync.  Alongside, we remove the tricky caching of
p->flags now because after this patch p->flags should only be used by
multifd sender thread now, which will be crystal clear.  So it is always
thread safe to access p->flags.
With that, we can also safely convert the pending_job into a boolean,
because we don't support >1 pending jobs anyway.
Always use atomic ops to access both flags to make sure no cache effect.
When at it, drop the initial setting of "pending_job = 0" because it's
always allocated using g_new0().
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Link: https://lore.kernel.org/r/20240202102857.110210-7-peterx@redhat.com
Signed-off-by: Peter Xu <peterx@redhat.com>
			
			
This commit is contained in:
		| @@ -442,8 +442,8 @@ static int multifd_send_pages(void) | |||||||
|         } |         } | ||||||
|         p = &multifd_send_state->params[i]; |         p = &multifd_send_state->params[i]; | ||||||
|         qemu_mutex_lock(&p->mutex); |         qemu_mutex_lock(&p->mutex); | ||||||
|         if (!p->pending_job) { |         if (qatomic_read(&p->pending_job) == false) { | ||||||
|             p->pending_job++; |             qatomic_set(&p->pending_job, true); | ||||||
|             next_channel = (i + 1) % migrate_multifd_channels(); |             next_channel = (i + 1) % migrate_multifd_channels(); | ||||||
|             break; |             break; | ||||||
|         } |         } | ||||||
| @@ -631,8 +631,12 @@ int multifd_send_sync_main(void) | |||||||
|  |  | ||||||
|         qemu_mutex_lock(&p->mutex); |         qemu_mutex_lock(&p->mutex); | ||||||
|         p->packet_num = multifd_send_state->packet_num++; |         p->packet_num = multifd_send_state->packet_num++; | ||||||
|         p->flags |= MULTIFD_FLAG_SYNC; |         /* | ||||||
|         p->pending_job++; |          * We should be the only user so far, so not possible to be set by | ||||||
|  |          * others concurrently. | ||||||
|  |          */ | ||||||
|  |         assert(qatomic_read(&p->pending_sync) == false); | ||||||
|  |         qatomic_set(&p->pending_sync, true); | ||||||
|         qemu_mutex_unlock(&p->mutex); |         qemu_mutex_unlock(&p->mutex); | ||||||
|         qemu_sem_post(&p->sem); |         qemu_sem_post(&p->sem); | ||||||
|     } |     } | ||||||
| @@ -685,10 +689,9 @@ static void *multifd_send_thread(void *opaque) | |||||||
|         } |         } | ||||||
|         qemu_mutex_lock(&p->mutex); |         qemu_mutex_lock(&p->mutex); | ||||||
|  |  | ||||||
|         if (p->pending_job) { |         if (qatomic_read(&p->pending_job)) { | ||||||
|             uint64_t packet_num = p->packet_num; |             uint64_t packet_num = p->packet_num; | ||||||
|             MultiFDPages_t *pages = p->pages; |             MultiFDPages_t *pages = p->pages; | ||||||
|             uint32_t flags; |  | ||||||
|  |  | ||||||
|             if (use_zero_copy_send) { |             if (use_zero_copy_send) { | ||||||
|                 p->iovs_num = 0; |                 p->iovs_num = 0; | ||||||
| @@ -704,13 +707,11 @@ static void *multifd_send_thread(void *opaque) | |||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|             multifd_send_fill_packet(p); |             multifd_send_fill_packet(p); | ||||||
|             flags = p->flags; |  | ||||||
|             p->flags = 0; |  | ||||||
|             p->num_packets++; |             p->num_packets++; | ||||||
|             p->total_normal_pages += pages->num; |             p->total_normal_pages += pages->num; | ||||||
|             qemu_mutex_unlock(&p->mutex); |             qemu_mutex_unlock(&p->mutex); | ||||||
|  |  | ||||||
|             trace_multifd_send(p->id, packet_num, pages->num, flags, |             trace_multifd_send(p->id, packet_num, pages->num, p->flags, | ||||||
|                                p->next_packet_size); |                                p->next_packet_size); | ||||||
|  |  | ||||||
|             if (use_zero_copy_send) { |             if (use_zero_copy_send) { | ||||||
| @@ -738,12 +739,23 @@ static void *multifd_send_thread(void *opaque) | |||||||
|             multifd_pages_reset(p->pages); |             multifd_pages_reset(p->pages); | ||||||
|             p->next_packet_size = 0; |             p->next_packet_size = 0; | ||||||
|             qemu_mutex_lock(&p->mutex); |             qemu_mutex_lock(&p->mutex); | ||||||
|             p->pending_job--; |             qatomic_set(&p->pending_job, false); | ||||||
|             qemu_mutex_unlock(&p->mutex); |             qemu_mutex_unlock(&p->mutex); | ||||||
|  |         } else if (qatomic_read(&p->pending_sync)) { | ||||||
|             if (flags & MULTIFD_FLAG_SYNC) { |             p->flags = MULTIFD_FLAG_SYNC; | ||||||
|                 qemu_sem_post(&p->sem_sync); |             multifd_send_fill_packet(p); | ||||||
|  |             ret = qio_channel_write_all(p->c, (void *)p->packet, | ||||||
|  |                                         p->packet_len, &local_err); | ||||||
|  |             if (ret != 0) { | ||||||
|  |                 qemu_mutex_unlock(&p->mutex); | ||||||
|  |                 break; | ||||||
|             } |             } | ||||||
|  |             /* p->next_packet_size will always be zero for a SYNC packet */ | ||||||
|  |             stat64_add(&mig_stats.multifd_bytes, p->packet_len); | ||||||
|  |             p->flags = 0; | ||||||
|  |             qatomic_set(&p->pending_sync, false); | ||||||
|  |             qemu_mutex_unlock(&p->mutex); | ||||||
|  |             qemu_sem_post(&p->sem_sync); | ||||||
|         } else { |         } else { | ||||||
|             qemu_mutex_unlock(&p->mutex); |             qemu_mutex_unlock(&p->mutex); | ||||||
|             /* sometimes there are spurious wakeups */ |             /* sometimes there are spurious wakeups */ | ||||||
| @@ -907,7 +919,6 @@ int multifd_save_setup(Error **errp) | |||||||
|         qemu_mutex_init(&p->mutex); |         qemu_mutex_init(&p->mutex); | ||||||
|         qemu_sem_init(&p->sem, 0); |         qemu_sem_init(&p->sem, 0); | ||||||
|         qemu_sem_init(&p->sem_sync, 0); |         qemu_sem_init(&p->sem_sync, 0); | ||||||
|         p->pending_job = 0; |  | ||||||
|         p->id = i; |         p->id = i; | ||||||
|         p->pages = multifd_pages_init(page_count); |         p->pages = multifd_pages_init(page_count); | ||||||
|         p->packet_len = sizeof(MultiFDPacket_t) |         p->packet_len = sizeof(MultiFDPacket_t) | ||||||
|   | |||||||
| @@ -99,8 +99,17 @@ typedef struct { | |||||||
|     uint32_t flags; |     uint32_t flags; | ||||||
|     /* global number of generated multifd packets */ |     /* global number of generated multifd packets */ | ||||||
|     uint64_t packet_num; |     uint64_t packet_num; | ||||||
|     /* thread has work to do */ |     /* | ||||||
|     int pending_job; |      * The sender thread has work to do if either of below boolean is set. | ||||||
|  |      * | ||||||
|  |      * @pending_job:  a job is pending | ||||||
|  |      * @pending_sync: a sync request is pending | ||||||
|  |      * | ||||||
|  |      * For both of these fields, they're only set by the requesters, and | ||||||
|  |      * cleared by the multifd sender threads. | ||||||
|  |      */ | ||||||
|  |     bool pending_job; | ||||||
|  |     bool pending_sync; | ||||||
|     /* array of pages to sent. |     /* array of pages to sent. | ||||||
|      * The owner of 'pages' depends of 'pending_job' value: |      * The owner of 'pages' depends of 'pending_job' value: | ||||||
|      * pending_job == 0 -> migration_thread can use it. |      * pending_job == 0 -> migration_thread can use it. | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user