@@ -658,8 +658,32 @@ uint64_t qemu_get_be64(QEMUFile *f)
return v;
}
-/* Compress size bytes of data start at p with specific compression
- * level and store the compressed data to the buffer of f.
+/* return the size after compression, or negative value on error */
+static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len)
+{
+ int err;
+
+ err = deflateReset(stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = source_len;
+ stream->next_in = (uint8_t *)source;
+ stream->avail_out = dest_len;
+ stream->next_out = dest;
+
+ err = deflate(stream, Z_FINISH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ return stream->next_out - dest;
+}
+
+/* Compress size bytes of data start at p and store the compressed
+ * data to the buffer of f.
*
* When f is not writable, return -1 if f has no space to save the
* compressed data.
@@ -667,9 +691,8 @@ uint64_t qemu_get_be64(QEMUFile *f)
* do fflush first, if f still has no space to save the compressed
* data, return -1.
*/
-
-ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
- int level)
+ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+ const uint8_t *p, size_t size)
{
ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
@@ -683,8 +706,10 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
return -1;
}
}
- if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen,
- (Bytef *)p, size, level) != Z_OK) {
+
+ blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
+ blen, p, size);
+ if (blen < 0) {
error_report("Compress Failed!");
return 0;
}
@@ -25,6 +25,8 @@
#ifndef MIGRATION_QEMU_FILE_H
#define MIGRATION_QEMU_FILE_H
+#include <zlib.h>
+
/* Read a chunk of data from a file at the given position. The pos argument
* can be ignored if the file is only be used for streaming. The number of
* bytes actually read should be returned.
@@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f);
size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset);
size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
-ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
- int level);
+ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+ const uint8_t *p, size_t size);
int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
/*
@@ -269,6 +269,7 @@ struct CompressParam {
QemuCond cond;
RAMBlock *block;
ram_addr_t offset;
+ z_stream stream;
};
typedef struct CompressParam CompressParam;
@@ -299,7 +300,7 @@ static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
ram_addr_t offset);
static void *do_data_compress(void *opaque)
@@ -316,7 +317,7 @@ static void *do_data_compress(void *opaque)
param->block = NULL;
qemu_mutex_unlock(¶m->mutex);
- do_compress_ram_page(param->file, block, offset);
+ do_compress_ram_page(param->file, ¶m->stream, block, offset);
qemu_mutex_lock(&comp_done_lock);
param->done = true;
@@ -357,10 +358,20 @@ static void compress_threads_save_cleanup(void)
terminate_compression_threads();
thread_count = migrate_compress_threads();
for (i = 0; i < thread_count; i++) {
+ /*
+ * stream.opaque can be used to store private data, we use it
+ * as a indicator which shows if the thread is properly init'd
+ * or not
+ */
+ if (!comp_param[i].stream.opaque) {
+ break;
+ }
qemu_thread_join(compress_threads + i);
qemu_fclose(comp_param[i].file);
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
+ deflateEnd(&comp_param[i].stream);
+ comp_param[i].stream.opaque = NULL;
}
qemu_mutex_destroy(&comp_done_lock);
qemu_cond_destroy(&comp_done_cond);
@@ -370,12 +381,12 @@ static void compress_threads_save_cleanup(void)
comp_param = NULL;
}
-static void compress_threads_save_setup(void)
+static int compress_threads_save_setup(void)
{
int i, thread_count;
if (!migrate_use_compression()) {
- return;
+ return 0;
}
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
@@ -383,6 +394,12 @@ static void compress_threads_save_setup(void)
qemu_cond_init(&comp_done_cond);
qemu_mutex_init(&comp_done_lock);
for (i = 0; i < thread_count; i++) {
+ if (deflateInit(&comp_param[i].stream,
+ migrate_compress_level()) != Z_OK) {
+ goto exit;
+ }
+ comp_param[i].stream.opaque = &comp_param[i];
+
/* comp_param[i].file is just used as a dummy buffer to save data,
* set its ops to empty.
*/
@@ -395,6 +412,11 @@ static void compress_threads_save_setup(void)
do_data_compress, comp_param + i,
QEMU_THREAD_JOINABLE);
}
+ return 0;
+
+exit:
+ compress_threads_save_cleanup();
+ return -1;
}
/* Multiple fd's */
@@ -1031,7 +1053,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
return pages;
}
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
ram_addr_t offset)
{
RAMState *rs = ram_state;
@@ -1040,8 +1062,7 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
bytes_sent = save_page_header(rs, f, block, offset |
RAM_SAVE_FLAG_COMPRESS_PAGE);
- blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
- migrate_compress_level());
+ blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
if (blen < 0) {
bytes_sent = 0;
qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@@ -2214,9 +2235,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
RAMState **rsp = opaque;
RAMBlock *block;
+ if (compress_threads_save_setup()) {
+ return -1;
+ }
+
/* migration has already setup the bitmap, reuse it. */
if (!migration_in_colo_state()) {
if (ram_init_all(rsp) != 0) {
+ compress_threads_save_cleanup();
return -1;
}
}
@@ -2236,7 +2262,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
}
rcu_read_unlock();
- compress_threads_save_setup();
ram_control_before_iterate(f, RAM_CONTROL_SETUP);
ram_control_after_iterate(f, RAM_CONTROL_SETUP);