Message ID | 20181122072028.22819-4-xiaoguangrong@tencent.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | migration: improve multithreads | expand |
* guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote: > From: Xiao Guangrong <xiaoguangrong@tencent.com> > > Adapt the compression code to the threaded workqueue > > Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com> > --- > migration/ram.c | 308 ++++++++++++++++++++------------------------------------ > 1 file changed, 110 insertions(+), 198 deletions(-) > > diff --git a/migration/ram.c b/migration/ram.c > index 7e7deec4d8..254c08f27b 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -57,6 +57,7 @@ > #include "qemu/uuid.h" > #include "savevm.h" > #include "qemu/iov.h" > +#include "qemu/threaded-workqueue.h" > > /***********************************************************/ > /* ram save/restore */ > @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; > > CompressionStats compression_counters; > > -struct CompressParam { > - bool done; > - bool quit; > - bool zero_page; > - QEMUFile *file; > - QemuMutex mutex; > - QemuCond cond; > - RAMBlock *block; > - ram_addr_t offset; > - > - /* internally used fields */ > - z_stream stream; > - uint8_t *originbuf; > -}; > -typedef struct CompressParam CompressParam; > - > struct DecompressParam { > bool done; > bool quit; > @@ -377,15 +362,6 @@ struct DecompressParam { > }; > typedef struct DecompressParam DecompressParam; > > -static CompressParam *comp_param; > -static QemuThread *compress_threads; > -/* comp_done_cond is used to wake up the migration thread when > - * one of the compression threads has finished the compression. > - * comp_done_lock is used to co-work with comp_done_cond. 
> - */ > -static QemuMutex comp_done_lock; > -static QemuCond comp_done_cond; > -/* The empty QEMUFileOps will be used by file in CompressParam */ > static const QEMUFileOps empty_ops = { }; > > static QEMUFile *decomp_file; > @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; > static QemuMutex decomp_done_lock; > static QemuCond decomp_done_cond; > > -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, > - ram_addr_t offset, uint8_t *source_buf); > - > -static void *do_data_compress(void *opaque) > -{ > - CompressParam *param = opaque; > - RAMBlock *block; > - ram_addr_t offset; > - bool zero_page; > - > - qemu_mutex_lock(&param->mutex); > - while (!param->quit) { > - if (param->block) { > - block = param->block; > - offset = param->offset; > - param->block = NULL; > - qemu_mutex_unlock(&param->mutex); > - > - zero_page = do_compress_ram_page(param->file, &param->stream, > - block, offset, param->originbuf); > - > - qemu_mutex_lock(&comp_done_lock); > - param->done = true; > - param->zero_page = zero_page; > - qemu_cond_signal(&comp_done_cond); > - qemu_mutex_unlock(&comp_done_lock); > - > - qemu_mutex_lock(&param->mutex); > - } else { > - qemu_cond_wait(&param->cond, &param->mutex); > - } > - } > - qemu_mutex_unlock(&param->mutex); > - > - return NULL; > -} > - > -static void compress_threads_save_cleanup(void) > -{ > - int i, thread_count; > - > - if (!migrate_use_compression() || !comp_param) { > - return; > - } > - > - thread_count = migrate_compress_threads(); > - for (i = 0; i < thread_count; i++) { > - /* > - * we use it as a indicator which shows if the thread is > - * properly init'd or not > - */ > - if (!comp_param[i].file) { > - break; > - } > - > - qemu_mutex_lock(&comp_param[i].mutex); > - comp_param[i].quit = true; > - qemu_cond_signal(&comp_param[i].cond); > - qemu_mutex_unlock(&comp_param[i].mutex); > - > - qemu_thread_join(compress_threads + i); > - qemu_mutex_destroy(&comp_param[i].mutex); > - qemu_cond_destroy(&comp_param[i].cond); 
> - deflateEnd(&comp_param[i].stream); > - g_free(comp_param[i].originbuf); > - qemu_fclose(comp_param[i].file); > - comp_param[i].file = NULL; > - } > - qemu_mutex_destroy(&comp_done_lock); > - qemu_cond_destroy(&comp_done_cond); > - g_free(compress_threads); > - g_free(comp_param); > - compress_threads = NULL; > - comp_param = NULL; > -} > - > -static int compress_threads_save_setup(void) > -{ > - int i, thread_count; > - > - if (!migrate_use_compression()) { > - return 0; > - } > - thread_count = migrate_compress_threads(); > - compress_threads = g_new0(QemuThread, thread_count); > - comp_param = g_new0(CompressParam, thread_count); > - qemu_cond_init(&comp_done_cond); > - qemu_mutex_init(&comp_done_lock); > - for (i = 0; i < thread_count; i++) { > - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); > - if (!comp_param[i].originbuf) { > - goto exit; > - } > - > - if (deflateInit(&comp_param[i].stream, > - migrate_compress_level()) != Z_OK) { > - g_free(comp_param[i].originbuf); > - goto exit; > - } > - > - /* comp_param[i].file is just used as a dummy buffer to save data, > - * set its ops to empty. > - */ > - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); > - comp_param[i].done = true; > - comp_param[i].quit = false; > - qemu_mutex_init(&comp_param[i].mutex); > - qemu_cond_init(&comp_param[i].cond); > - qemu_thread_create(compress_threads + i, "compress", > - do_data_compress, comp_param + i, > - QEMU_THREAD_JOINABLE); > - } > - return 0; > - > -exit: > - compress_threads_save_cleanup(); > - return -1; > -} > - > /* Multiple fd's */ > > #define MULTIFD_MAGIC 0x11223344U > @@ -1909,12 +1766,25 @@ exit: > return zero_page; > } > > +struct CompressData { > + /* filled by migration thread.*/ > + RAMBlock *block; > + ram_addr_t offset; > + > + /* filled by compress thread. 
*/ > + QEMUFile *file; > + z_stream stream; > + uint8_t *originbuf; > + bool zero_page; > +}; > +typedef struct CompressData CompressData; > + > static void > -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) > +update_compress_thread_counts(CompressData *cd, int bytes_xmit) Keep the const? > { > ram_counters.transferred += bytes_xmit; > > - if (param->zero_page) { > + if (cd->zero_page) { > ram_counters.duplicate++; > return; > } > @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) > compression_counters.pages++; > } > > +static int compress_thread_data_init(void *request) > +{ > + CompressData *cd = request; > + > + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); > + if (!cd->originbuf) { > + return -1; > + } > + > + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { > + g_free(cd->originbuf); > + return -1; > + } Please print errors if you fail in any case so we can easily tell what happened. > + cd->file = qemu_fopen_ops(NULL, &empty_ops); > + return 0; > +} > + > +static void compress_thread_data_fini(void *request) > +{ > + CompressData *cd = request; > + > + qemu_fclose(cd->file); > + deflateEnd(&cd->stream); > + g_free(cd->originbuf); > +} > + > +static void compress_thread_data_handler(void *request) > +{ > + CompressData *cd = request; > + > + /* > + * if compression fails, it will be indicated by > + * migrate_get_current()->to_dst_file. 
> + */ > + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, > + cd->offset, cd->originbuf); > +} > + > +static void compress_thread_data_done(void *request) > +{ > + CompressData *cd = request; > + RAMState *rs = ram_state; > + int bytes_xmit; > + > + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); > + update_compress_thread_counts(cd, bytes_xmit); > +} > + > +static const ThreadedWorkqueueOps compress_ops = { > + .thread_request_init = compress_thread_data_init, > + .thread_request_uninit = compress_thread_data_fini, > + .thread_request_handler = compress_thread_data_handler, > + .thread_request_done = compress_thread_data_done, > + .request_size = sizeof(CompressData), > +}; > + > +static Threads *compress_threads; > + > static bool save_page_use_compression(RAMState *rs); > > static void flush_compressed_data(RAMState *rs) > { > - int idx, len, thread_count; > - > if (!save_page_use_compression(rs)) { > return; > } > - thread_count = migrate_compress_threads(); > > - qemu_mutex_lock(&comp_done_lock); > - for (idx = 0; idx < thread_count; idx++) { > - while (!comp_param[idx].done) { > - qemu_cond_wait(&comp_done_cond, &comp_done_lock); > - } > - } > - qemu_mutex_unlock(&comp_done_lock); > + threaded_workqueue_wait_for_requests(compress_threads); > +} > > - for (idx = 0; idx < thread_count; idx++) { > - qemu_mutex_lock(&comp_param[idx].mutex); > - if (!comp_param[idx].quit) { > - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); > - /* > - * it's safe to fetch zero_page without holding comp_done_lock > - * as there is no further request submitted to the thread, > - * i.e, the thread should be waiting for a request at this point. 
> - */ > - update_compress_thread_counts(&comp_param[idx], len); > - } > - qemu_mutex_unlock(&comp_param[idx].mutex); > +static void compress_threads_save_cleanup(void) > +{ > + if (!compress_threads) { > + return; > } > + > + threaded_workqueue_destroy(compress_threads); > + compress_threads = NULL; > } > > -static inline void set_compress_params(CompressParam *param, RAMBlock *block, > - ram_addr_t offset) > +static int compress_threads_save_setup(void) > { > - param->block = block; > - param->offset = offset; > + if (!migrate_use_compression()) { > + return 0; > + } > + > + compress_threads = threaded_workqueue_create("compress", > + migrate_compress_threads(), > + DEFAULT_THREAD_REQUEST_NR, &compress_ops); > + return compress_threads ? 0 : -1; > } > > static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, > ram_addr_t offset) > { > - int idx, thread_count, bytes_xmit = -1, pages = -1; > + CompressData *cd; > bool wait = migrate_compress_wait_thread(); > > - thread_count = migrate_compress_threads(); > - qemu_mutex_lock(&comp_done_lock); > retry: > - for (idx = 0; idx < thread_count; idx++) { > - if (comp_param[idx].done) { > - comp_param[idx].done = false; > - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); > - qemu_mutex_lock(&comp_param[idx].mutex); > - set_compress_params(&comp_param[idx], block, offset); > - qemu_cond_signal(&comp_param[idx].cond); > - qemu_mutex_unlock(&comp_param[idx].mutex); > - pages = 1; > - update_compress_thread_counts(&comp_param[idx], bytes_xmit); > - break; > + cd = threaded_workqueue_get_request(compress_threads); > + if (!cd) { > + /* > + * wait for the free thread if the user specifies > + * 'compress-wait-thread', otherwise we will post > + * the page out in the main thread as normal page. > + */ > + if (wait) { > + cpu_relax(); > + goto retry; Is there nothing better we can use to wait without eating CPU time? 
Dave > } > - } > > - /* > - * wait for the free thread if the user specifies 'compress-wait-thread', > - * otherwise we will post the page out in the main thread as normal page. > - */ > - if (pages < 0 && wait) { > - qemu_cond_wait(&comp_done_cond, &comp_done_lock); > - goto retry; > - } > - qemu_mutex_unlock(&comp_done_lock); > - > - return pages; > + return -1; > + } > + cd->block = block; > + cd->offset = offset; > + threaded_workqueue_submit_request(compress_threads, cd); > + return 1; > } > > /** > -- > 2.14.5 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On 23/11/18 19:17, Dr. David Alan Gilbert wrote: > * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote: >> From: Xiao Guangrong <xiaoguangrong@tencent.com> >> >> Adapt the compression code to the threaded workqueue >> >> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com> >> --- >> migration/ram.c | 308 ++++++++++++++++++++------------------------------------ >> 1 file changed, 110 insertions(+), 198 deletions(-) >> >> diff --git a/migration/ram.c b/migration/ram.c >> index 7e7deec4d8..254c08f27b 100644 >> --- a/migration/ram.c >> +++ b/migration/ram.c >> @@ -57,6 +57,7 @@ >> #include "qemu/uuid.h" >> #include "savevm.h" >> #include "qemu/iov.h" >> +#include "qemu/threaded-workqueue.h" >> >> /***********************************************************/ >> /* ram save/restore */ >> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; >> >> CompressionStats compression_counters; >> >> -struct CompressParam { >> - bool done; >> - bool quit; >> - bool zero_page; >> - QEMUFile *file; >> - QemuMutex mutex; >> - QemuCond cond; >> - RAMBlock *block; >> - ram_addr_t offset; >> - >> - /* internally used fields */ >> - z_stream stream; >> - uint8_t *originbuf; >> -}; >> -typedef struct CompressParam CompressParam; >> - >> struct DecompressParam { >> bool done; >> bool quit; >> @@ -377,15 +362,6 @@ struct DecompressParam { >> }; >> typedef struct DecompressParam DecompressParam; >> >> -static CompressParam *comp_param; >> -static QemuThread *compress_threads; >> -/* comp_done_cond is used to wake up the migration thread when >> - * one of the compression threads has finished the compression. >> - * comp_done_lock is used to co-work with comp_done_cond. 
>> - */ >> -static QemuMutex comp_done_lock; >> -static QemuCond comp_done_cond; >> -/* The empty QEMUFileOps will be used by file in CompressParam */ >> static const QEMUFileOps empty_ops = { }; >> >> static QEMUFile *decomp_file; >> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; >> static QemuMutex decomp_done_lock; >> static QemuCond decomp_done_cond; >> >> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, >> - ram_addr_t offset, uint8_t *source_buf); >> - >> -static void *do_data_compress(void *opaque) >> -{ >> - CompressParam *param = opaque; >> - RAMBlock *block; >> - ram_addr_t offset; >> - bool zero_page; >> - >> - qemu_mutex_lock(&param->mutex); >> - while (!param->quit) { >> - if (param->block) { >> - block = param->block; >> - offset = param->offset; >> - param->block = NULL; >> - qemu_mutex_unlock(&param->mutex); >> - >> - zero_page = do_compress_ram_page(param->file, &param->stream, >> - block, offset, param->originbuf); >> - >> - qemu_mutex_lock(&comp_done_lock); >> - param->done = true; >> - param->zero_page = zero_page; >> - qemu_cond_signal(&comp_done_cond); >> - qemu_mutex_unlock(&comp_done_lock); >> - >> - qemu_mutex_lock(&param->mutex); >> - } else { >> - qemu_cond_wait(&param->cond, &param->mutex); >> - } >> - } >> - qemu_mutex_unlock(&param->mutex); >> - >> - return NULL; >> -} >> - >> -static void compress_threads_save_cleanup(void) >> -{ >> - int i, thread_count; >> - >> - if (!migrate_use_compression() || !comp_param) { >> - return; >> - } >> - >> - thread_count = migrate_compress_threads(); >> - for (i = 0; i < thread_count; i++) { >> - /* >> - * we use it as a indicator which shows if the thread is >> - * properly init'd or not >> - */ >> - if (!comp_param[i].file) { >> - break; >> - } >> - >> - qemu_mutex_lock(&comp_param[i].mutex); >> - comp_param[i].quit = true; >> - qemu_cond_signal(&comp_param[i].cond); >> - qemu_mutex_unlock(&comp_param[i].mutex); >> - >> - qemu_thread_join(compress_threads + i); >> - 
qemu_mutex_destroy(&comp_param[i].mutex); >> - qemu_cond_destroy(&comp_param[i].cond); >> - deflateEnd(&comp_param[i].stream); >> - g_free(comp_param[i].originbuf); >> - qemu_fclose(comp_param[i].file); >> - comp_param[i].file = NULL; >> - } >> - qemu_mutex_destroy(&comp_done_lock); >> - qemu_cond_destroy(&comp_done_cond); >> - g_free(compress_threads); >> - g_free(comp_param); >> - compress_threads = NULL; >> - comp_param = NULL; >> -} >> - >> -static int compress_threads_save_setup(void) >> -{ >> - int i, thread_count; >> - >> - if (!migrate_use_compression()) { >> - return 0; >> - } >> - thread_count = migrate_compress_threads(); >> - compress_threads = g_new0(QemuThread, thread_count); >> - comp_param = g_new0(CompressParam, thread_count); >> - qemu_cond_init(&comp_done_cond); >> - qemu_mutex_init(&comp_done_lock); >> - for (i = 0; i < thread_count; i++) { >> - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); >> - if (!comp_param[i].originbuf) { >> - goto exit; >> - } >> - >> - if (deflateInit(&comp_param[i].stream, >> - migrate_compress_level()) != Z_OK) { >> - g_free(comp_param[i].originbuf); >> - goto exit; >> - } >> - >> - /* comp_param[i].file is just used as a dummy buffer to save data, >> - * set its ops to empty. >> - */ >> - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); >> - comp_param[i].done = true; >> - comp_param[i].quit = false; >> - qemu_mutex_init(&comp_param[i].mutex); >> - qemu_cond_init(&comp_param[i].cond); >> - qemu_thread_create(compress_threads + i, "compress", >> - do_data_compress, comp_param + i, >> - QEMU_THREAD_JOINABLE); >> - } >> - return 0; >> - >> -exit: >> - compress_threads_save_cleanup(); >> - return -1; >> -} >> - >> /* Multiple fd's */ >> >> #define MULTIFD_MAGIC 0x11223344U >> @@ -1909,12 +1766,25 @@ exit: >> return zero_page; >> } >> >> +struct CompressData { >> + /* filled by migration thread.*/ >> + RAMBlock *block; >> + ram_addr_t offset; >> + >> + /* filled by compress thread. 
*/ >> + QEMUFile *file; >> + z_stream stream; >> + uint8_t *originbuf; >> + bool zero_page; >> +}; >> +typedef struct CompressData CompressData; >> + >> static void >> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) >> +update_compress_thread_counts(CompressData *cd, int bytes_xmit) > > Keep the const? >> { >> ram_counters.transferred += bytes_xmit; >> >> - if (param->zero_page) { >> + if (cd->zero_page) { >> ram_counters.duplicate++; >> return; >> } >> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) >> compression_counters.pages++; >> } >> >> +static int compress_thread_data_init(void *request) >> +{ >> + CompressData *cd = request; >> + >> + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); >> + if (!cd->originbuf) { >> + return -1; >> + } >> + >> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { >> + g_free(cd->originbuf); >> + return -1; >> + } > > Please print errors if you fail in any case so we can easily tell what > happened. > >> + cd->file = qemu_fopen_ops(NULL, &empty_ops); >> + return 0; >> +} >> + >> +static void compress_thread_data_fini(void *request) >> +{ >> + CompressData *cd = request; >> + >> + qemu_fclose(cd->file); >> + deflateEnd(&cd->stream); >> + g_free(cd->originbuf); >> +} >> + >> +static void compress_thread_data_handler(void *request) >> +{ >> + CompressData *cd = request; >> + >> + /* >> + * if compression fails, it will be indicated by >> + * migrate_get_current()->to_dst_file. 
>> + */ >> + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, >> + cd->offset, cd->originbuf); >> +} >> + >> +static void compress_thread_data_done(void *request) >> +{ >> + CompressData *cd = request; >> + RAMState *rs = ram_state; >> + int bytes_xmit; >> + >> + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); >> + update_compress_thread_counts(cd, bytes_xmit); >> +} >> + >> +static const ThreadedWorkqueueOps compress_ops = { >> + .thread_request_init = compress_thread_data_init, >> + .thread_request_uninit = compress_thread_data_fini, >> + .thread_request_handler = compress_thread_data_handler, >> + .thread_request_done = compress_thread_data_done, >> + .request_size = sizeof(CompressData), >> +}; >> + >> +static Threads *compress_threads; >> + >> static bool save_page_use_compression(RAMState *rs); >> >> static void flush_compressed_data(RAMState *rs) >> { >> - int idx, len, thread_count; >> - >> if (!save_page_use_compression(rs)) { >> return; >> } >> - thread_count = migrate_compress_threads(); >> >> - qemu_mutex_lock(&comp_done_lock); >> - for (idx = 0; idx < thread_count; idx++) { >> - while (!comp_param[idx].done) { >> - qemu_cond_wait(&comp_done_cond, &comp_done_lock); >> - } >> - } >> - qemu_mutex_unlock(&comp_done_lock); >> + threaded_workqueue_wait_for_requests(compress_threads); >> +} >> >> - for (idx = 0; idx < thread_count; idx++) { >> - qemu_mutex_lock(&comp_param[idx].mutex); >> - if (!comp_param[idx].quit) { >> - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); >> - /* >> - * it's safe to fetch zero_page without holding comp_done_lock >> - * as there is no further request submitted to the thread, >> - * i.e, the thread should be waiting for a request at this point. 
>> - */ >> - update_compress_thread_counts(&comp_param[idx], len); >> - } >> - qemu_mutex_unlock(&comp_param[idx].mutex); >> +static void compress_threads_save_cleanup(void) >> +{ >> + if (!compress_threads) { >> + return; >> } >> + >> + threaded_workqueue_destroy(compress_threads); >> + compress_threads = NULL; >> } >> >> -static inline void set_compress_params(CompressParam *param, RAMBlock *block, >> - ram_addr_t offset) >> +static int compress_threads_save_setup(void) >> { >> - param->block = block; >> - param->offset = offset; >> + if (!migrate_use_compression()) { >> + return 0; >> + } >> + >> + compress_threads = threaded_workqueue_create("compress", >> + migrate_compress_threads(), >> + DEFAULT_THREAD_REQUEST_NR, &compress_ops); >> + return compress_threads ? 0 : -1; >> } >> >> static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, >> ram_addr_t offset) >> { >> - int idx, thread_count, bytes_xmit = -1, pages = -1; >> + CompressData *cd; >> bool wait = migrate_compress_wait_thread(); >> >> - thread_count = migrate_compress_threads(); >> - qemu_mutex_lock(&comp_done_lock); >> retry: >> - for (idx = 0; idx < thread_count; idx++) { >> - if (comp_param[idx].done) { >> - comp_param[idx].done = false; >> - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); >> - qemu_mutex_lock(&comp_param[idx].mutex); >> - set_compress_params(&comp_param[idx], block, offset); >> - qemu_cond_signal(&comp_param[idx].cond); >> - qemu_mutex_unlock(&comp_param[idx].mutex); >> - pages = 1; >> - update_compress_thread_counts(&comp_param[idx], bytes_xmit); >> - break; >> + cd = threaded_workqueue_get_request(compress_threads); >> + if (!cd) { >> + /* >> + * wait for the free thread if the user specifies >> + * 'compress-wait-thread', otherwise we will post >> + * the page out in the main thread as normal page. >> + */ >> + if (wait) { >> + cpu_relax(); >> + goto retry; > > Is there nothing better we can use to wait without eating CPU time? 
There is a mechanism to wait without eating CPU time in the data structure, but it makes sense to busy wait. There are 4 threads in the workqueue, so you have to compare 1/4th of the time spent compressing a page, with the trip into the kernel to wake you up. You're adding 20% CPU usage, but I'm not surprised it's worthwhile. Paolo
* Paolo Bonzini (pbonzini@redhat.com) wrote: > On 23/11/18 19:17, Dr. David Alan Gilbert wrote: > > * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote: > >> From: Xiao Guangrong <xiaoguangrong@tencent.com> > >> > >> Adapt the compression code to the threaded workqueue > >> > >> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com> > >> --- > >> migration/ram.c | 308 ++++++++++++++++++++------------------------------------ > >> 1 file changed, 110 insertions(+), 198 deletions(-) > >> > >> diff --git a/migration/ram.c b/migration/ram.c > >> index 7e7deec4d8..254c08f27b 100644 > >> --- a/migration/ram.c > >> +++ b/migration/ram.c > >> @@ -57,6 +57,7 @@ > >> #include "qemu/uuid.h" > >> #include "savevm.h" > >> #include "qemu/iov.h" > >> +#include "qemu/threaded-workqueue.h" > >> > >> /***********************************************************/ > >> /* ram save/restore */ > >> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; > >> > >> CompressionStats compression_counters; > >> > >> -struct CompressParam { > >> - bool done; > >> - bool quit; > >> - bool zero_page; > >> - QEMUFile *file; > >> - QemuMutex mutex; > >> - QemuCond cond; > >> - RAMBlock *block; > >> - ram_addr_t offset; > >> - > >> - /* internally used fields */ > >> - z_stream stream; > >> - uint8_t *originbuf; > >> -}; > >> -typedef struct CompressParam CompressParam; > >> - > >> struct DecompressParam { > >> bool done; > >> bool quit; > >> @@ -377,15 +362,6 @@ struct DecompressParam { > >> }; > >> typedef struct DecompressParam DecompressParam; > >> > >> -static CompressParam *comp_param; > >> -static QemuThread *compress_threads; > >> -/* comp_done_cond is used to wake up the migration thread when > >> - * one of the compression threads has finished the compression. > >> - * comp_done_lock is used to co-work with comp_done_cond. 
> >> - */ > >> -static QemuMutex comp_done_lock; > >> -static QemuCond comp_done_cond; > >> -/* The empty QEMUFileOps will be used by file in CompressParam */ > >> static const QEMUFileOps empty_ops = { }; > >> > >> static QEMUFile *decomp_file; > >> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; > >> static QemuMutex decomp_done_lock; > >> static QemuCond decomp_done_cond; > >> > >> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, > >> - ram_addr_t offset, uint8_t *source_buf); > >> - > >> -static void *do_data_compress(void *opaque) > >> -{ > >> - CompressParam *param = opaque; > >> - RAMBlock *block; > >> - ram_addr_t offset; > >> - bool zero_page; > >> - > >> - qemu_mutex_lock(&param->mutex); > >> - while (!param->quit) { > >> - if (param->block) { > >> - block = param->block; > >> - offset = param->offset; > >> - param->block = NULL; > >> - qemu_mutex_unlock(&param->mutex); > >> - > >> - zero_page = do_compress_ram_page(param->file, &param->stream, > >> - block, offset, param->originbuf); > >> - > >> - qemu_mutex_lock(&comp_done_lock); > >> - param->done = true; > >> - param->zero_page = zero_page; > >> - qemu_cond_signal(&comp_done_cond); > >> - qemu_mutex_unlock(&comp_done_lock); > >> - > >> - qemu_mutex_lock(&param->mutex); > >> - } else { > >> - qemu_cond_wait(&param->cond, &param->mutex); > >> - } > >> - } > >> - qemu_mutex_unlock(&param->mutex); > >> - > >> - return NULL; > >> -} > >> - > >> -static void compress_threads_save_cleanup(void) > >> -{ > >> - int i, thread_count; > >> - > >> - if (!migrate_use_compression() || !comp_param) { > >> - return; > >> - } > >> - > >> - thread_count = migrate_compress_threads(); > >> - for (i = 0; i < thread_count; i++) { > >> - /* > >> - * we use it as a indicator which shows if the thread is > >> - * properly init'd or not > >> - */ > >> - if (!comp_param[i].file) { > >> - break; > >> - } > >> - > >> - qemu_mutex_lock(&comp_param[i].mutex); > >> - comp_param[i].quit = true; > >> - 
qemu_cond_signal(&comp_param[i].cond); > >> - qemu_mutex_unlock(&comp_param[i].mutex); > >> - > >> - qemu_thread_join(compress_threads + i); > >> - qemu_mutex_destroy(&comp_param[i].mutex); > >> - qemu_cond_destroy(&comp_param[i].cond); > >> - deflateEnd(&comp_param[i].stream); > >> - g_free(comp_param[i].originbuf); > >> - qemu_fclose(comp_param[i].file); > >> - comp_param[i].file = NULL; > >> - } > >> - qemu_mutex_destroy(&comp_done_lock); > >> - qemu_cond_destroy(&comp_done_cond); > >> - g_free(compress_threads); > >> - g_free(comp_param); > >> - compress_threads = NULL; > >> - comp_param = NULL; > >> -} > >> - > >> -static int compress_threads_save_setup(void) > >> -{ > >> - int i, thread_count; > >> - > >> - if (!migrate_use_compression()) { > >> - return 0; > >> - } > >> - thread_count = migrate_compress_threads(); > >> - compress_threads = g_new0(QemuThread, thread_count); > >> - comp_param = g_new0(CompressParam, thread_count); > >> - qemu_cond_init(&comp_done_cond); > >> - qemu_mutex_init(&comp_done_lock); > >> - for (i = 0; i < thread_count; i++) { > >> - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); > >> - if (!comp_param[i].originbuf) { > >> - goto exit; > >> - } > >> - > >> - if (deflateInit(&comp_param[i].stream, > >> - migrate_compress_level()) != Z_OK) { > >> - g_free(comp_param[i].originbuf); > >> - goto exit; > >> - } > >> - > >> - /* comp_param[i].file is just used as a dummy buffer to save data, > >> - * set its ops to empty. 
> >> - */ > >> - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); > >> - comp_param[i].done = true; > >> - comp_param[i].quit = false; > >> - qemu_mutex_init(&comp_param[i].mutex); > >> - qemu_cond_init(&comp_param[i].cond); > >> - qemu_thread_create(compress_threads + i, "compress", > >> - do_data_compress, comp_param + i, > >> - QEMU_THREAD_JOINABLE); > >> - } > >> - return 0; > >> - > >> -exit: > >> - compress_threads_save_cleanup(); > >> - return -1; > >> -} > >> - > >> /* Multiple fd's */ > >> > >> #define MULTIFD_MAGIC 0x11223344U > >> @@ -1909,12 +1766,25 @@ exit: > >> return zero_page; > >> } > >> > >> +struct CompressData { > >> + /* filled by migration thread.*/ > >> + RAMBlock *block; > >> + ram_addr_t offset; > >> + > >> + /* filled by compress thread. */ > >> + QEMUFile *file; > >> + z_stream stream; > >> + uint8_t *originbuf; > >> + bool zero_page; > >> +}; > >> +typedef struct CompressData CompressData; > >> + > >> static void > >> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) > >> +update_compress_thread_counts(CompressData *cd, int bytes_xmit) > > > > Keep the const? > >> { > >> ram_counters.transferred += bytes_xmit; > >> > >> - if (param->zero_page) { > >> + if (cd->zero_page) { > >> ram_counters.duplicate++; > >> return; > >> } > >> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) > >> compression_counters.pages++; > >> } > >> > >> +static int compress_thread_data_init(void *request) > >> +{ > >> + CompressData *cd = request; > >> + > >> + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); > >> + if (!cd->originbuf) { > >> + return -1; > >> + } > >> + > >> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { > >> + g_free(cd->originbuf); > >> + return -1; > >> + } > > > > Please print errors if you fail in any case so we can easily tell what > > happened. 
> > > >> + cd->file = qemu_fopen_ops(NULL, &empty_ops); > >> + return 0; > >> +} > >> + > >> +static void compress_thread_data_fini(void *request) > >> +{ > >> + CompressData *cd = request; > >> + > >> + qemu_fclose(cd->file); > >> + deflateEnd(&cd->stream); > >> + g_free(cd->originbuf); > >> +} > >> + > >> +static void compress_thread_data_handler(void *request) > >> +{ > >> + CompressData *cd = request; > >> + > >> + /* > >> + * if compression fails, it will be indicated by > >> + * migrate_get_current()->to_dst_file. > >> + */ > >> + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, > >> + cd->offset, cd->originbuf); > >> +} > >> + > >> +static void compress_thread_data_done(void *request) > >> +{ > >> + CompressData *cd = request; > >> + RAMState *rs = ram_state; > >> + int bytes_xmit; > >> + > >> + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); > >> + update_compress_thread_counts(cd, bytes_xmit); > >> +} > >> + > >> +static const ThreadedWorkqueueOps compress_ops = { > >> + .thread_request_init = compress_thread_data_init, > >> + .thread_request_uninit = compress_thread_data_fini, > >> + .thread_request_handler = compress_thread_data_handler, > >> + .thread_request_done = compress_thread_data_done, > >> + .request_size = sizeof(CompressData), > >> +}; > >> + > >> +static Threads *compress_threads; > >> + > >> static bool save_page_use_compression(RAMState *rs); > >> > >> static void flush_compressed_data(RAMState *rs) > >> { > >> - int idx, len, thread_count; > >> - > >> if (!save_page_use_compression(rs)) { > >> return; > >> } > >> - thread_count = migrate_compress_threads(); > >> > >> - qemu_mutex_lock(&comp_done_lock); > >> - for (idx = 0; idx < thread_count; idx++) { > >> - while (!comp_param[idx].done) { > >> - qemu_cond_wait(&comp_done_cond, &comp_done_lock); > >> - } > >> - } > >> - qemu_mutex_unlock(&comp_done_lock); > >> + threaded_workqueue_wait_for_requests(compress_threads); > >> +} > >> > >> - for (idx = 0; idx < 
thread_count; idx++) { > >> - qemu_mutex_lock(&comp_param[idx].mutex); > >> - if (!comp_param[idx].quit) { > >> - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); > >> - /* > >> - * it's safe to fetch zero_page without holding comp_done_lock > >> - * as there is no further request submitted to the thread, > >> - * i.e, the thread should be waiting for a request at this point. > >> - */ > >> - update_compress_thread_counts(&comp_param[idx], len); > >> - } > >> - qemu_mutex_unlock(&comp_param[idx].mutex); > >> +static void compress_threads_save_cleanup(void) > >> +{ > >> + if (!compress_threads) { > >> + return; > >> } > >> + > >> + threaded_workqueue_destroy(compress_threads); > >> + compress_threads = NULL; > >> } > >> > >> -static inline void set_compress_params(CompressParam *param, RAMBlock *block, > >> - ram_addr_t offset) > >> +static int compress_threads_save_setup(void) > >> { > >> - param->block = block; > >> - param->offset = offset; > >> + if (!migrate_use_compression()) { > >> + return 0; > >> + } > >> + > >> + compress_threads = threaded_workqueue_create("compress", > >> + migrate_compress_threads(), > >> + DEFAULT_THREAD_REQUEST_NR, &compress_ops); > >> + return compress_threads ? 
0 : -1; > >> } > >> > >> static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, > >> ram_addr_t offset) > >> { > >> - int idx, thread_count, bytes_xmit = -1, pages = -1; > >> + CompressData *cd; > >> bool wait = migrate_compress_wait_thread(); > >> > >> - thread_count = migrate_compress_threads(); > >> - qemu_mutex_lock(&comp_done_lock); > >> retry: > >> - for (idx = 0; idx < thread_count; idx++) { > >> - if (comp_param[idx].done) { > >> - comp_param[idx].done = false; > >> - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); > >> - qemu_mutex_lock(&comp_param[idx].mutex); > >> - set_compress_params(&comp_param[idx], block, offset); > >> - qemu_cond_signal(&comp_param[idx].cond); > >> - qemu_mutex_unlock(&comp_param[idx].mutex); > >> - pages = 1; > >> - update_compress_thread_counts(&comp_param[idx], bytes_xmit); > >> - break; > >> + cd = threaded_workqueue_get_request(compress_threads); > >> + if (!cd) { > >> + /* > >> + * wait for the free thread if the user specifies > >> + * 'compress-wait-thread', otherwise we will post > >> + * the page out in the main thread as normal page. > >> + */ > >> + if (wait) { > >> + cpu_relax(); > >> + goto retry; > > > > Is there nothing better we can use to wait without eating CPU time? > > There is a mechanism to wait without eating CPU time in the data > structure, but it makes sense to busy wait. There are 4 threads in the > workqueue, so you have to compare 1/4th of the time spent compressing a > page, with the trip into the kernel to wake you up. You're adding 20% > CPU usage, but I'm not surprised it's worthwhile. Hmm OK; in that case it does at least need a comment because it's a bit odd, and we should watch out how that scales - I guess it's less of an overhead the more threads you use. Dave > Paolo > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On 11/24/18 2:29 AM, Dr. David Alan Gilbert wrote: >>>> static void >>>> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) >>>> +update_compress_thread_counts(CompressData *cd, int bytes_xmit) >>> >>> Keep the const? Yes, indeed. Will correct it in the next version. >>>> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { >>>> + g_free(cd->originbuf); >>>> + return -1; >>>> + } >>> >>> Please print errors if you fail in any case so we can easily tell what >>> happened. Sure, will do. >>>> + if (wait) { >>>> + cpu_relax(); >>>> + goto retry; >>> >>> Is there nothing better we can use to wait without eating CPU time? >> >> There is a mechanism to wait without eating CPU time in the data >> structure, but it makes sense to busy wait. There are 4 threads in the >> workqueue, so you have to compare 1/4th of the time spent compressing a >> page, with the trip into the kernel to wake you up. You're adding 20% >> CPU usage, but I'm not surprised it's worthwhile. > > Hmm OK; in that case it does at least need a comment because it's a bit > odd, and we should watch out how that scales - I guess it's less of > an overhead the more threads you use. > Sure, will add some comments to explain the purpose.
diff --git a/migration/ram.c b/migration/ram.c index 7e7deec4d8..254c08f27b 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -57,6 +57,7 @@ #include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "qemu/threaded-workqueue.h" /***********************************************************/ /* ram save/restore */ @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct CompressParam { - bool done; - bool quit; - bool zero_page; - QEMUFile *file; - QemuMutex mutex; - QemuCond cond; - RAMBlock *block; - ram_addr_t offset; - - /* internally used fields */ - z_stream stream; - uint8_t *originbuf; -}; -typedef struct CompressParam CompressParam; - struct DecompressParam { bool done; bool quit; @@ -377,15 +362,6 @@ struct DecompressParam { }; typedef struct DecompressParam DecompressParam; -static CompressParam *comp_param; -static QemuThread *compress_threads; -/* comp_done_cond is used to wake up the migration thread when - * one of the compression threads has finished the compression. - * comp_done_lock is used to co-work with comp_done_cond. 
- */ -static QemuMutex comp_done_lock; -static QemuCond comp_done_cond; -/* The empty QEMUFileOps will be used by file in CompressParam */ static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf); - -static void *do_data_compress(void *opaque) -{ - CompressParam *param = opaque; - RAMBlock *block; - ram_addr_t offset; - bool zero_page; - - qemu_mutex_lock(&param->mutex); - while (!param->quit) { - if (param->block) { - block = param->block; - offset = param->offset; - param->block = NULL; - qemu_mutex_unlock(&param->mutex); - - zero_page = do_compress_ram_page(param->file, &param->stream, - block, offset, param->originbuf); - - qemu_mutex_lock(&comp_done_lock); - param->done = true; - param->zero_page = zero_page; - qemu_cond_signal(&comp_done_cond); - qemu_mutex_unlock(&comp_done_lock); - - qemu_mutex_lock(&param->mutex); - } else { - qemu_cond_wait(&param->cond, &param->mutex); - } - } - qemu_mutex_unlock(&param->mutex); - - return NULL; -} - -static void compress_threads_save_cleanup(void) -{ - int i, thread_count; - - if (!migrate_use_compression() || !comp_param) { - return; - } - - thread_count = migrate_compress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!comp_param[i].file) { - break; - } - - qemu_mutex_lock(&comp_param[i].mutex); - comp_param[i].quit = true; - qemu_cond_signal(&comp_param[i].cond); - qemu_mutex_unlock(&comp_param[i].mutex); - - qemu_thread_join(compress_threads + i); - qemu_mutex_destroy(&comp_param[i].mutex); - qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); - g_free(comp_param[i].originbuf); - qemu_fclose(comp_param[i].file); - comp_param[i].file = NULL; - } - 
qemu_mutex_destroy(&comp_done_lock); - qemu_cond_destroy(&comp_done_cond); - g_free(compress_threads); - g_free(comp_param); - compress_threads = NULL; - comp_param = NULL; -} - -static int compress_threads_save_setup(void) -{ - int i, thread_count; - - if (!migrate_use_compression()) { - return 0; - } - thread_count = migrate_compress_threads(); - compress_threads = g_new0(QemuThread, thread_count); - comp_param = g_new0(CompressParam, thread_count); - qemu_cond_init(&comp_done_cond); - qemu_mutex_init(&comp_done_lock); - for (i = 0; i < thread_count; i++) { - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); - if (!comp_param[i].originbuf) { - goto exit; - } - - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { - g_free(comp_param[i].originbuf); - goto exit; - } - - /* comp_param[i].file is just used as a dummy buffer to save data, - * set its ops to empty. - */ - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); - comp_param[i].done = true; - comp_param[i].quit = false; - qemu_mutex_init(&comp_param[i].mutex); - qemu_cond_init(&comp_param[i].cond); - qemu_thread_create(compress_threads + i, "compress", - do_data_compress, comp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; - -exit: - compress_threads_save_cleanup(); - return -1; -} - /* Multiple fd's */ #define MULTIFD_MAGIC 0x11223344U @@ -1909,12 +1766,25 @@ exit: return zero_page; } +struct CompressData { + /* filled by migration thread.*/ + RAMBlock *block; + ram_addr_t offset; + + /* filled by compress thread. 
*/ + QEMUFile *file; + z_stream stream; + uint8_t *originbuf; + bool zero_page; +}; +typedef struct CompressData CompressData; + static void -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) +update_compress_thread_counts(CompressData *cd, int bytes_xmit) { ram_counters.transferred += bytes_xmit; - if (param->zero_page) { + if (cd->zero_page) { ram_counters.duplicate++; return; } @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) compression_counters.pages++; } +static int compress_thread_data_init(void *request) +{ + CompressData *cd = request; + + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!cd->originbuf) { + return -1; + } + + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { + g_free(cd->originbuf); + return -1; + } + + cd->file = qemu_fopen_ops(NULL, &empty_ops); + return 0; +} + +static void compress_thread_data_fini(void *request) +{ + CompressData *cd = request; + + qemu_fclose(cd->file); + deflateEnd(&cd->stream); + g_free(cd->originbuf); +} + +static void compress_thread_data_handler(void *request) +{ + CompressData *cd = request; + + /* + * if compression fails, it will be indicated by + * migrate_get_current()->to_dst_file. 
+ */ + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, + cd->offset, cd->originbuf); +} + +static void compress_thread_data_done(void *request) +{ + CompressData *cd = request; + RAMState *rs = ram_state; + int bytes_xmit; + + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); + update_compress_thread_counts(cd, bytes_xmit); +} + +static const ThreadedWorkqueueOps compress_ops = { + .thread_request_init = compress_thread_data_init, + .thread_request_uninit = compress_thread_data_fini, + .thread_request_handler = compress_thread_data_handler, + .thread_request_done = compress_thread_data_done, + .request_size = sizeof(CompressData), +}; + +static Threads *compress_threads; + static bool save_page_use_compression(RAMState *rs); static void flush_compressed_data(RAMState *rs) { - int idx, len, thread_count; - if (!save_page_use_compression(rs)) { return; } - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!comp_param[idx].done) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } - } - qemu_mutex_unlock(&comp_done_lock); + threaded_workqueue_wait_for_requests(compress_threads); +} - for (idx = 0; idx < thread_count; idx++) { - qemu_mutex_lock(&comp_param[idx].mutex); - if (!comp_param[idx].quit) { - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); - /* - * it's safe to fetch zero_page without holding comp_done_lock - * as there is no further request submitted to the thread, - * i.e, the thread should be waiting for a request at this point. 
- */ - update_compress_thread_counts(&comp_param[idx], len); - } - qemu_mutex_unlock(&comp_param[idx].mutex); +static void compress_threads_save_cleanup(void) +{ + if (!compress_threads) { + return; } + + threaded_workqueue_destroy(compress_threads); + compress_threads = NULL; } -static inline void set_compress_params(CompressParam *param, RAMBlock *block, - ram_addr_t offset) +static int compress_threads_save_setup(void) { - param->block = block; - param->offset = offset; + if (!migrate_use_compression()) { + return 0; + } + + compress_threads = threaded_workqueue_create("compress", + migrate_compress_threads(), + DEFAULT_THREAD_REQUEST_NR, &compress_ops); + return compress_threads ? 0 : -1; } static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - int idx, thread_count, bytes_xmit = -1, pages = -1; + CompressData *cd; bool wait = migrate_compress_wait_thread(); - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); retry: - for (idx = 0; idx < thread_count; idx++) { - if (comp_param[idx].done) { - comp_param[idx].done = false; - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); - qemu_mutex_lock(&comp_param[idx].mutex); - set_compress_params(&comp_param[idx], block, offset); - qemu_cond_signal(&comp_param[idx].cond); - qemu_mutex_unlock(&comp_param[idx].mutex); - pages = 1; - update_compress_thread_counts(&comp_param[idx], bytes_xmit); - break; + cd = threaded_workqueue_get_request(compress_threads); + if (!cd) { + /* + * wait for the free thread if the user specifies + * 'compress-wait-thread', otherwise we will post + * the page out in the main thread as normal page. + */ + if (wait) { + cpu_relax(); + goto retry; } - } - /* - * wait for the free thread if the user specifies 'compress-wait-thread', - * otherwise we will post the page out in the main thread as normal page. 
- */ - if (pages < 0 && wait) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - goto retry; - } - qemu_mutex_unlock(&comp_done_lock); - - return pages; + return -1; + } + cd->block = block; + cd->offset = offset; + threaded_workqueue_submit_request(compress_threads, cd); + return 1; } /**