diff mbox series

[v4,11/33] migration: Add thread pool of optional load threads

Message ID 6bd6ea6a59ef6b350fbffa1ae82830762559fa52.1738171076.git.maciej.szmigiero@oracle.com (mailing list archive)
State New
Headers show
Series Multifd ���� device state transfer support with VFIO consumer | expand

Commit Message

Maciej S. Szmigiero Jan. 30, 2025, 10:08 a.m. UTC
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

Some drivers might want to make use of auxiliary helper threads during VM
state loading, for example to make sure that their blocking (sync) I/O
operations don't block the rest of the migration process.

Add a migration core managed thread pool to facilitate this use case.

The migration core will wait for these threads to finish before
(re)starting the VM at destination.

Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
 include/migration/misc.h |  3 ++
 include/qemu/typedefs.h  |  2 +
 migration/migration.c    |  2 +-
 migration/migration.h    |  5 +++
 migration/savevm.c       | 92 +++++++++++++++++++++++++++++++++++++++-
 migration/savevm.h       |  2 +-
 6 files changed, 103 insertions(+), 3 deletions(-)
diff mbox series

Patch

diff --git a/include/migration/misc.h b/include/migration/misc.h
index 67f7ef7a0e5c..bc5ce31b52e0 100644
--- a/include/migration/misc.h
+++ b/include/migration/misc.h
@@ -45,9 +45,12 @@  bool migrate_ram_is_ignored(RAMBlock *block);
 /* migration/block.c */
 
 AnnounceParameters *migrate_announce_params(void);
+
 /* migration/savevm.c */
 
 void dump_vmstate_json_to_file(FILE *out_fp);
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+                                   void *opaque);
 
 /* migration/migration.c */
 void migration_object_init(void);
diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 3d84efcac47a..fd23ff7771b1 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -131,5 +131,7 @@  typedef struct IRQState *qemu_irq;
  * Function types
  */
 typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
+typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
+                                    Error **errp);
 
 #endif /* QEMU_TYPEDEFS_H */
diff --git a/migration/migration.c b/migration/migration.c
index 65b51d360896..0f29188499e4 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -400,7 +400,7 @@  void migration_incoming_state_destroy(void)
      * RAM state cleanup needs to happen after multifd cleanup, because
      * multifd threads can use some of its states (receivedmap).
      */
