diff mbox series

[10/16] migration/multifd: Enable DSA offloading in multifd sender path.

Message ID 20231025193822.2813204-11-hao.xiang@bytedance.com (mailing list archive)
State New, archived
Headers show
Series Use Intel DSA accelerator to offload zero page checking in multifd live migration. | expand

Commit Message

Hao Xiang Oct. 25, 2023, 7:38 p.m. UTC
Multifd sender path gets an array of pages queued by the migration
thread. It performs zero page checking on every page in the array.
The pages are classfied as either a zero page or a normal page. This
change uses Intel DSA to offload the zero page checking from CPU to
the DSA accelerator. The sender thread submits a batch of pages to DSA
hardware and waits for the DSA completion thread to signal for work
completion.

Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
---
 migration/multifd.c | 101 +++++++++++++++++++++++++++++++++++++-------
 migration/multifd.h |   3 ++
 2 files changed, 89 insertions(+), 15 deletions(-)

Comments

Fabiano Rosas Oct. 30, 2023, 2:37 p.m. UTC | #1
Hao Xiang <hao.xiang@bytedance.com> writes:

> Multifd sender path gets an array of pages queued by the migration
> thread. It performs zero page checking on every page in the array.
> The pages are classfied as either a zero page or a normal page. This
> change uses Intel DSA to offload the zero page checking from CPU to
> the DSA accelerator. The sender thread submits a batch of pages to DSA
> hardware and waits for the DSA completion thread to signal for work
> completion.
>
> Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
> ---
>  migration/multifd.c | 101 +++++++++++++++++++++++++++++++++++++-------
>  migration/multifd.h |   3 ++
>  2 files changed, 89 insertions(+), 15 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 452fb158b8..79fecbd3ae 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -13,6 +13,8 @@
>  #include "qemu/osdep.h"
>  #include "qemu/rcu.h"
>  #include "qemu/cutils.h"
> +#include "qemu/dsa.h"
> +#include "qemu/memalign.h"
>  #include "exec/target_page.h"
>  #include "sysemu/sysemu.h"
>  #include "exec/ramblock.h"
> @@ -555,6 +557,8 @@ void multifd_save_cleanup(void)
>              qemu_thread_join(&p->thread);
>          }
>      }
> +    dsa_stop();
> +    dsa_cleanup();
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
> @@ -571,6 +575,11 @@ void multifd_save_cleanup(void)
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
>          p->pages = NULL;
> +        g_free(p->addr);
> +        p->addr = NULL;
> +        buffer_zero_batch_task_destroy(p->dsa_batch_task);
> +        qemu_vfree(p->dsa_batch_task);
> +        p->dsa_batch_task = NULL;
>          p->packet_len = 0;
>          g_free(p->packet);
>          p->packet = NULL;
> @@ -675,13 +684,71 @@ int multifd_send_sync_main(QEMUFile *f)
>      return 0;
>  }
>  
> +static void set_page(MultiFDSendParams *p, bool zero_page, uint64_t offset)
> +{
> +    RAMBlock *rb = p->pages->block;
> +    if (zero_page) {
> +        p->zero[p->zero_num] = offset;
> +        p->zero_num++;
> +        ram_release_page(rb->idstr, offset);
> +    } else {
> +        p->normal[p->normal_num] = offset;
> +        p->normal_num++;
> +    }
> +}
> +
> +static void buffer_is_zero_use_cpu(MultiFDSendParams *p)
> +{
> +    const void **buf = (const void **)p->addr;
> +    assert(!migrate_use_main_zero_page());
> +    assert(!dsa_is_running());
> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->dsa_batch_task->results[i] = buffer_is_zero(buf[i], p->page_size);
> +    }
> +}
> +
> +static void buffer_is_zero_use_dsa(MultiFDSendParams *p)
> +{
> +    assert(!migrate_use_main_zero_page());
> +    assert(dsa_is_running());
> +
> +    buffer_is_zero_dsa_batch_async(p->dsa_batch_task,
> +                                   (const void **)p->addr,
> +                                   p->pages->num,
> +                                   p->page_size);
> +}
> +
> +static void multifd_zero_page_check(MultiFDSendParams *p)
> +{
> +    /* older qemu don't understand zero page on multifd channel */
> +    bool use_multifd_zero_page = !migrate_use_main_zero_page();
> +    bool use_multifd_dsa_accel = dsa_is_running();
> +
> +    RAMBlock *rb = p->pages->block;
> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
> +    }
> +
> +    if (!use_multifd_zero_page || !use_multifd_dsa_accel) {
> +        buffer_is_zero_use_cpu(p);
> +    } else {
> +        buffer_is_zero_use_dsa(p);
> +    }
> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        uint64_t offset = p->pages->offset[i];
> +        bool zero_page = p->dsa_batch_task->results[i];
> +        set_page(p, zero_page, offset);
> +    }
> +}

You're moving existing (not really, but ok) code and adding dsa support
at the same time. The introduction of this function needs to be in a
separate patch. That would be a preliminary patch that isolates all of
the use_cpu code and a subsequent one that adds the use_dsa part.
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index 452fb158b8..79fecbd3ae 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -13,6 +13,8 @@ 
 #include "qemu/osdep.h"
 #include "qemu/rcu.h"
 #include "qemu/cutils.h"
+#include "qemu/dsa.h"
+#include "qemu/memalign.h"
 #include "exec/target_page.h"
 #include "sysemu/sysemu.h"
 #include "exec/ramblock.h"
@@ -555,6 +557,8 @@  void multifd_save_cleanup(void)
             qemu_thread_join(&p->thread);
         }
     }
