@@ -710,9 +710,9 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
blen, p, size);
if (blen < 0) {
- error_report("Compress Failed!");
- return 0;
+ return -1;
}
+
qemu_put_be32(f, blen);
if (f->ops->writev_buffer) {
add_to_iovec(f, f->buf + f->buf_index, blen, false);
@@ -269,7 +269,10 @@ struct CompressParam {
QemuCond cond;
RAMBlock *block;
ram_addr_t offset;
+
+ /* internally used fields */
z_stream stream;
+ uint8_t *originbuf;
};
typedef struct CompressParam CompressParam;
@@ -296,13 +299,14 @@ static QemuCond comp_done_cond;
/* The empty QEMUFileOps will be used by file in CompressParam */
static const QEMUFileOps empty_ops = { };
+static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset);
+ ram_addr_t offset, uint8_t *source_buf);
static void *do_data_compress(void *opaque)
{
@@ -318,7 +322,8 @@ static void *do_data_compress(void *opaque)
param->block = NULL;
qemu_mutex_unlock(¶m->mutex);
- do_compress_ram_page(param->file, ¶m->stream, block, offset);
+ do_compress_ram_page(param->file, ¶m->stream, block, offset,
+ param->originbuf);
qemu_mutex_lock(&comp_done_lock);
param->done = true;
@@ -370,6 +375,7 @@ static void compress_threads_save_cleanup(void)
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
deflateEnd(&comp_param[i].stream);
+ g_free(comp_param[i].originbuf);
qemu_fclose(comp_param[i].file);
comp_param[i].file = NULL;
}
@@ -394,8 +400,14 @@ static int compress_threads_save_setup(void)
qemu_cond_init(&comp_done_cond);
qemu_mutex_init(&comp_done_lock);
for (i = 0; i < thread_count; i++) {
+ comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+ if (!comp_param[i].originbuf) {
+ goto exit;
+ }
+
if (deflateInit(&comp_param[i].stream,
migrate_compress_level()) != Z_OK) {
+ g_free(comp_param[i].originbuf);
goto exit;
}
@@ -1053,7 +1065,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
}
static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset)
+ ram_addr_t offset, uint8_t *source_buf)
{
RAMState *rs = ram_state;
int bytes_sent, blen;
@@ -1061,7 +1073,14 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
bytes_sent = save_page_header(rs, f, block, offset |
RAM_SAVE_FLAG_COMPRESS_PAGE);
- blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
+
+ /*
+ * copy it to a internal buffer to avoid it being modified by VM
+ * so that we can catch up the error during compression and
+ * decompression
+ */
+ memcpy(source_buf, p, TARGET_PAGE_SIZE);
+ blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
if (blen < 0) {
bytes_sent = 0;
qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@@ -2555,7 +2574,7 @@ static void *do_data_decompress(void *opaque)
DecompressParam *param = opaque;
unsigned long pagesize;
uint8_t *des;
- int len;
+ int len, ret;
qemu_mutex_lock(¶m->mutex);
while (!param->quit) {
@@ -2566,13 +2585,13 @@ static void *do_data_decompress(void *opaque)
qemu_mutex_unlock(¶m->mutex);
pagesize = TARGET_PAGE_SIZE;
- /* qemu_uncompress_data() will return failed in some case,
- * especially when the page is dirtied when doing the compression,
- * it's not a problem because the dirty page will be retransferred
- * and uncompress() won't break the data in other pages.
- */
- qemu_uncompress_data(¶m->stream, des, pagesize, param->compbuf,
- len);
+
+ ret = qemu_uncompress_data(¶m->stream, des, pagesize,
+ param->compbuf, len);
+ if (ret < 0) {
+ error_report("decompress data failed");
+ qemu_file_set_error(decomp_file, ret);
+ }
qemu_mutex_lock(&decomp_done_lock);
param->done = true;
@@ -2589,12 +2608,12 @@ static void *do_data_decompress(void *opaque)
return NULL;
}
-static void wait_for_decompress_done(void)
+static int wait_for_decompress_done(void)
{
int idx, thread_count;
if (!migrate_use_compression()) {
- return;
+ return 0;
}
thread_count = migrate_decompress_threads();
@@ -2605,6 +2624,7 @@ static void wait_for_decompress_done(void)
}
}
qemu_mutex_unlock(&decomp_done_lock);
+ return qemu_file_get_error(decomp_file);
}
static void compress_threads_load_cleanup(void)
@@ -2645,9 +2665,10 @@ static void compress_threads_load_cleanup(void)
g_free(decomp_param);
decompress_threads = NULL;
decomp_param = NULL;
+ decomp_file = NULL;
}
-static int compress_threads_load_setup(void)
+static int compress_threads_load_setup(QEMUFile *f)
{
int i, thread_count;
@@ -2660,6 +2681,7 @@ static int compress_threads_load_setup(void)
decomp_param = g_new0(DecompressParam, thread_count);
qemu_mutex_init(&decomp_done_lock);
qemu_cond_init(&decomp_done_cond);
+ decomp_file = f;
for (i = 0; i < thread_count; i++) {
if (inflateInit(&decomp_param[i].stream) != Z_OK) {
goto exit;
@@ -2719,7 +2741,7 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
*/
static int ram_load_setup(QEMUFile *f, void *opaque)
{
- if (compress_threads_load_setup()) {
+ if (compress_threads_load_setup(f)) {
return -1;
}
@@ -3074,7 +3096,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
}
}
- wait_for_decompress_done();
+ ret |= wait_for_decompress_done();
rcu_read_unlock();
trace_ram_load_complete(ret, seq_iter);
return ret;