@@ -281,6 +281,7 @@ struct DecompressParam {
void *des;
uint8_t *compbuf;
int len;
+ z_stream stream;
};
typedef struct DecompressParam DecompressParam;
@@ -2526,6 +2527,31 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
+/*
+ * Inflate @source_len bytes at @source into @dest (at most @dest_len bytes),
+ * resetting @stream first so the same z_stream can be reused across calls.
+ * Returns the decompressed size (stream->total_out), or -1 if the reset
+ * fails or inflate() does not report Z_STREAM_END.
+ */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+                     const uint8_t *source, size_t source_len)
+{
+    if (inflateReset(stream) != Z_OK) {
+        return -1;
+    }
+
+    stream->next_in = (uint8_t *)source;
+    stream->avail_in = source_len;
+    stream->next_out = dest;
+    stream->avail_out = dest_len;
+
+    if (inflate(stream, Z_NO_FLUSH) != Z_STREAM_END) {
+        return -1;
+    }
+    return stream->total_out;
+}
+
static void *do_data_decompress(void *opaque)
{
DecompressParam *param = opaque;
@@ -2542,13 +2568,13 @@ static void *do_data_decompress(void *opaque)
qemu_mutex_unlock(¶m->mutex);
pagesize = TARGET_PAGE_SIZE;
- /* uncompress() will return failed in some case, especially
- * when the page is dirted when doing the compression, it's
- * not a problem because the dirty page will be retransferred
+ /* qemu_uncompress_data() may fail in some cases, especially
+ * when the page is dirtied during compression; this is not
+ * a problem because the dirty page will be retransferred
* and uncompress() won't break the data in other pages.
*/
- uncompress((Bytef *)des, &pagesize,
- (const Bytef *)param->compbuf, len);
+ qemu_uncompress_data(¶m->stream, des, pagesize, param->compbuf,
+ len);
qemu_mutex_lock(&decomp_done_lock);
param->done = true;
@@ -2583,30 +2609,6 @@ static void wait_for_decompress_done(void)
qemu_mutex_unlock(&decomp_done_lock);
}
-static void compress_threads_load_setup(void)
-{
- int i, thread_count;
-
- if (!migrate_use_compression()) {
- return;
- }
- thread_count = migrate_decompress_threads();
- decompress_threads = g_new0(QemuThread, thread_count);
- decomp_param = g_new0(DecompressParam, thread_count);
- qemu_mutex_init(&decomp_done_lock);
- qemu_cond_init(&decomp_done_cond);
- for (i = 0; i < thread_count; i++) {
- qemu_mutex_init(&decomp_param[i].mutex);
- qemu_cond_init(&decomp_param[i].cond);
- decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
- decomp_param[i].done = true;
- decomp_param[i].quit = false;
- qemu_thread_create(decompress_threads + i, "decompress",
- do_data_decompress, decomp_param + i,
- QEMU_THREAD_JOINABLE);
- }
-}
-
static void compress_threads_load_cleanup(void)
{
int i, thread_count;
@@ -2616,16 +2618,27 @@ static void compress_threads_load_cleanup(void)
}
thread_count = migrate_decompress_threads();
for (i = 0; i < thread_count; i++) {
+ /* see the comments in compress_threads_save_cleanup() */
+ if (!decomp_param[i].stream.opaque) {
+ break;
+ }
+
qemu_mutex_lock(&decomp_param[i].mutex);
decomp_param[i].quit = true;
qemu_cond_signal(&decomp_param[i].cond);
qemu_mutex_unlock(&decomp_param[i].mutex);
}
for (i = 0; i < thread_count; i++) {
+ if (!decomp_param[i].stream.opaque) {
+ break;
+ }
+
qemu_thread_join(decompress_threads + i);
qemu_mutex_destroy(&decomp_param[i].mutex);
qemu_cond_destroy(&decomp_param[i].cond);
g_free(decomp_param[i].compbuf);
+ inflateEnd(&decomp_param[i].stream);
+ decomp_param[i].stream.opaque = NULL;
}
g_free(decompress_threads);
g_free(decomp_param);
@@ -2633,6 +2646,40 @@ static void compress_threads_load_cleanup(void)
decomp_param = NULL;
}
+/* Start the decompression threads, one zlib inflate stream per thread.
+ * Returns 0 on success or when compression is off; -1 (after running
+ * compress_threads_load_cleanup()) if inflateInit() fails. */
+static int compress_threads_load_setup(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+    thread_count = migrate_decompress_threads();
+    decompress_threads = g_new0(QemuThread, thread_count);
+    decomp_param = g_new0(DecompressParam, thread_count);
+    qemu_mutex_init(&decomp_done_lock);
+    qemu_cond_init(&decomp_done_cond);
+    for (i = 0; i < thread_count; i++) {
+        DecompressParam *param = &decomp_param[i];
+        if (inflateInit(&param->stream) != Z_OK) {
+            compress_threads_load_cleanup();
+            return -1;
+        }
+        /* a non-NULL opaque marks this slot as initialized for cleanup */
+        param->stream.opaque = param;
+        qemu_mutex_init(&param->mutex);
+        qemu_cond_init(&param->cond);
+        param->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        param->done = true;
+        param->quit = false;
+        qemu_thread_create(&decompress_threads[i], "decompress",
+                           do_data_decompress, param, QEMU_THREAD_JOINABLE);
+    }
+    return 0;
+}
+
static void decompress_data_with_multi_threads(QEMUFile *f,
void *host, int len)
{
@@ -2672,8 +2719,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
*/
static int ram_load_setup(QEMUFile *f, void *opaque)
{
+ if (compress_threads_load_setup()) {
+ return -1; /* first step, so nothing else needs undoing here */
+ }
+
xbzrle_load_setup();
- compress_threads_load_setup();
ramblock_recv_map_init();
return 0;
}