Message ID | 20180327091043.30220-5-xiaoguangrong@tencent.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote: [...] > -static int compress_threads_load_setup(void) > +static int compress_threads_load_setup(QEMUFile *f) > { > int i, thread_count; > > @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void) > } > decomp_param[i].stream.opaque = &decomp_param[i]; > > + decomp_param[i].file = f; On the source side the error will be set via: qemu_file_set_error(migrate_get_current()->to_dst_file, blen); Maybe we can do similar things using migrate_incoming_get_current() to avoid caching the QEMUFile multiple times? I think both are not good since qemu_file_set_error() can be called by multiple threads, but it's only setting a fault value so maybe it's fine. Other than that it looks good to me. Thanks,
On 03/28/2018 05:59 PM, Peter Xu wrote: > On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote: > > [...] > >> -static int compress_threads_load_setup(void) >> +static int compress_threads_load_setup(QEMUFile *f) >> { >> int i, thread_count; >> >> @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void) >> } >> decomp_param[i].stream.opaque = &decomp_param[i]; >> >> + decomp_param[i].file = f; > > On the source side the error will be set via: > > qemu_file_set_error(migrate_get_current()->to_dst_file, blen); > > Maybe we can do similar things using migrate_incoming_get_current() to > avoid caching the QEMUFile multiple times? > I have considered it, however, it can not work as the @file used by ram loader is not the file got from migrate_incoming_get_current() under some cases. For example, in colo_process_incoming_thread(), the file passed to qemu_loadvm_state() is a internal buffer and it is not easy to switch it to incoming file.
On Thu, Mar 29, 2018 at 11:51:03AM +0800, Xiao Guangrong wrote: > > > On 03/28/2018 05:59 PM, Peter Xu wrote: > > On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote: > > > > [...] > > > > > -static int compress_threads_load_setup(void) > > > +static int compress_threads_load_setup(QEMUFile *f) > > > { > > > int i, thread_count; > > > @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void) > > > } > > > decomp_param[i].stream.opaque = &decomp_param[i]; > > > + decomp_param[i].file = f; > > > > On the source side the error will be set via: > > > > qemu_file_set_error(migrate_get_current()->to_dst_file, blen); > > > > Maybe we can do similar things using migrate_incoming_get_current() to > > avoid caching the QEMUFile multiple times? > > > > I have considered it, however, it can not work as the @file used by ram > loader is not the file got from migrate_incoming_get_current() under some > cases. > > For example, in colo_process_incoming_thread(), the file passed to > qemu_loadvm_state() is a internal buffer and it is not easy to switch it > to incoming file. I see. How about cache it in a global variable? We have these already: thread_count = migrate_decompress_threads(); decompress_threads = g_new0(QemuThread, thread_count); decomp_param = g_new0(DecompressParam, thread_count); ... IMHO we can add a new one too, at least we don't cache it multiple times (after all decomp_param[i]s are global variables too).
On 03/29/2018 12:25 PM, Peter Xu wrote: > On Thu, Mar 29, 2018 at 11:51:03AM +0800, Xiao Guangrong wrote: >> >> >> On 03/28/2018 05:59 PM, Peter Xu wrote: >>> On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote: >>> >>> [...] >>> >>>> -static int compress_threads_load_setup(void) >>>> +static int compress_threads_load_setup(QEMUFile *f) >>>> { >>>> int i, thread_count; >>>> @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void) >>>> } >>>> decomp_param[i].stream.opaque = &decomp_param[i]; >>>> + decomp_param[i].file = f; >>> >>> On the source side the error will be set via: >>> >>> qemu_file_set_error(migrate_get_current()->to_dst_file, blen); >>> >>> Maybe we can do similar things using migrate_incoming_get_current() to >>> avoid caching the QEMUFile multiple times? >>> >> >> I have considered it, however, it can not work as the @file used by ram >> loader is not the file got from migrate_incoming_get_current() under some >> cases. >> >> For example, in colo_process_incoming_thread(), the file passed to >> qemu_loadvm_state() is a internal buffer and it is not easy to switch it >> to incoming file. > > I see. How about cache it in a global variable? We have these > already: > > thread_count = migrate_decompress_threads(); > decompress_threads = g_new0(QemuThread, thread_count); > decomp_param = g_new0(DecompressParam, thread_count); > ... > > IMHO we can add a new one too, at least we don't cache it multiple > times (after all decomp_param[i]s are global variables too). > Nice, that's good to me. Will add your Reviewed-by on this patch as well if you do not mind. :)
On Fri, Mar 30, 2018 at 11:11:27AM +0800, Xiao Guangrong wrote: > > > On 03/29/2018 12:25 PM, Peter Xu wrote: > > On Thu, Mar 29, 2018 at 11:51:03AM +0800, Xiao Guangrong wrote: > > > > > > > > > On 03/28/2018 05:59 PM, Peter Xu wrote: > > > > On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote: > > > > > > > > [...] > > > > > > > > > -static int compress_threads_load_setup(void) > > > > > +static int compress_threads_load_setup(QEMUFile *f) > > > > > { > > > > > int i, thread_count; > > > > > @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void) > > > > > } > > > > > decomp_param[i].stream.opaque = &decomp_param[i]; > > > > > + decomp_param[i].file = f; > > > > > > > > On the source side the error will be set via: > > > > > > > > qemu_file_set_error(migrate_get_current()->to_dst_file, blen); > > > > > > > > Maybe we can do similar things using migrate_incoming_get_current() to > > > > avoid caching the QEMUFile multiple times? > > > > > > > > > > I have considered it, however, it can not work as the @file used by ram > > > loader is not the file got from migrate_incoming_get_current() under some > > > cases. > > > > > > For example, in colo_process_incoming_thread(), the file passed to > > > qemu_loadvm_state() is a internal buffer and it is not easy to switch it > > > to incoming file. > > > > I see. How about cache it in a global variable? We have these > > already: > > > > thread_count = migrate_decompress_threads(); > > decompress_threads = g_new0(QemuThread, thread_count); > > decomp_param = g_new0(DecompressParam, thread_count); > > ... > > > > IMHO we can add a new one too, at least we don't cache it multiple > > times (after all decomp_param[i]s are global variables too). > > > > Nice, that's good to me. Will add your Reviewed-by on this patch > as well if you do not mind. :) Yes, please. :) Thanks,
diff --git a/migration/qemu-file.c b/migration/qemu-file.c index e924cc23c5..a7614e8c28 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -710,9 +710,9 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), blen, p, size); if (blen < 0) { - error_report("Compress Failed!"); - return 0; + return -1; } + qemu_put_be32(f, blen); if (f->ops->writev_buffer) { add_to_iovec(f, f->buf + f->buf_index, blen, false); diff --git a/migration/ram.c b/migration/ram.c index 6b699650ca..e85191c1cb 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -269,7 +269,10 @@ struct CompressParam { QemuCond cond; RAMBlock *block; ram_addr_t offset; + + /* internally used fields */ z_stream stream; + uint8_t *originbuf; }; typedef struct CompressParam CompressParam; @@ -278,6 +281,7 @@ struct DecompressParam { bool quit; QemuMutex mutex; QemuCond cond; + QEMUFile *file; void *des; uint8_t *compbuf; int len; @@ -302,7 +306,7 @@ static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset); + ram_addr_t offset, uint8_t *source_buf); static void *do_data_compress(void *opaque) { @@ -318,7 +322,8 @@ static void *do_data_compress(void *opaque) param->block = NULL; qemu_mutex_unlock(¶m->mutex); - do_compress_ram_page(param->file, ¶m->stream, block, offset); + do_compress_ram_page(param->file, ¶m->stream, block, offset, + param->originbuf); qemu_mutex_lock(&comp_done_lock); param->done = true; @@ -372,6 +377,7 @@ static void compress_threads_save_cleanup(void) qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); deflateEnd(&comp_param[i].stream); + g_free(comp_param[i].originbuf); comp_param[i].stream.opaque = NULL; } qemu_mutex_destroy(&comp_done_lock); @@ -395,8 +401,14 @@ static int compress_threads_save_setup(void) qemu_cond_init(&comp_done_cond); qemu_mutex_init(&comp_done_lock); for (i = 0; i < thread_count; i++) { + comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!comp_param[i].originbuf) { + goto exit; + } + if (deflateInit(&comp_param[i].stream, migrate_compress_level()) != Z_OK) { + g_free(comp_param[i].originbuf); goto exit; } comp_param[i].stream.opaque = &comp_param[i]; @@ -1055,7 +1067,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) } static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset) + ram_addr_t offset, uint8_t *source_buf) { RAMState *rs = ram_state; int bytes_sent, blen; @@ -1063,7 +1075,14 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, bytes_sent = save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE); - blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE); + + /* + * copy it to a internal buffer to avoid it being modified by VM + * so that we can catch up the error during compression and + * decompression + */ + memcpy(source_buf, p, TARGET_PAGE_SIZE); + blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); if (blen < 0) { bytes_sent = 0; qemu_file_set_error(migrate_get_current()->to_dst_file, blen); @@ -2557,7 +2576,7 @@ static void *do_data_decompress(void *opaque) DecompressParam *param = opaque; unsigned long pagesize; uint8_t *des; - int len; + int len, ret; qemu_mutex_lock(¶m->mutex); while (!param->quit) { @@ -2568,13 +2587,13 @@ static void *do_data_decompress(void *opaque) qemu_mutex_unlock(¶m->mutex); pagesize = TARGET_PAGE_SIZE; - /* qemu_uncompress_data() will return failed in some case, - * especially when the page is dirtied when doing the compression, - * it's not a problem because the dirty page will be retransferred - * and uncompress() won't break the data in other pages. - */ - qemu_uncompress_data(¶m->stream, des, pagesize, param->compbuf, - len); + + ret = qemu_uncompress_data(¶m->stream, des, pagesize, + param->compbuf, len); + if (ret < 0) { + error_report("decompress data failed"); + qemu_file_set_error(param->file, ret); + } qemu_mutex_lock(&decomp_done_lock); param->done = true; @@ -2591,12 +2610,12 @@ static void *do_data_decompress(void *opaque) return NULL; } -static void wait_for_decompress_done(void) +static int wait_for_decompress_done(QEMUFile *f) { int idx, thread_count; if (!migrate_use_compression()) { - return; + return 0; } thread_count = migrate_decompress_threads(); @@ -2607,6 +2626,7 @@ static void wait_for_decompress_done(void) } } qemu_mutex_unlock(&decomp_done_lock); + return qemu_file_get_error(f); } static void compress_threads_load_cleanup(void) @@ -2646,7 +2666,7 @@ static void compress_threads_load_cleanup(void) decomp_param = NULL; } -static int compress_threads_load_setup(void) +static int compress_threads_load_setup(QEMUFile *f) { int i, thread_count; @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void) } decomp_param[i].stream.opaque = &decomp_param[i]; + decomp_param[i].file = f; qemu_mutex_init(&decomp_param[i].mutex); qemu_cond_init(&decomp_param[i].cond); decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); @@ -2719,7 +2740,7 @@ static void decompress_data_with_multi_threads(QEMUFile *f, */ static int ram_load_setup(QEMUFile *f, void *opaque) { - if (compress_threads_load_setup()) { + if (compress_threads_load_setup(f)) { return -1; } @@ -3074,7 +3095,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } } - wait_for_decompress_done(); + ret |= wait_for_decompress_done(f); rcu_read_unlock(); trace_ram_load_complete(ret, seq_iter); return ret;