Compare commits

...

9 Commits

Author SHA1 Message Date
Fabiano Rosas
6424d5b3df migration/multifd: Extract sem_done waiting into a function
This helps document the intent of the loop via the function name, and
we can reuse it in the future.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-03 11:31:14 -03:00
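
As a rough illustration of the refactor (plain C with POSIX semaphores; the names below are illustrative stand-ins, not QEMU's actual code), the per-channel wait loop becomes a helper whose name carries the intent:

#include <semaphore.h>

#define N_CHANNELS 4                 /* stand-in for migrate_multifd_channels() */
static sem_t sem_done[N_CHANNELS];   /* stand-in for each channel's p->sem_done */

/* Wait until every channel has signalled that it is done sending. */
static void wait_for_channels_done(void)
{
    for (int i = 0; i < N_CHANNELS; i++) {
        sem_wait(&sem_done[i]);
    }
}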
Fabiano Rosas
5494d69c58 migration/multifd: Decouple control flow from the SYNC packet
We currently have the sem_sync semaphore that is used:

1) on the sending side, to know when the multifd_send_thread has
   finished sending the MULTIFD_FLAG_SYNC packet;

  This is unnecessary. Multifd sends packets (not pages) one by one
  and completion is already bound by both the channels_ready and sem
  semaphores. The SYNC packet has nothing special that would require
  it to have a separate semaphore on the sending side.

2) on the receiving side, to know when the multifd_recv_thread has
   finished receiving the MULTIFD_FLAG_SYNC packet;

  This is unnecessary because the multifd_recv_state->sem_sync
  semaphore already does the same thing. We care that the SYNC arrived
  from the source; knowing that it has been received by the recv
  thread doesn't add anything.

3) on both sending and receiving sides, to wait for the multifd threads
   to finish before cleaning up;

   This happens because multifd_send_sync_main() blocks
   ram_save_complete() from finishing until the semaphore is
   posted. This is surprising and not documented.

Clarify the above situation by renaming 'sem_sync' to 'sem_done' and
making the #3 usage the main one. Stop tracking the SYNC packet on
source (#1) and leave multifd_recv_state->sem_sync untouched on the
destination (#2).

Due to the 'channels_ready' and 'sem' semaphores, we always send
packets in lockstep with switching MultiFDSendParams, so
p->pending_job is always either 1 or 0. The thread has no knowledge of
whether it will have more to send once it posts to
channels_ready. Make it run one extra loop iteration so it sees no
pending_job and releases the semaphore.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-03 11:31:13 -03:00
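
A self-contained sketch of the handshake described above, in plain C with POSIX primitives; Channel, sem, sem_done and pending_job are simplified stand-ins for QEMU's MultiFDSendParams fields, not the actual implementation:

/* build: gcc -pthread sem_done_sketch.c */
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>

/* Simplified stand-in for one multifd send channel. */
typedef struct {
    pthread_mutex_t lock;
    sem_t sem;        /* work (or a request to re-check state) is available */
    sem_t sem_done;   /* the channel has nothing pending, i.e. it is idle   */
    int pending_job;
    bool quit;
} Channel;

static void *send_thread(void *opaque)
{
    Channel *p = opaque;

    while (true) {
        sem_wait(&p->sem);
        pthread_mutex_lock(&p->lock);
        if (p->quit) {
            pthread_mutex_unlock(&p->lock);
            break;
        }
        if (p->pending_job) {
            p->pending_job--;
            pthread_mutex_unlock(&p->lock);
            printf("sent one packet\n");   /* stand-in for the real send */
        } else {
            /* Extra wakeup with nothing queued: report that we are idle. */
            sem_post(&p->sem_done);
            pthread_mutex_unlock(&p->lock);
        }
    }
    return NULL;
}

int main(void)
{
    Channel p = { .pending_job = 0, .quit = false };
    pthread_t th;

    pthread_mutex_init(&p.lock, NULL);
    sem_init(&p.sem, 0, 0);
    sem_init(&p.sem_done, 0, 0);
    pthread_create(&th, NULL, send_thread, &p);

    /* Queue one job for the thread... */
    pthread_mutex_lock(&p.lock);
    p.pending_job++;
    pthread_mutex_unlock(&p.lock);
    sem_post(&p.sem);

    /* ...then wake it one extra time so it finds no pending_job and
     * posts sem_done, telling us the channel has drained. */
    sem_post(&p.sem);
    sem_wait(&p.sem_done);

    /* Tell the thread to exit and reap it. */
    pthread_mutex_lock(&p.lock);
    p.quit = true;
    pthread_mutex_unlock(&p.lock);
    sem_post(&p.sem);
    pthread_join(th, NULL);
    return 0;
}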
Fabiano Rosas
374cb846eb migration/multifd: Centralize multifd_send thread release actions
When the multifd_send thread finishes or when an error is detected and
we want it to finish, there are some actions that need to be taken:

- The 'quit' variable should be set. It is used both as a signal for
  the thread to end and as an indication that the thread has already
  ended.

- The channels_ready and sem_sync semaphores need to be released. The
  main thread might be waiting to send another packet or waiting for
  the confirmation that the SYNC packet has been sent. If an error
  occurred, the multifd_send thread might not be able to send more
  packets or send the SYNC packet. The main thread should be released
  so we can do cleanup.

These two actions need to occur in this order because the side queuing
the packets always checks for p->quit after it is allowed to continue.

There are a few moments where we want to perform these actions, so
extract that code into a function to be reused.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-03 11:31:13 -03:00
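
The shape of such a centralized helper, sketched with the same stand-in names as above (the real one added by this series is multifd_send_thread_release() in the diff below):

#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    sem_t sem_done;
    bool quit;
} Channel;

static sem_t channels_ready;   /* global semaphore, as in the pre-series code */

/*
 * Mark the channel as quitting, then wake up anyone blocked on it.
 * 'quit' must be set before the posts because the released waiters
 * check it immediately after they wake up.
 */
static void channel_release(Channel *p)
{
    pthread_mutex_lock(&p->lock);
    p->quit = true;
    pthread_mutex_unlock(&p->lock);

    sem_post(&channels_ready);
    sem_post(&p->sem_done);
}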
Fabiano Rosas
c2866a080d migration/multifd: Clarify Error usage in multifd_channel_connect
The function is currently called from two sites: one always gives it a
NULL Error and the other always gives it a non-NULL Error.

In the non-NULL case, all it does is trace the error and return. One
of the callers already has tracing; add a tracepoint to the other and
stop passing the error into the function.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-03 11:31:13 -03:00
Fabiano Rosas
375c45671e migration/multifd: Set p->quit before releasing channels_ready
All waiters of channels_ready check p->quit shortly after being
released. We need to set the variable before posting the semaphore.

We have probably never seen an issue here because this is a "not even
started" error case, which is very unlikely to happen.

The other place that releases the channels_ready semaphore is
multifd_send_thread(), which already sets p->quit before the post
(via multifd_send_terminate_threads).

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-03 11:31:13 -03:00
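
The waiter side this protects, roughly (stand-in names again; in QEMU this is the code that queues packets onto a channel):

#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

static sem_t channels_ready;

typedef struct {
    pthread_mutex_t lock;
    bool quit;
} Channel;

/*
 * The waiter checks 'quit' right after channels_ready is posted, so an
 * error path that posts the semaphore must have set 'quit' beforehand,
 * otherwise this check can race with it and miss the error.
 */
static bool channel_still_usable(Channel *p)
{
    bool ok;

    sem_wait(&channels_ready);
    pthread_mutex_lock(&p->lock);
    ok = !p->quit;
    pthread_mutex_unlock(&p->lock);
    return ok;
}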
Fabiano Rosas
39875ab892 migration/multifd: Move error handling back into the multifd thread
multifd_send_terminate_threads() is doing double duty: terminating
the threads and setting the migration error and state. Clean it up.

This will allow for further simplification of the multifd thread
cleanup path in the following patches.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-03 11:31:13 -03:00
Fabiano Rosas
4a2c3ad4fa migration/multifd: Unify multifd_send_thread error paths
The preferred usage of the Error type is to always set both the return
code and the error when a failure happens. As all code called from the
send thread follows this pattern, we'll always have the return code
and the error set at the same time.

Aside from the convention, in this piece of code this must be the
case; otherwise the 'if (ret != 0)' path would exit the thread without
calling multifd_send_terminate_threads(), which is incorrect.

Unify both paths to make it clear that both are taken when there's an
error.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-03 11:31:13 -03:00
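
A minimal sketch of the convention being relied on (a stand-in Err type, not QEMU's Error API): a failing callee sets both the return code and the error, so the thread's single error path can assert that the error is present:

#include <assert.h>
#include <stddef.h>

/* Stand-in error object; QEMU's real code uses Error/error_setg(). */
typedef struct { const char *msg; } Err;

/* Convention: on failure, return < 0 *and* set *errp, never just one. */
static int send_one_packet(int fd, Err **errp)
{
    static Err bad_fd = { "invalid file descriptor" };

    if (fd < 0) {
        *errp = &bad_fd;
        return -1;
    }
    /* ... write the packet ... */
    return 0;
}

/* The thread's exit path can then treat the two as equivalent and keep
 * a single error block that records the error and terminates the other
 * threads. */
static void thread_exit_path(int ret, Err *local_err)
{
    if (ret != 0) {
        assert(local_err != NULL);   /* ret and error are set together */
        /* record error, terminate threads, release waiters ... */
    }
}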
Fabiano Rosas
f2accad0c8 migration/multifd: Remove channels_ready semaphore
The channels_ready semaphore is a global variable not linked to any
single multifd channel. Waiting on it only means that "some" channel
has become ready to send data. Since we need to address the channels
by index (multifd_send_state->params[i]), that information adds
nothing of value. The channel being addressed is not necessarily the
one that just released the semaphore.

The only usage of this semaphore that makes sense is to wait for it in
a loop that iterates for the number of channels. That could mean: all
channels have been set up and are operational OR all channels have
finished their work and are idle.

Currently all code that waits on channels_ready is redundant. There is
always a subsequent lock or semaphore that does the actual data
protection/synchronization.

- at multifd_send_pages: Waiting on channels_ready doesn't mean the
  'next_channel' is ready; it could be any other channel. So there are
  already cases where this code runs as if no semaphore was there.

  Waiting outside of the loop is also incorrect because if the current
  channel already has a pending_job, then it will loop into the next
  one without waiting on the semaphore, and the count will be greater than
  zero at the end of the execution.

  Checking that "any" channel is ready as a proxy for all channels
  being ready would work, but it's not what the code is doing and not
  really needed because the channel lock and 'sem' would be enough.

- at multifd_send_sync: This usage is correct, but it is made
  redundant by the wait on sem_sync. What this piece of code is doing
  is making sure all channels have sent the SYNC packet and become
  idle afterwards.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-02 19:33:06 -03:00
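
For reference, the one well-defined way to use a global counting semaphore like channels_ready is as an "all N channels reported in" barrier (sketch with stand-in names, not QEMU code):

#include <semaphore.h>

#define N_CHANNELS 4                /* stand-in for migrate_multifd_channels() */
static sem_t channels_ready;        /* single global counting semaphore */

/*
 * If every channel posts channels_ready exactly once per unit of work,
 * waiting N times means "all N channels have reported in".  It still
 * cannot say *which* channel posted, so per-channel synchronization
 * (p->mutex, p->sem) is what actually protects the data.
 */
static void wait_all_channels(void)
{
    for (int i = 0; i < N_CHANNELS; i++) {
        sem_wait(&channels_ready);
    }
}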
Fabiano Rosas
b8d09729d9 migration/multifd: Remove direct "socket" references
We're about to enable support for other transports in multifd, so
remove direct references to sockets.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
2023-10-02 11:17:29 -03:00
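
A sketch of the indirection this introduces (stand-in functions; the series itself adds thin wrappers such as multifd_send_channel_destroy(), visible in the diff below):

/* Stand-ins for the socket transport helpers. */
static int socket_channel_create(void)  { return 0; }
static int socket_channel_destroy(void) { return 0; }

/*
 * multifd-level wrappers: callers no longer name the socket transport
 * directly, so adding another transport later only touches these two
 * functions.
 */
static int multifd_channel_create(void)  { return socket_channel_create(); }
static int multifd_channel_destroy(void) { return socket_channel_destroy(); }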
3 changed files with 138 additions and 122 deletions

migration/multifd.c

@@ -362,8 +362,6 @@ struct {
MultiFDPages_t *pages; MultiFDPages_t *pages;
/* global number of generated multifd packets */ /* global number of generated multifd packets */
uint64_t packet_num; uint64_t packet_num;
/* send channels ready */
QemuSemaphore channels_ready;
/* /*
* Have we already run terminate threads. There is a race when it * Have we already run terminate threads. There is a race when it
* happens that we got one error while we are exiting. * happens that we got one error while we are exiting.
@@ -403,7 +401,6 @@ static int multifd_send_pages(QEMUFile *f)
return -1; return -1;
} }
qemu_sem_wait(&multifd_send_state->channels_ready);
/* /*
* next_channel can remain from a previous migration that was * next_channel can remain from a previous migration that was
* using more channels, so ensure it doesn't overflow if the * using more channels, so ensure it doesn't overflow if the
@@ -469,23 +466,11 @@ int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
return 1; return 1;
} }
static void multifd_send_terminate_threads(Error *err) static void multifd_send_terminate_threads(void)
{ {
int i; int i;
trace_multifd_send_terminate_threads(err != NULL); trace_multifd_send_terminate_threads();
if (err) {
MigrationState *s = migrate_get_current();
migrate_set_error(s, err);
if (s->state == MIGRATION_STATUS_SETUP ||
s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
s->state == MIGRATION_STATUS_DEVICE ||
s->state == MIGRATION_STATUS_ACTIVE) {
migrate_set_state(&s->state, s->state,
MIGRATION_STATUS_FAILED);
}
}
/* /*
* We don't want to exit each threads twice. Depending on where * We don't want to exit each threads twice. Depending on where
@@ -510,6 +495,11 @@ static void multifd_send_terminate_threads(Error *err)
} }
} }
static int multifd_send_channel_destroy(QIOChannel *send)
{
return socket_send_channel_destroy(send);
}
void multifd_save_cleanup(void) void multifd_save_cleanup(void)
{ {
int i; int i;
@@ -517,7 +507,7 @@ void multifd_save_cleanup(void)
if (!migrate_multifd()) { if (!migrate_multifd()) {
return; return;
} }
multifd_send_terminate_threads(NULL); multifd_send_terminate_threads();
for (i = 0; i < migrate_multifd_channels(); i++) { for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i]; MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -532,11 +522,11 @@ void multifd_save_cleanup(void)
if (p->registered_yank) { if (p->registered_yank) {
migration_ioc_unregister_yank(p->c); migration_ioc_unregister_yank(p->c);
} }
socket_send_channel_destroy(p->c); multifd_send_channel_destroy(p->c);
p->c = NULL; p->c = NULL;
qemu_mutex_destroy(&p->mutex); qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem); qemu_sem_destroy(&p->sem);
qemu_sem_destroy(&p->sem_sync); qemu_sem_destroy(&p->sem_done);
g_free(p->name); g_free(p->name);
p->name = NULL; p->name = NULL;
multifd_pages_clear(p->pages); multifd_pages_clear(p->pages);
@@ -554,7 +544,6 @@ void multifd_save_cleanup(void)
error_free(local_err); error_free(local_err);
} }
} }
qemu_sem_destroy(&multifd_send_state->channels_ready);
g_free(multifd_send_state->params); g_free(multifd_send_state->params);
multifd_send_state->params = NULL; multifd_send_state->params = NULL;
multifd_pages_clear(multifd_send_state->pages); multifd_pages_clear(multifd_send_state->pages);
@@ -580,19 +569,15 @@ static int multifd_zero_copy_flush(QIOChannel *c)
return ret; return ret;
} }
int multifd_send_sync_main(QEMUFile *f) static int multifd_send_wait(void)
{ {
int i;
bool flush_zero_copy; bool flush_zero_copy;
int i;
if (!migrate_multifd()) { /* wait for all channels to be idle */
return 0; for (i = 0; i < migrate_multifd_channels(); i++) {
} trace_multifd_send_wait(migrate_multifd_channels() - i);
if (multifd_send_state->pages->num) { qemu_sem_wait(&multifd_send_state->channels_ready);
if (multifd_send_pages(f) < 0) {
error_report("%s: multifd_send_pages fail", __func__);
return -1;
}
} }
/* /*
@@ -605,9 +590,46 @@ int multifd_send_sync_main(QEMUFile *f)
* to be less frequent, e.g. only after we finished one whole scanning of * to be less frequent, e.g. only after we finished one whole scanning of
* all the dirty bitmaps. * all the dirty bitmaps.
*/ */
flush_zero_copy = migrate_zero_copy_send(); flush_zero_copy = migrate_zero_copy_send();
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
qemu_mutex_lock(&p->mutex);
assert(!p->pending_job);
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
qemu_sem_wait(&p->sem_done);
if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
return -1;
}
}
/*
* All channels went idle and have no more jobs. Unless we send
* them more work, we're good to allow any cleanup code to run at
* this point.
*/
return 0;
}
int multifd_send_sync_main(QEMUFile *f)
{
int i, ret;
if (!migrate_multifd()) {
return 0;
}
if (multifd_send_state->pages->num) {
if (multifd_send_pages(f) < 0) {
error_report("%s: multifd_send_pages fail", __func__);
return -1;
}
}
for (i = 0; i < migrate_multifd_channels(); i++) { for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i]; MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -627,20 +649,30 @@ int multifd_send_sync_main(QEMUFile *f)
qemu_mutex_unlock(&p->mutex); qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem); qemu_sem_post(&p->sem);
} }
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
qemu_sem_wait(&multifd_send_state->channels_ready); ret = multifd_send_wait();
trace_multifd_send_sync_main_wait(p->id);
qemu_sem_wait(&p->sem_sync);
if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
return -1;
}
}
trace_multifd_send_sync_main(multifd_send_state->packet_num); trace_multifd_send_sync_main(multifd_send_state->packet_num);
return 0; return ret;
}
/*
* Mark that the thread has quit (including if it never even started)
* and release any waiters that might be stuck.
*/
static void multifd_send_thread_release(MultiFDSendParams *p)
{
/*
* Not all instances where this function is called happen with a
* live thread, but let's be conservative and always take the
* lock.
*/
qemu_mutex_lock(&p->mutex);
p->quit = true;
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&multifd_send_state->channels_ready);
qemu_sem_post(&p->sem_done);
} }
static void *multifd_send_thread(void *opaque) static void *multifd_send_thread(void *opaque)
@@ -664,7 +696,6 @@ static void *multifd_send_thread(void *opaque)
p->num_packets = 1; p->num_packets = 1;
while (true) { while (true) {
qemu_sem_post(&multifd_send_state->channels_ready);
qemu_sem_wait(&p->sem); qemu_sem_wait(&p->sem);
if (qatomic_read(&multifd_send_state->exiting)) { if (qatomic_read(&multifd_send_state->exiting)) {
@@ -734,32 +765,31 @@ static void *multifd_send_thread(void *opaque)
p->pending_job--; p->pending_job--;
qemu_mutex_unlock(&p->mutex); qemu_mutex_unlock(&p->mutex);
if (flags & MULTIFD_FLAG_SYNC) {
qemu_sem_post(&p->sem_sync);
}
} else if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
} else { } else {
qemu_sem_post(&p->sem_done);
qemu_mutex_unlock(&p->mutex); qemu_mutex_unlock(&p->mutex);
/* sometimes there are spurious wakeups */
} }
} }
out: out:
if (local_err) { if (ret) {
trace_multifd_send_error(p->id); MigrationState *s = migrate_get_current();
multifd_send_terminate_threads(local_err);
error_free(local_err);
}
/* trace_multifd_send_error(p->id);
* Error happen, I will exit, but I can't just leave, tell assert(local_err);
* who pay attention to me. migrate_set_error(s, local_err);
*/
if (ret != 0) { if (s->state == MIGRATION_STATUS_SETUP ||
qemu_sem_post(&p->sem_sync); s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
qemu_sem_post(&multifd_send_state->channels_ready); s->state == MIGRATION_STATUS_DEVICE ||
s->state == MIGRATION_STATUS_ACTIVE) {
migrate_set_state(&s->state, s->state,
MIGRATION_STATUS_FAILED);
}
multifd_send_terminate_threads();
multifd_send_thread_release(p);
error_free(local_err);
} }
qemu_mutex_lock(&p->mutex); qemu_mutex_lock(&p->mutex);
@@ -775,7 +805,7 @@ out:
static bool multifd_channel_connect(MultiFDSendParams *p, static bool multifd_channel_connect(MultiFDSendParams *p,
QIOChannel *ioc, QIOChannel *ioc,
Error *error); Error **errp);
static void multifd_tls_outgoing_handshake(QIOTask *task, static void multifd_tls_outgoing_handshake(QIOTask *task,
gpointer opaque) gpointer opaque)
@@ -784,21 +814,15 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL; Error *err = NULL;
if (qio_task_propagate_error(task, &err)) { if (!qio_task_propagate_error(task, &err)) {
trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
} else {
trace_multifd_tls_outgoing_handshake_complete(ioc); trace_multifd_tls_outgoing_handshake_complete(ioc);
if (multifd_channel_connect(p, ioc, &err)) {
return;
}
} }
if (!multifd_channel_connect(p, ioc, err)) { trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
/* multifd_send_thread_release(p);
* Error happen, mark multifd_send_thread status as 'quit' although it
* is not created, and then tell who pay attention to me.
*/
p->quit = true;
qemu_sem_post(&multifd_send_state->channels_ready);
qemu_sem_post(&p->sem_sync);
}
} }
static void *multifd_tls_handshake_thread(void *opaque) static void *multifd_tls_handshake_thread(void *opaque)
@@ -814,7 +838,7 @@ static void *multifd_tls_handshake_thread(void *opaque)
return NULL; return NULL;
} }
static void multifd_tls_channel_connect(MultiFDSendParams *p, static bool multifd_tls_channel_connect(MultiFDSendParams *p,
QIOChannel *ioc, QIOChannel *ioc,
Error **errp) Error **errp)
{ {
@@ -824,7 +848,7 @@ static void multifd_tls_channel_connect(MultiFDSendParams *p,
tioc = migration_tls_client_create(ioc, hostname, errp); tioc = migration_tls_client_create(ioc, hostname, errp);
if (!tioc) { if (!tioc) {
return; return false;
} }
object_unref(OBJECT(ioc)); object_unref(OBJECT(ioc));
@@ -834,31 +858,25 @@ static void multifd_tls_channel_connect(MultiFDSendParams *p,
qemu_thread_create(&p->thread, "multifd-tls-handshake-worker", qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
multifd_tls_handshake_thread, p, multifd_tls_handshake_thread, p,
QEMU_THREAD_JOINABLE); QEMU_THREAD_JOINABLE);
return true;
} }
static bool multifd_channel_connect(MultiFDSendParams *p, static bool multifd_channel_connect(MultiFDSendParams *p,
QIOChannel *ioc, QIOChannel *ioc,
Error *error) Error **errp)
{ {
trace_multifd_set_outgoing_channel( trace_multifd_set_outgoing_channel(
ioc, object_get_typename(OBJECT(ioc)), ioc, object_get_typename(OBJECT(ioc)),
migrate_get_current()->hostname, error); migrate_get_current()->hostname);
if (error) {
return false;
}
if (migrate_channel_requires_tls_upgrade(ioc)) { if (migrate_channel_requires_tls_upgrade(ioc)) {
multifd_tls_channel_connect(p, ioc, &error); /*
if (!error) { * tls_channel_connect will call back to this
/* * function after the TLS handshake,
* tls_channel_connect will call back to this * so we mustn't call multifd_send_thread until then
* function after the TLS handshake, */
* so we mustn't call multifd_send_thread until then return multifd_tls_channel_connect(p, ioc, errp);
*/
return true;
} else {
return false;
}
} else { } else {
migration_ioc_register_yank(ioc); migration_ioc_register_yank(ioc);
p->registered_yank = true; p->registered_yank = true;
@@ -873,15 +891,7 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
QIOChannel *ioc, Error *err) QIOChannel *ioc, Error *err)
{ {
migrate_set_error(migrate_get_current(), err); migrate_set_error(migrate_get_current(), err);
/* Error happen, we need to tell who pay attention to me */ multifd_send_thread_release(p);
qemu_sem_post(&multifd_send_state->channels_ready);
qemu_sem_post(&p->sem_sync);
/*
* Although multifd_send_thread is not created, but main migration
* thread need to judge whether it is running, so we need to mark
* its status.
*/
p->quit = true;
object_unref(OBJECT(ioc)); object_unref(OBJECT(ioc));
error_free(err); error_free(err);
} }
@@ -889,20 +899,26 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{ {
MultiFDSendParams *p = opaque; MultiFDSendParams *p = opaque;
QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *local_err = NULL; Error *local_err = NULL;
trace_multifd_new_send_channel_async(p->id); trace_multifd_new_send_channel_async(p->id);
if (!qio_task_propagate_error(task, &local_err)) { if (!qio_task_propagate_error(task, &local_err)) {
p->c = sioc; p->c = ioc;
qio_channel_set_delay(p->c, false); qio_channel_set_delay(p->c, false);
p->running = true; p->running = true;
if (multifd_channel_connect(p, sioc, local_err)) { if (multifd_channel_connect(p, ioc, &local_err)) {
return; return;
} }
} }
multifd_new_send_channel_cleanup(p, sioc, local_err); trace_multifd_new_send_channel_async_error(p->id, local_err);
multifd_new_send_channel_cleanup(p, ioc, local_err);
}
static void multifd_new_send_channel_create(gpointer opaque)
{
socket_send_channel_create(multifd_new_send_channel_async, opaque);
} }
int multifd_save_setup(Error **errp) int multifd_save_setup(Error **errp)
@@ -919,7 +935,6 @@ int multifd_save_setup(Error **errp)
multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
multifd_send_state->pages = multifd_pages_init(page_count); multifd_send_state->pages = multifd_pages_init(page_count);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0); qatomic_set(&multifd_send_state->exiting, 0);
multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
@@ -928,7 +943,7 @@ int multifd_save_setup(Error **errp)
qemu_mutex_init(&p->mutex); qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0); qemu_sem_init(&p->sem_done, 0);
p->quit = false; p->quit = false;
p->pending_job = 0; p->pending_job = 0;
p->id = i; p->id = i;
@@ -951,7 +966,7 @@ int multifd_save_setup(Error **errp)
p->write_flags = 0; p->write_flags = 0;
} }
socket_send_channel_create(multifd_new_send_channel_async, p); multifd_new_send_channel_create(p);
} }
for (i = 0; i < thread_count; i++) { for (i = 0; i < thread_count; i++) {
@@ -1037,7 +1052,7 @@ void multifd_load_cleanup(void)
* multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
* however try to wakeup it without harm in cleanup phase. * however try to wakeup it without harm in cleanup phase.
*/ */
qemu_sem_post(&p->sem_sync); qemu_sem_post(&p->sem_done);
} }
qemu_thread_join(&p->thread); qemu_thread_join(&p->thread);
@@ -1049,7 +1064,7 @@ void multifd_load_cleanup(void)
object_unref(OBJECT(p->c)); object_unref(OBJECT(p->c));
p->c = NULL; p->c = NULL;
qemu_mutex_destroy(&p->mutex); qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem_sync); qemu_sem_destroy(&p->sem_done);
g_free(p->name); g_free(p->name);
p->name = NULL; p->name = NULL;
p->packet_len = 0; p->packet_len = 0;
@@ -1090,7 +1105,7 @@ void multifd_recv_sync_main(void)
} }
} }
trace_multifd_recv_sync_main_signal(p->id); trace_multifd_recv_sync_main_signal(p->id);
qemu_sem_post(&p->sem_sync); qemu_sem_post(&p->sem_done);
} }
trace_multifd_recv_sync_main(multifd_recv_state->packet_num); trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
} }
@@ -1142,7 +1157,7 @@ static void *multifd_recv_thread(void *opaque)
if (flags & MULTIFD_FLAG_SYNC) { if (flags & MULTIFD_FLAG_SYNC) {
qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync); qemu_sem_wait(&p->sem_done);
} }
} }
@@ -1185,7 +1200,7 @@ int multifd_load_setup(Error **errp)
MultiFDRecvParams *p = &multifd_recv_state->params[i]; MultiFDRecvParams *p = &multifd_recv_state->params[i];
qemu_mutex_init(&p->mutex); qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem_sync, 0); qemu_sem_init(&p->sem_done, 0);
p->quit = false; p->quit = false;
p->id = i; p->id = i;
p->packet_len = sizeof(MultiFDPacket_t) p->packet_len = sizeof(MultiFDPacket_t)

migration/multifd.h

@@ -90,8 +90,8 @@ typedef struct {
/* sem where to wait for more work */ /* sem where to wait for more work */
QemuSemaphore sem; QemuSemaphore sem;
/* syncs main thread and channels */ /* channel is done transmitting until more pages are queued */
QemuSemaphore sem_sync; QemuSemaphore sem_done;
/* this mutex protects the following parameters */ /* this mutex protects the following parameters */
QemuMutex mutex; QemuMutex mutex;
@@ -153,8 +153,8 @@ typedef struct {
/* number of pages in a full packet */ /* number of pages in a full packet */
uint32_t page_count; uint32_t page_count;
/* syncs main thread and channels */ /* channel is done transmitting until more pages are queued */
QemuSemaphore sem_sync; QemuSemaphore sem_done;
/* this mutex protects the following parameters */ /* this mutex protects the following parameters */
QemuMutex mutex; QemuMutex mutex;

migration/trace-events

@@ -124,6 +124,7 @@ postcopy_preempt_reset_channel(void) ""
# multifd.c # multifd.c
multifd_new_send_channel_async(uint8_t id) "channel %u" multifd_new_send_channel_async(uint8_t id) "channel %u"
multifd_new_send_channel_async_error(uint8_t id, void *err) "channel=%u err=%p"
multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u" multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u"
multifd_recv_new_channel(uint8_t id) "channel %u" multifd_recv_new_channel(uint8_t id) "channel %u"
multifd_recv_sync_main(long packet_num) "packet num %ld" multifd_recv_sync_main(long packet_num) "packet num %ld"
@@ -136,14 +137,14 @@ multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, u
multifd_send_error(uint8_t id) "channel %u" multifd_send_error(uint8_t id) "channel %u"
multifd_send_sync_main(long packet_num) "packet num %ld" multifd_send_sync_main(long packet_num) "packet num %ld"
multifd_send_sync_main_signal(uint8_t id) "channel %u" multifd_send_sync_main_signal(uint8_t id) "channel %u"
multifd_send_sync_main_wait(uint8_t id) "channel %u" multifd_send_wait(uint8_t n) "waiting for %u channels to finish sending"
multifd_send_terminate_threads(bool error) "error %d" multifd_send_terminate_threads(void) ""
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64
multifd_send_thread_start(uint8_t id) "%u" multifd_send_thread_start(uint8_t id) "%u"
multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s" multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s" multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s"
multifd_tls_outgoing_handshake_complete(void *ioc) "ioc=%p" multifd_tls_outgoing_handshake_complete(void *ioc) "ioc=%p"
multifd_set_outgoing_channel(void *ioc, const char *ioctype, const char *hostname, void *err) "ioc=%p ioctype=%s hostname=%s err=%p" multifd_set_outgoing_channel(void *ioc, const char *ioctype, const char *hostname) "ioc=%p ioctype=%s hostname=%s"
# migration.c # migration.c
await_return_path_close_on_source_close(void) "" await_return_path_close_on_source_close(void) ""