+    dsa_stop();
+    dsa_cleanup();
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
@@ -571,6 +575,11 @@  void multifd_save_cleanup(void)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        g_free(p->addr);
+        p->addr = NULL;
+        buffer_zero_batch_task_destroy(p->dsa_batch_task);
+        qemu_vfree(p->dsa_batch_task);
+        p->dsa_batch_task = NULL;
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
@@ -675,13 +684,71 @@  int multifd_send_sync_main(QEMUFile *f)
     return 0;
 }
 
+static void set_page(MultiFDSendParams *p, bool zero_page, uint64_t offset)
+{
+    RAMBlock *rb = p->pages->block;
+    if (zero_page) {
+        p->zero[p->zero_num] = offset;
+        p->zero_num++;
+        ram_release_page(rb->idstr, offset);
+    } else {
+        p->normal[p->normal_num] = offset;
+        p->normal_num++;
+    }
+}
+
+static void buffer_is_zero_use_cpu(MultiFDSendParams *p)
+{
+    const void **buf = (const void **)p->addr;
+    assert(!migrate_use_main_zero_page());
+    assert(!dsa_is_running());
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->dsa_batch_task->results[i] = buffer_is_zero(buf[i], p->page_size);
+    }
+}
+
+static void buffer_is_zero_use_dsa(MultiFDSendParams *p)
+{
+    assert(!migrate_use_main_zero_page());
+    assert(dsa_is_running());
+
+    buffer_is_zero_dsa_batch_async(p->dsa_batch_task,
+                                   (const void **)p->addr,
+                                   p->pages->num,
+                                   p->page_size);
+}
+
+static void multifd_zero_page_check(MultiFDSendParams *p)
+{
+    /* older qemu don't understand zero page on multifd channel */
+    bool use_multifd_zero_page = !migrate_use_main_zero_page();
+    bool use_multifd_dsa_accel = dsa_is_running();
+
+    RAMBlock *rb = p->pages->block;
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
+    }
+
+    if (!use_multifd_zero_page || !use_multifd_dsa_accel) {
+        buffer_is_zero_use_cpu(p);
+    } else {
+        buffer_is_zero_use_dsa(p);
+    }
+
+    for (int i = 0; i < p->pages->num; i++) {
+        uint64_t offset = p->pages->offset[i];
+        bool zero_page = p->dsa_batch_task->results[i];
+        set_page(p, zero_page, offset);
+    }
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     MigrationThread *thread = NULL;
     Error *local_err = NULL;
-    /* older qemu don't understand zero page on multifd channel */
-    bool use_multifd_zero_page = !migrate_use_main_zero_page();
     int ret = 0;
     bool use_zero_copy_send = migrate_zero_copy_send();
 
@@ -707,7 +774,6 @@  static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            RAMBlock *rb = p->pages->block;
             uint64_t packet_num = p->packet_num;
             p->flags = 0;
             if (p->sync_needed) {
@@ -725,18 +791,7 @@  static void *multifd_send_thread(void *opaque)
                 p->iovs_num = 1;
             }
 
-            for (int i = 0; i < p->pages->num; i++) {
-                uint64_t offset = p->pages->offset[i];
-                if (use_multifd_zero_page &&
-                    buffer_is_zero(rb->host + offset, p->page_size)) {
-                    p->zero[p->zero_num] = offset;
-                    p->zero_num++;
-                    ram_release_page(rb->idstr, offset);
-                } else {
-                    p->normal[p->normal_num] = offset;
-                    p->normal_num++;
-                }
-            }
+            multifd_zero_page_check(p);
 
             if (p->normal_num) {
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
@@ -958,11 +1013,15 @@  int multifd_save_setup(Error **errp)
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     uint8_t i;
+    const char *dsa_parameter = migrate_multifd_dsa_accel();
 
     if (!migrate_multifd()) {
         return 0;
     }
 
+    dsa_init(dsa_parameter);
+    dsa_start();
+
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
@@ -981,6 +1040,10 @@  int multifd_save_setup(Error **errp)
         p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
+        p->addr = g_new0(ram_addr_t, page_count);
+        p->dsa_batch_task = 
+            (struct buffer_zero_batch_task *)qemu_memalign(64, sizeof(*p->dsa_batch_task));
+        buffer_zero_batch_task_init(p->dsa_batch_task, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
                       + sizeof(uint64_t) * page_count;
         p->packet = g_malloc0(p->packet_len);
@@ -1014,6 +1077,7 @@  int multifd_save_setup(Error **errp)
             return ret;
         }
     }
+
     return 0;
 }
 
@@ -1091,6 +1155,8 @@  void multifd_load_cleanup(void)
 
         qemu_thread_join(&p->thread);
     }
+    dsa_stop();
+    dsa_cleanup();
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -1225,6 +1291,7 @@  int multifd_load_setup(Error **errp)
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     uint8_t i;
+    const char *dsa_parameter = migrate_multifd_dsa_accel();
 
     /*
      * Return successfully if multiFD recv state is already initialised
@@ -1234,6 +1301,9 @@  int multifd_load_setup(Error **errp)
         return 0;
     }
 
+    dsa_init(dsa_parameter);
+    dsa_start();
+
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
@@ -1270,6 +1340,7 @@  int multifd_load_setup(Error **errp)
             return ret;
         }
     }
+
     return 0;
 }
 
diff --git a/migration/multifd.h b/migration/multifd.h
index e8f90776bb..297b055e2b 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -114,6 +114,9 @@  typedef struct {
      * pending_job != 0 -> multifd_channel can use it.
      */
     MultiFDPages_t *pages;
+    /* Address of each pages in pages */
+    ram_addr_t *addr;
+    struct buffer_zero_batch_task *dsa_batch_task;
 
     /* thread local variables. No locking required */