migration/multifd: Allow multifd without packets

For the upcoming support for the new 'fixed-ram' migration stream
format, we cannot use multifd packets because each write into the
ramblock section of the migration file is expected to contain only the
guest pages. They are written at their respective offsets relative to
the ramblock section header.

There is no space for the packet information and the expected gains
from the new approach come partly from being able to write the pages
sequentially without extraneous data in between.
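As an illustration of that layout (this sketch is not part of the
patch; the helper name, the offset math and the plain POSIX pwrite()
call are assumptions made for the example only): with packets, a header
is interleaved with every batch of pages, whereas under fixed-ram each
page has a fixed slot in the file, so writing it is a single positioned
write:

    #define _POSIX_C_SOURCE 200809L
    #include <stdint.h>
    #include <unistd.h>

    /*
     * Illustration only: place one guest page at its fixed slot in the
     * migration file.  "ramblock_offset" stands for the file offset of
     * the ramblock's data area (i.e. relative to its section header) and
     * "page_index" for the page's position inside that ramblock; both
     * names are invented for this sketch.
     */
    static ssize_t write_page_fixed(int fd, const void *page, size_t page_size,
                                    off_t ramblock_offset, uint64_t page_index)
    {
        off_t slot = ramblock_offset + (off_t)(page_index * page_size);

        /* No packet header precedes the data: the offset alone identifies
         * the page, so pages can be written in any order and still end up
         * contiguous in the file. */
        return pwrite(fd, page, page_size, slot);
    }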

The new format also doesn't need the packets: all necessary
information can be taken from the standard migration headers, with some
(future) changes to the multifd code.

Use the presence of the fixed-ram capability to decide whether to send
packets. For now this has no effect, as fixed-ram cannot yet be enabled
with multifd.
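For illustration (stand-in sketch, not the patch itself: only
migrate_fixed_ram() and migrate_multifd_packets() correspond to real
helpers, and the real ones live in the migration code rather than in a
standalone program):

    #include <stdbool.h>
    #include <stdio.h>

    /* Stand-ins for the helpers this patch adds and uses. */
    static bool migrate_fixed_ram(void)
    {
        return false;                    /* capability off in this sketch */
    }

    static bool migrate_multifd_packets(void)
    {
        return !migrate_fixed_ram();     /* packets unless fixed-ram is set */
    }

    int main(void)
    {
        /* The multifd send/recv paths sample this once and branch on it,
         * which is the shape of the changes in the hunks below. */
        bool use_packets = migrate_multifd_packets();

        printf("multifd runs %s packets\n", use_packets ? "with" : "without");
        return 0;
    }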

Signed-off-by: Fabiano Rosas <farosas@suse.de>
Author: Fabiano Rosas <farosas@suse.de>
Date:   2023-03-23 18:17:16 -03:00
Parent: 4471f329b9
Commit: a53d2953b1

3 changed files with 80 additions and 45 deletions


@@ -658,18 +658,22 @@ static void *multifd_send_thread(void *opaque)
     Error *local_err = NULL;
     int ret = 0;
     bool use_zero_copy_send = migrate_zero_copy_send();
+    bool use_packets = migrate_multifd_packets();
 
     thread = MigrationThreadAdd(p->name, qemu_get_thread_id());
 
     trace_multifd_send_thread_start(p->id);
     rcu_register_thread();
 
-    if (multifd_send_initial_packet(p, &local_err) < 0) {
-        ret = -1;
-        goto out;
+    if (use_packets) {
+        if (multifd_send_initial_packet(p, &local_err) < 0) {
+            ret = -1;
+            goto out;
+        }
+
+        /* initial packet */
+        p->num_packets = 1;
     }
-    /* initial packet */
-    p->num_packets = 1;
 
     while (true) {
         qemu_sem_post(&multifd_send_state->channels_ready);
@@ -681,11 +685,10 @@ static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            uint64_t packet_num = p->packet_num;
             uint32_t flags;
             p->normal_num = 0;
 
-            if (use_zero_copy_send) {
+            if (!use_packets || use_zero_copy_send) {
                 p->iovs_num = 0;
             } else {
                 p->iovs_num = 1;
@@ -703,16 +706,20 @@ static void *multifd_send_thread(void *opaque)
break;
}
}
multifd_send_fill_packet(p);
if (use_packets) {
multifd_send_fill_packet(p);
p->num_packets++;
}
flags = p->flags;
p->flags = 0;
p->num_packets++;
p->total_normal_pages += p->normal_num;
p->pages->num = 0;
p->pages->block = NULL;
qemu_mutex_unlock(&p->mutex);
trace_multifd_send(p->id, packet_num, p->normal_num, flags,
trace_multifd_send(p->id, p->packet_num, p->normal_num, flags,
p->next_packet_size);
if (use_zero_copy_send) {
@@ -722,7 +729,7 @@ static void *multifd_send_thread(void *opaque)
                 if (ret != 0) {
                     break;
                 }
-            } else {
+            } else if (use_packets) {
                 /* Send header using the same writev call */
                 p->iov[0].iov_len = p->packet_len;
                 p->iov[0].iov_base = p->packet;
@@ -918,6 +925,7 @@ int multifd_save_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    bool use_packets = migrate_multifd_packets();
     uint8_t i;
 
     if (!migrate_multifd()) {
@@ -942,14 +950,20 @@ int multifd_save_setup(Error **errp)
         p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
-        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+        if (use_packets) {
+            p->packet_len = sizeof(MultiFDPacket_t)
+                          + sizeof(uint64_t) * page_count;
+            p->packet = g_malloc0(p->packet_len);
+            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+            /* We need one extra place for the packet header */
+            p->iov = g_new0(struct iovec, page_count + 1);
+        } else {
+            p->iov = g_new0(struct iovec, page_count);
+        }
         p->name = g_strdup_printf("multifdsend_%d", i);
-        /* We need one extra place for the packet header */
-        p->iov = g_new0(struct iovec, page_count + 1);
         p->normal = g_new0(ram_addr_t, page_count);
         p->page_size = qemu_target_page_size();
         p->page_count = page_count;
@@ -1081,7 +1095,7 @@ void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_multifd()) {
+    if (!migrate_multifd() || !migrate_multifd_packets()) {
         return;
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -1108,6 +1122,7 @@ static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
     Error *local_err = NULL;
+    bool use_packets = migrate_multifd_packets();
     int ret;
 
     trace_multifd_recv_thread_start(p->id);
@@ -1120,17 +1135,20 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
 
-        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                       p->packet_len, &local_err);
-        if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
-            break;
-        }
+        if (use_packets) {
+            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                           p->packet_len, &local_err);
+            if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
+                break;
+            }
 
-        qemu_mutex_lock(&p->mutex);
-        ret = multifd_recv_unfill_packet(p, &local_err);
-        if (ret) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
+            qemu_mutex_lock(&p->mutex);
+            ret = multifd_recv_unfill_packet(p, &local_err);
+            if (ret) {
+                qemu_mutex_unlock(&p->mutex);
+                break;
+            }
+            p->num_packets++;
         }
 
         flags = p->flags;
@@ -1138,7 +1156,7 @@ static void *multifd_recv_thread(void *opaque)
         p->flags &= ~MULTIFD_FLAG_SYNC;
         trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
                            p->next_packet_size);
-        p->num_packets++;
+
         p->total_normal_pages += p->normal_num;
         qemu_mutex_unlock(&p->mutex);
 
@@ -1173,6 +1191,7 @@ int multifd_load_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    bool use_packets = migrate_multifd_packets();
     uint8_t i;
 
     /*
@@ -1197,9 +1216,12 @@ int multifd_load_setup(Error **errp)
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->id = i;
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
+
+        if (use_packets) {
+            p->packet_len = sizeof(MultiFDPacket_t)
+                          + sizeof(uint64_t) * page_count;
+            p->packet = g_malloc0(p->packet_len);
+        }
         p->name = g_strdup_printf("multifdrecv_%d", i);
         p->iov = g_new0(struct iovec, page_count);
         p->normal = g_new0(ram_addr_t, page_count);
@@ -1245,18 +1267,26 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
 {
     MultiFDRecvParams *p;
     Error *local_err = NULL;
-    int id;
+    bool use_packets = migrate_multifd_packets();
+    int id, num_packets = 0;
 
-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                qatomic_read(&multifd_recv_state->count));
-        return;
+    if (use_packets) {
+        id = multifd_recv_initial_packet(ioc, &local_err);
+        if (id < 0) {
+            multifd_recv_terminate_threads(local_err);
+            error_propagate_prepend(errp, local_err,
+                                    "failed to receive packet"
+                                    " via multifd channel %d: ",
+                                    qatomic_read(&multifd_recv_state->count));
+            return;
+        }
+        trace_multifd_recv_new_channel(id);
+
+        /* initial packet */
+        num_packets = 1;
+    } else {
+        id = 0;
     }
-    trace_multifd_recv_new_channel(id);
 
     p = &multifd_recv_state->params[id];
     if (p->c != NULL) {
@@ -1267,9 +1297,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         return;
     }
     p->c = ioc;
+    p->num_packets = num_packets;
     object_ref(OBJECT(ioc));
-    /* initial packet */
-    p->num_packets = 1;
 
     p->running = true;
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,


@@ -365,6 +365,11 @@ bool migrate_live(void)
     return !migrate_suspend();
 }
 
+bool migrate_multifd_packets(void)
+{
+    return !migrate_fixed_ram();
+}
+
 bool migrate_postcopy(void)
 {
     return migrate_postcopy_ram() || migrate_dirty_bitmaps();


@@ -63,6 +63,7 @@ bool migrate_zero_copy_send(void);
 
 bool migrate_multifd_flush_after_each_section(void);
 bool migrate_live(void);
+bool migrate_multifd_packets(void);
 bool migrate_postcopy(void);
 bool migrate_tls(void);
 