-    qemu_loadvm_state_cleanup();
+    qemu_loadvm_state_cleanup(mis);
 
     if (mis->to_src_file) {
         /* Tell source that we are done */
diff --git a/migration/migration.h b/migration/migration.h
index c5731626bbfb..1699fe7d91cc 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -43,6 +43,7 @@ 
 #define  MIGRATION_THREAD_DST_PREEMPT       "mig/dst/preempt"
 
 struct PostcopyBlocktimeContext;
+typedef struct ThreadPool ThreadPool;
 
 #define  MIGRATION_RESUME_ACK_VALUE  (1)
 
@@ -187,6 +188,10 @@  struct MigrationIncomingState {
     Coroutine *colo_incoming_co;
     QemuSemaphore colo_incoming_sem;
 
+    /* Optional load threads pool and its thread exit request flag */
+    ThreadPool *load_threads;
+    bool load_threads_abort;
+
     /*
      * PostcopyBlocktimeContext to keep information for postcopy
      * live migration, to calculate vCPU block time
diff --git a/migration/savevm.c b/migration/savevm.c
index 0ceea9638cc1..74d1960de3c6 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -54,6 +54,7 @@ 
 #include "qemu/job.h"
 #include "qemu/main-loop.h"
 #include "block/snapshot.h"
+#include "block/thread-pool.h"
 #include "qemu/cutils.h"
 #include "io/channel-buffer.h"
 #include "io/channel-file.h"
@@ -131,6 +132,35 @@  static struct mig_cmd_args {
  * generic extendable format with an exception for two old entities.
  */
 
+/***********************************************************/
+/* Optional load threads pool support */
+
+static void qemu_loadvm_thread_pool_create(MigrationIncomingState *mis)
+{
+    assert(!mis->load_threads);
+    mis->load_threads = thread_pool_new();
+    mis->load_threads_abort = false;
+}
+
+static void qemu_loadvm_thread_pool_destroy(MigrationIncomingState *mis)
+{
+    qatomic_set(&mis->load_threads_abort, true);
+
+    bql_unlock(); /* Load threads might be waiting for BQL */
+    g_clear_pointer(&mis->load_threads, thread_pool_free);
+    bql_lock();
+}
+
+static bool qemu_loadvm_thread_pool_wait(MigrationState *s,
+                                         MigrationIncomingState *mis)
+{
+    bql_unlock(); /* Let load threads do work requiring BQL */
+    thread_pool_wait(mis->load_threads);
+    bql_lock();
+
+    return !migrate_has_error(s);
+}
+
 /***********************************************************/
 /* savevm/loadvm support */
 
@@ -2810,16 +2840,62 @@  static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
     return 0;
 }
 
-void qemu_loadvm_state_cleanup(void)
+struct LoadThreadData {
+    MigrationLoadThread function;
+    void *opaque;
+};
+
+static int qemu_loadvm_load_thread(void *thread_opaque)
+{
+    struct LoadThreadData *data = thread_opaque;
+    MigrationIncomingState *mis = migration_incoming_get_current();
+    g_autoptr(Error) local_err = NULL;
+
+    if (!data->function(data->opaque, &mis->load_threads_abort, &local_err)) {
+        MigrationState *s = migrate_get_current();
+
+        assert(local_err);
+
+        /*
+         * In case of multiple load threads failing which thread error
+         * return we end setting is purely arbitrary.
+         */
+        migrate_set_error(s, local_err);
+    }
+
+    return 0;
+}
+
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+                                   void *opaque)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+    struct LoadThreadData *data;
+
+    /* We only set it from this thread so it's okay to read it directly */
+    assert(!mis->load_threads_abort);
+
+    data = g_new(struct LoadThreadData, 1);
+    data->function = function;
+    data->opaque = opaque;
+
+    thread_pool_submit_immediate(mis->load_threads, qemu_loadvm_load_thread,
+                                 data, g_free);
+}
+
+void qemu_loadvm_state_cleanup(MigrationIncomingState *mis)
 {
     SaveStateEntry *se;
 
     trace_loadvm_state_cleanup();
+
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (se->ops && se->ops->load_cleanup) {
             se->ops->load_cleanup(se->opaque);
         }
     }
+
+    qemu_loadvm_thread_pool_destroy(mis);
 }
 
 /* Return true if we should continue the migration, or false. */
@@ -2970,6 +3046,7 @@  out:
 
 int qemu_loadvm_state(QEMUFile *f)
 {
+    MigrationState *s = migrate_get_current();
     MigrationIncomingState *mis = migration_incoming_get_current();
     Error *local_err = NULL;
     int ret;
@@ -2979,6 +3056,8 @@  int qemu_loadvm_state(QEMUFile *f)
         return -EINVAL;
     }
 
+    qemu_loadvm_thread_pool_create(mis);
+
     ret = qemu_loadvm_state_header(f);
     if (ret) {
         return ret;
@@ -3008,6 +3087,17 @@  int qemu_loadvm_state(QEMUFile *f)
         return ret;
     }
 
+    if (ret == 0) {
+        if (!qemu_loadvm_thread_pool_wait(s, mis)) {
+            ret = -EINVAL;
+        }
+    }
+    /*
+     * Set this flag unconditionally so we'll catch further attempts to
+     * start additional threads via an appropriate assert()
+     */
+    qatomic_set(&mis->load_threads_abort, true);
+
     if (ret == 0) {
         ret = qemu_file_get_error(f);
     }
diff --git a/migration/savevm.h b/migration/savevm.h
index 8b78493dbc0e..3fa06574e632 100644
--- a/migration/savevm.h
+++ b/migration/savevm.h
@@ -64,7 +64,7 @@  void qemu_savevm_live_state(QEMUFile *f);
 int qemu_save_device_state(QEMUFile *f);
 
 int qemu_loadvm_state(QEMUFile *f);
-void qemu_loadvm_state_cleanup(void);
+void qemu_loadvm_state_cleanup(MigrationIncomingState *mis);
 int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis);
 int qemu_load_device_state(QEMUFile *f);
 int qemu_loadvm_approve_switchover(void);