@@ -263,6 +263,7 @@ typedef struct CompressParam CompressParam;
struct DecompressParam {
bool start;
+ bool done;
QemuMutex mutex;
QemuCond cond;
void *des;
@@ -287,6 +288,8 @@ static bool quit_comp_thread;
static bool quit_decomp_thread;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
+static QemuMutex decomp_done_lock;
+static QemuCond decomp_done_cond;
static int do_compress_ram_page(CompressParam *param);
@@ -834,6 +837,7 @@ static inline void start_compression(CompressParam *param)
static inline void start_decompression(DecompressParam *param)
{
+ param->done = false;
qemu_mutex_lock(¶m->mutex);
param->start = true;
qemu_cond_signal(¶m->cond);
@@ -2193,19 +2197,24 @@ static void *do_data_decompress(void *opaque)
qemu_mutex_lock(¶m->mutex);
while (!param->start && !quit_decomp_thread) {
qemu_cond_wait(¶m->cond, ¶m->mutex);
+ }
+ if (!quit_decomp_thread) {
pagesize = TARGET_PAGE_SIZE;
- if (!quit_decomp_thread) {
- /* uncompress() will return failed in some case, especially
- * when the page is dirted when doing the compression, it's
- * not a problem because the dirty page will be retransferred
- * and uncompress() won't break the data in other pages.
- */
- uncompress((Bytef *)param->des, &pagesize,
- (const Bytef *)param->compbuf, param->len);
- }
- param->start = false;
+ /* uncompress() will return failed in some case, especially
+ * when the page is dirted when doing the compression, it's
+ * not a problem because the dirty page will be retransferred
+ * and uncompress() won't break the data in other pages.
+ */
+ uncompress((Bytef *)param->des, &pagesize,
+ (const Bytef *)param->compbuf, param->len);
}
+ param->start = false;
qemu_mutex_unlock(¶m->mutex);
+
+ qemu_mutex_lock(&decomp_done_lock);
+ param->done = true;
+ qemu_cond_signal(&decomp_done_cond);
+ qemu_mutex_unlock(&decomp_done_lock);
}
return NULL;
@@ -2219,10 +2228,13 @@ void migrate_decompress_threads_create(void)
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
quit_decomp_thread = false;
+ qemu_mutex_init(&decomp_done_lock);
+ qemu_cond_init(&decomp_done_cond);
for (i = 0; i < thread_count; i++) {
qemu_mutex_init(&decomp_param[i].mutex);
qemu_cond_init(&decomp_param[i].cond);
decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ decomp_param[i].done = true;
qemu_thread_create(decompress_threads + i, "decompress",
do_data_decompress, decomp_param + i,
QEMU_THREAD_JOINABLE);
@@ -2258,9 +2270,10 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
int idx, thread_count;
thread_count = migrate_decompress_threads();
+ qemu_mutex_lock(&decomp_done_lock);
while (true) {
for (idx = 0; idx < thread_count; idx++) {
- if (!decomp_param[idx].start) {
+ if (decomp_param[idx].done) {
qemu_get_buffer(f, decomp_param[idx].compbuf, len);
decomp_param[idx].des = host;
decomp_param[idx].len = len;
@@ -2270,8 +2283,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
}
if (idx < thread_count) {
break;
+ } else {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
}
}
+ qemu_mutex_unlock(&decomp_done_lock);
}
/*