diff --git a/migration/multifd.c b/migration/multifd.c index 95d58a6657..4b80352a9a 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -658,18 +658,22 @@ static void *multifd_send_thread(void *opaque) Error *local_err = NULL; int ret = 0; bool use_zero_copy_send = migrate_zero_copy_send(); + bool use_packets = migrate_multifd_packets(); thread = MigrationThreadAdd(p->name, qemu_get_thread_id()); trace_multifd_send_thread_start(p->id); rcu_register_thread(); - if (multifd_send_initial_packet(p, &local_err) < 0) { - ret = -1; - goto out; + if (use_packets) { + if (multifd_send_initial_packet(p, &local_err) < 0) { + ret = -1; + goto out; + } + + /* initial packet */ + p->num_packets = 1; } - /* initial packet */ - p->num_packets = 1; while (true) { qemu_sem_post(&multifd_send_state->channels_ready); @@ -681,11 +685,10 @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); if (p->pending_job) { - uint64_t packet_num = p->packet_num; uint32_t flags; p->normal_num = 0; - if (use_zero_copy_send) { + if (!use_packets || use_zero_copy_send) { p->iovs_num = 0; } else { p->iovs_num = 1; @@ -703,16 +706,20 @@ static void *multifd_send_thread(void *opaque) break; } } - multifd_send_fill_packet(p); + + if (use_packets) { + multifd_send_fill_packet(p); + p->num_packets++; + } + flags = p->flags; p->flags = 0; - p->num_packets++; p->total_normal_pages += p->normal_num; p->pages->num = 0; p->pages->block = NULL; qemu_mutex_unlock(&p->mutex); - trace_multifd_send(p->id, packet_num, p->normal_num, flags, + trace_multifd_send(p->id, p->packet_num, p->normal_num, flags, p->next_packet_size); if (use_zero_copy_send) { @@ -722,7 +729,7 @@ static void *multifd_send_thread(void *opaque) if (ret != 0) { break; } - } else { + } else if (use_packets) { /* Send header using the same writev call */ p->iov[0].iov_len = p->packet_len; p->iov[0].iov_base = p->packet; @@ -918,6 +925,7 @@ int multifd_save_setup(Error **errp) { int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + bool use_packets = migrate_multifd_packets(); uint8_t i; if (!migrate_multifd()) { @@ -942,14 +950,20 @@ int multifd_save_setup(Error **errp) p->pending_job = 0; p->id = i; p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); - p->packet->version = cpu_to_be32(MULTIFD_VERSION); + + if (use_packets) { + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet->version = cpu_to_be32(MULTIFD_VERSION); + + /* We need one extra place for the packet header */ + p->iov = g_new0(struct iovec, page_count + 1); + } else { + p->iov = g_new0(struct iovec, page_count); + } p->name = g_strdup_printf("multifdsend_%d", i); - /* We need one extra place for the packet header */ - p->iov = g_new0(struct iovec, page_count + 1); p->normal = g_new0(ram_addr_t, page_count); p->page_size = qemu_target_page_size(); p->page_count = page_count; @@ -1081,7 +1095,7 @@ void multifd_recv_sync_main(void) { int i; - if (!migrate_multifd()) { + if (!migrate_multifd() || !migrate_multifd_packets()) { return; } for (i = 0; i < migrate_multifd_channels(); i++) { @@ -1108,6 +1122,7 @@ static void *multifd_recv_thread(void *opaque) { MultiFDRecvParams *p = opaque; Error *local_err = NULL; + bool use_packets = migrate_multifd_packets(); int ret; trace_multifd_recv_thread_start(p->id); @@ -1120,17 +1135,20 @@ static void *multifd_recv_thread(void *opaque) break; } - ret = qio_channel_read_all_eof(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ - break; - } + if (use_packets) { + ret = qio_channel_read_all_eof(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ + break; + } - qemu_mutex_lock(&p->mutex); - ret = multifd_recv_unfill_packet(p, &local_err); - if (ret) { - qemu_mutex_unlock(&p->mutex); - break; + qemu_mutex_lock(&p->mutex); + ret = multifd_recv_unfill_packet(p, &local_err); + if (ret) { + qemu_mutex_unlock(&p->mutex); + break; + } + p->num_packets++; } flags = p->flags; @@ -1138,7 +1156,7 @@ static void *multifd_recv_thread(void *opaque) p->flags &= ~MULTIFD_FLAG_SYNC; trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags, p->next_packet_size); - p->num_packets++; + p->total_normal_pages += p->normal_num; qemu_mutex_unlock(&p->mutex); @@ -1173,6 +1191,7 @@ int multifd_load_setup(Error **errp) { int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + bool use_packets = migrate_multifd_packets(); uint8_t i; /* @@ -1197,9 +1216,12 @@ int multifd_load_setup(Error **errp) qemu_sem_init(&p->sem_sync, 0); p->quit = false; p->id = i; - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); + + if (use_packets) { + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + } p->name = g_strdup_printf("multifdrecv_%d", i); p->iov = g_new0(struct iovec, page_count); p->normal = g_new0(ram_addr_t, page_count); @@ -1245,18 +1267,26 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) { MultiFDRecvParams *p; Error *local_err = NULL; - int id; + bool use_packets = migrate_multifd_packets(); + int id, num_packets = 0; - id = multifd_recv_initial_packet(ioc, &local_err); - if (id < 0) { - multifd_recv_terminate_threads(local_err); - error_propagate_prepend(errp, local_err, - "failed to receive packet" - " via multifd channel %d: ", - qatomic_read(&multifd_recv_state->count)); - return; + if (use_packets) { + id = multifd_recv_initial_packet(ioc, &local_err); + if (id < 0) { + multifd_recv_terminate_threads(local_err); + error_propagate_prepend(errp, local_err, + "failed to receive packet" + " via multifd channel %d: ", + qatomic_read(&multifd_recv_state->count)); + return; + } + trace_multifd_recv_new_channel(id); + + /* initial packet */ + num_packets = 1; + } else { + id = 0; } - trace_multifd_recv_new_channel(id); p = &multifd_recv_state->params[id]; if (p->c != NULL) { @@ -1267,9 +1297,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) return; } p->c = ioc; + p->num_packets = num_packets; object_ref(OBJECT(ioc)); - /* initial packet */ - p->num_packets = 1; p->running = true; qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, diff --git a/migration/options.c b/migration/options.c index 63654a09fd..e3f9e2aa16 100644 --- a/migration/options.c +++ b/migration/options.c @@ -365,6 +365,11 @@ bool migrate_live(void) return !migrate_suspend(); } +bool migrate_multifd_packets(void) +{ + return !migrate_fixed_ram(); +} + bool migrate_postcopy(void) { return migrate_postcopy_ram() || migrate_dirty_bitmaps(); diff --git a/migration/options.h b/migration/options.h index 08ec579f4d..2807e1548e 100644 --- a/migration/options.h +++ b/migration/options.h @@ -63,6 +63,7 @@ bool migrate_zero_copy_send(void); bool migrate_multifd_flush_after_each_section(void); bool migrate_live(void); +bool migrate_multifd_packets(void); bool migrate_postcopy(void); bool migrate_tls(void);