diff --git a/migration/multifd.c b/migration/multifd.c
index dd4cbf8ce1..df6fb4a25e 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -1014,6 +1014,8 @@ int multifd_save_setup(Error **errp)
 
 struct {
     MultiFDRecvParams *params;
+    /* array of pages to receive */
+    MultiFDPages_t *pages;
     /* number of created threads */
     int count;
     /* syncs main thread and channels */
@@ -1024,6 +1026,75 @@ struct {
     MultiFDMethods *ops;
 } *multifd_recv_state;
 
+static int multifd_recv_pages(QEMUFile *f)
+{
+    int i;
+    static int next_recv_channel;
+    MultiFDRecvParams *p = NULL;
+    MultiFDPages_t *pages = multifd_recv_state->pages;
+
+    /*
+     * next_recv_channel can remain from a previous migration that was
+     * using more channels, so ensure it doesn't overflow if the
+     * limit is lower now.
+     */
+    next_recv_channel %= migrate_multifd_channels();
+    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            error_report("%s: channel %d has already quit!", __func__, i);
+            qemu_mutex_unlock(&p->mutex);
+            return -1;
+        }
+        if (!p->pending_job) {
+            p->pending_job++;
+            next_recv_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+
+    multifd_recv_state->pages = p->pages;
+    p->pages = pages;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+
+    return 1;
+}
+
+int multifd_recv_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
+{
+    MultiFDPages_t *pages = multifd_recv_state->pages;
+    bool changed = false;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    if (pages->block == block) {
+        pages->offset[pages->num] = offset;
+        pages->num++;
+
+        if (pages->num < pages->allocated) {
+            return 1;
+        }
+    } else {
+        changed = true;
+    }
+
+    if (multifd_recv_pages(f) < 0) {
+        return -1;
+    }
+
+    if (changed) {
+        return multifd_recv_queue_page(f, block, offset);
+    }
+
+    return 1;
+}
+
 static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
@@ -1045,6 +1116,7 @@ static void multifd_recv_terminate_threads(Error *err)
 
         qemu_mutex_lock(&p->mutex);
         p->quit = true;
+        qemu_sem_post(&p->sem);
         /*
          * We could arrive here for two reasons:
          *  - normal quit, i.e. everything went fine, just finished
@@ -1093,9 +1165,12 @@ void multifd_load_cleanup(void)
         object_unref(OBJECT(p->c));
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem);
         qemu_sem_destroy(&p->sem_done);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
@@ -1108,6 +1183,8 @@ void multifd_load_cleanup(void)
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    multifd_pages_clear(multifd_recv_state->pages);
+    multifd_recv_state->pages = NULL;
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 }
@@ -1170,6 +1247,25 @@ static void *multifd_recv_thread(void *opaque)
                 break;
             }
             p->num_packets++;
+        } else {
+            /*
+             * No packets, so we need to wait for the vmstate code to
+             * queue pages.
+             */
+            qemu_sem_wait(&p->sem);
+            qemu_mutex_lock(&p->mutex);
+            if (!p->pending_job) {
+                qemu_mutex_unlock(&p->mutex);
+                break;
+            }
+
+            for (int i = 0; i < p->pages->num; i++) {
+                p->normal[p->normal_num] = p->pages->offset[i];
+                p->normal_num++;
+            }
+
+            p->pages->num = 0;
+            p->host = p->pages->block->host;
         }
 
         flags = p->flags;
@@ -1192,6 +1288,13 @@ static void *multifd_recv_thread(void *opaque)
             qemu_sem_post(&multifd_recv_state->sem_sync);
             qemu_sem_wait(&p->sem_done);
         }
+
+        if (!use_packets) {
+            qemu_mutex_lock(&p->mutex);
+            p->pending_job--;
+            p->pages->block = NULL;
+            qemu_mutex_unlock(&p->mutex);
+        }
     }
 
     if (local_err) {
@@ -1226,6 +1329,7 @@ int multifd_load_setup(Error **errp)
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+    multifd_recv_state->pages = multifd_pages_init(page_count);
     qatomic_set(&multifd_recv_state->count, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
@@ -1234,9 +1338,12 @@ int multifd_load_setup(Error **errp)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
+        qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_done, 0);
         p->quit = false;
+        p->pending_job = 0;
         p->id = i;
+        p->pages = multifd_pages_init(page_count);
 
         if (use_packets) {
             p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/multifd.h b/migration/multifd.h
index 9c68178526..7bb611db50 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -24,6 +24,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
 void multifd_recv_sync_main(void);
 int multifd_send_sync_main(QEMUFile *f);
 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
+int multifd_recv_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
 
 /* Multifd Compression flags */
 #define MULTIFD_FLAG_SYNC (1 << 0)
@@ -153,7 +154,11 @@ typedef struct {
     uint32_t page_size;
     /* number of pages in a full packet */
     uint32_t page_count;
+    /* multifd flags for receiving ram */
+    int read_flags;
 
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
     /* channel is done transmitting until more pages are queued */
     QemuSemaphore sem_done;
 
@@ -167,6 +172,13 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    int pending_job;
+    /* array of pages to receive.
+     * The owner of 'pages' depends on the 'pending_job' value:
+     * pending_job == 0 -> migration_thread can use it.
+     * pending_job != 0 -> multifd_channel can use it.
+     */
+    MultiFDPages_t *pages;
 
     /* thread local variables. No locking required */
 
@@ -210,4 +222,3 @@ typedef struct {
 void multifd_register_ops(int method, MultiFDMethods *ops);
 
 #endif
-
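
Not part of the patch: the pointer swap in multifd_recv_pages() and the pending_job accounting above form a small ownership protocol between the thread that calls multifd_recv_queue_page() and the channel thread. The standalone program below is only a sketch of that handoff under made-up types (Pages, Channel), a single channel, and plain pthreads; it is not QEMU code and does not use QEMU's APIs. The producer fills a batch, swaps it with an idle channel's empty batch under the mutex, bumps pending_job and posts the semaphore; the channel consumes the batch and drops pending_job to return ownership.

/*
 * handoff.c - standalone sketch, NOT part of the patch above.
 * Models the 'pages'/'pending_job' ownership handoff used by
 * multifd_recv_queue_page()/multifd_recv_pages(), with invented
 * Pages/Channel types and a single channel.
 * Build: gcc -O2 -pthread handoff.c -o handoff
 */
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdio.h>

#define BATCH 4

typedef struct {
    long offset[BATCH];
    int num;
} Pages;

typedef struct {
    pthread_mutex_t mutex;
    sem_t sem;          /* producer -> channel: a batch was queued */
    int pending_job;    /* != 0 while the channel owns 'pages' */
    Pages *pages;
} Channel;

static Pages bufs[2];
static Pages *staging = &bufs[0];   /* batch the producer is filling */
static Channel chan;

/* Channel thread: wait for a batch, consume it, hand 'pages' back. */
static void *channel_thread(void *arg)
{
    (void)arg;
    for (;;) {
        sem_wait(&chan.sem);
        pthread_mutex_lock(&chan.mutex);
        if (!chan.pending_job) {        /* woken up only to quit */
            pthread_mutex_unlock(&chan.mutex);
            break;
        }
        for (int i = 0; i < chan.pages->num; i++) {
            printf("channel: offset %ld\n", chan.pages->offset[i]);
        }
        chan.pages->num = 0;
        chan.pending_job--;             /* producer may reuse 'pages' now */
        pthread_mutex_unlock(&chan.mutex);
    }
    return NULL;
}

/* Producer: swap the full staging batch with the channel's empty one. */
static void queue_batch(void)
{
    pthread_mutex_lock(&chan.mutex);
    while (chan.pending_job) {          /* the patch picks another channel */
        pthread_mutex_unlock(&chan.mutex);
        sched_yield();
        pthread_mutex_lock(&chan.mutex);
    }
    Pages *tmp = chan.pages;
    chan.pages = staging;
    staging = tmp;
    chan.pending_job++;
    pthread_mutex_unlock(&chan.mutex);
    sem_post(&chan.sem);
}

int main(void)
{
    pthread_t t;

    pthread_mutex_init(&chan.mutex, NULL);
    sem_init(&chan.sem, 0, 0);
    chan.pages = &bufs[1];
    pthread_create(&t, NULL, channel_thread, NULL);

    for (long off = 0; off < 10; off++) {
        staging->offset[staging->num++] = off;
        if (staging->num == BATCH) {
            queue_batch();
        }
    }
    if (staging->num) {                 /* flush the final partial batch */
        queue_batch();
    }

    sem_post(&chan.sem);                /* wake with no job queued -> exit */
    pthread_join(t, NULL);
    return 0;
}

Swapping the two page batches instead of copying offsets keeps the handoff O(1); this mirrors what the send side already does with MultiFDSendParams->pages.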