@@ -386,6 +386,8 @@ struct {
int exiting;
/* multifd ops */
MultiFDMethods *ops;
+ /* multifd setup ops */
+ MultiFDSetup *setup_ops;
} *multifd_send_state;
/*
@@ -805,8 +807,9 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
} else {
/* update for tls qio channel */
p->c = ioc;
- qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
- QEMU_THREAD_JOINABLE);
+ qemu_thread_create(&p->thread, p->name,
+ multifd_send_state->setup_ops->send_thread,
+ p, QEMU_THREAD_JOINABLE);
}
return false;
}
@@ -854,6 +857,11 @@ cleanup:
multifd_new_send_channel_cleanup(p, sioc, local_err);
}
+static void multifd_send_channel_setup(MultiFDSendParams *p)
+{
+ socket_send_channel_create(multifd_new_send_channel_async, p);
+}
+
int multifd_save_setup(Error **errp)
{
int thread_count;
@@ -871,6 +879,7 @@ int multifd_save_setup(Error **errp)
multifd_send_state->pages = multifd_pages_init(page_count);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
+ multifd_send_state->setup_ops = multifd_setup_ops_init();
multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
for (i = 0; i < thread_count; i++) {
@@ -890,7 +899,7 @@ int multifd_save_setup(Error **errp)
p->packet->version = cpu_to_be32(MULTIFD_VERSION);
p->name = g_strdup_printf("multifdsend_%d", i);
p->tls_hostname = g_strdup(s->hostname);
- socket_send_channel_create(multifd_new_send_channel_async, p);
+ multifd_send_state->setup_ops->send_channel_setup(p);
}
for (i = 0; i < thread_count; i++) {
@@ -917,6 +926,8 @@ struct {
uint64_t packet_num;
/* multifd ops */
MultiFDMethods *ops;
+ /* multifd setup ops */
+ MultiFDSetup *setup_ops;
} *multifd_recv_state;
static void multifd_recv_terminate_threads(Error *err)
@@ -1117,6 +1128,7 @@ int multifd_load_setup(Error **errp)
multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
qatomic_set(&multifd_recv_state->count, 0);
qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+ multifd_recv_state->setup_ops = multifd_setup_ops_init();
multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
for (i = 0; i < thread_count; i++) {
@@ -1195,9 +1207,31 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
p->num_packets = 1;
p->running = true;
- qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
- QEMU_THREAD_JOINABLE);
+ multifd_recv_state->setup_ops->recv_channel_setup(ioc, p);
+ qemu_thread_create(&p->thread, p->name,
+ multifd_recv_state->setup_ops->recv_thread,
+ p, QEMU_THREAD_JOINABLE);
qatomic_inc(&multifd_recv_state->count);
return qatomic_read(&multifd_recv_state->count) ==
migrate_multifd_channels();
}
+
+static void multifd_recv_channel_setup(QIOChannel *ioc, MultiFDRecvParams *p)
+{
+ return;
+}
+
+static MultiFDSetup multifd_socket_ops = {
+ .send_thread = multifd_send_thread,
+ .recv_thread = multifd_recv_thread,
+ .send_channel_setup = multifd_send_channel_setup,
+ .recv_channel_setup = multifd_recv_channel_setup
+};
+
+MultiFDSetup *multifd_setup_ops_init(void)
+{
+ MultiFDSetup *ops;
+
+ ops = &multifd_socket_ops;
+ return ops;
+}
@@ -166,6 +166,13 @@ typedef struct {
int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
} MultiFDMethods;
+typedef struct {
+ void *(*send_thread)(void *opaque);
+ void *(*recv_thread)(void *opaque);
+ void (*send_channel_setup)(MultiFDSendParams *p);
+ void (*recv_channel_setup)(QIOChannel *ioc, MultiFDRecvParams *p);
+} MultiFDSetup;
+
void multifd_register_ops(int method, MultiFDMethods *ops);
#endif
Create multifd_setup_ops for TxRx thread, no logic change. Signed-off-by: Chuan Zheng <zhengchuan@huawei.com> --- migration/multifd.c | 44 +++++++++++++++++++++++++++++++++++++++----- migration/multifd.h | 7 +++++++ 2 files changed, 46 insertions(+), 5 deletions(-)