@@ -327,6 +327,10 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_THREADS),
params->compress_threads);
+ assert(params->has_compress_wait_thread);
+ monitor_printf(mon, "%s: %s\n",
+ MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD),
+ params->compress_wait_thread ? "on" : "off");
assert(params->has_decompress_threads);
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_DECOMPRESS_THREADS),
@@ -1623,6 +1627,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_compress_threads = true;
visit_type_int(v, param, &p->compress_threads, &err);
break;
+ case MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD:
+ p->has_compress_wait_thread = true;
+ visit_type_bool(v, param, &p->compress_wait_thread, &err);
+ break;
case MIGRATION_PARAMETER_DECOMPRESS_THREADS:
p->has_decompress_threads = true;
visit_type_int(v, param, &p->decompress_threads, &err);
@@ -671,6 +671,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->compress_level = s->parameters.compress_level;
params->has_compress_threads = true;
params->compress_threads = s->parameters.compress_threads;
+ params->has_compress_wait_thread = true;
+ params->compress_wait_thread = s->parameters.compress_wait_thread;
params->has_decompress_threads = true;
params->decompress_threads = s->parameters.decompress_threads;
params->has_cpu_throttle_initial = true;
@@ -1061,6 +1063,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
dest->compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ dest->compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
dest->decompress_threads = params->decompress_threads;
}
@@ -1126,6 +1132,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
s->parameters.compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ s->parameters.compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
s->parameters.decompress_threads = params->decompress_threads;
}
@@ -1871,6 +1881,15 @@ int migrate_compress_threads(void)
return s->parameters.compress_threads;
}
+/* Tell whether the 'compress-wait-thread' migration parameter is set. */
+int migrate_compress_wait_thread(void)
+{
+    MigrationState *ms = migrate_get_current();
+
+    /* Nonzero when the parameter is enabled, 0 otherwise. */
+    return ms->parameters.compress_wait_thread;
+}
+
int migrate_decompress_threads(void)
{
MigrationState *s;
@@ -3131,6 +3150,8 @@ static Property migration_properties[] = {
DEFINE_PROP_UINT8("x-compress-threads", MigrationState,
parameters.compress_threads,
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
+ DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
+ parameters.compress_wait_thread, true),
DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
parameters.decompress_threads,
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
@@ -271,6 +271,7 @@ bool migrate_use_return_path(void);
bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
+int migrate_compress_wait_thread(void);
int migrate_decompress_threads(void);
bool migrate_use_events(void);
bool migrate_postcopy_blocktime(void);
@@ -1889,30 +1889,34 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
ram_addr_t offset)
{
int idx, thread_count, bytes_xmit = -1, pages = -1;
+ bool wait = migrate_compress_wait_thread();
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
- while (true) {
- for (idx = 0; idx < thread_count; idx++) {
- if (comp_param[idx].done) {
- comp_param[idx].done = false;
- bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
- qemu_mutex_lock(&comp_param[idx].mutex);
- set_compress_params(&comp_param[idx], block, offset);
- qemu_cond_signal(&comp_param[idx].cond);
- qemu_mutex_unlock(&comp_param[idx].mutex);
- pages = 1;
- ram_counters.normal++;
- ram_counters.transferred += bytes_xmit;
- break;
- }
- }
- if (pages > 0) {
+retry:
+ for (idx = 0; idx < thread_count; idx++) {
+ if (comp_param[idx].done) {
+ comp_param[idx].done = false;
+ bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
+ qemu_mutex_lock(&comp_param[idx].mutex);
+ set_compress_params(&comp_param[idx], block, offset);
+ qemu_cond_signal(&comp_param[idx].cond);
+ qemu_mutex_unlock(&comp_param[idx].mutex);
+ pages = 1;
+ ram_counters.normal++;
+ ram_counters.transferred += bytes_xmit;
break;
- } else {
- qemu_cond_wait(&comp_done_cond, &comp_done_lock);
}
}
+
+ /*
+ * Wait for a free thread if the user specifies 'compress-wait-thread';
+ * otherwise the page is sent out in the main thread as a normal page.
+ */
+ if (pages < 0 && wait) {
+ qemu_cond_wait(&comp_done_cond, &comp_done_lock);
+ goto retry;
+ }
qemu_mutex_unlock(&comp_done_lock);
return pages;
@@ -2226,7 +2230,10 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
* CPU resource.
*/
if (block == rs->last_sent_block && save_page_use_compression(rs)) {
- return compress_page_with_multi_thread(rs, block, offset);
+ res = compress_page_with_multi_thread(rs, block, offset);
+ if (res > 0) {
+ return res;
+ }
} else if (migrate_use_multifd()) {
return ram_save_multifd_page(rs, block, offset);
}
@@ -462,6 +462,11 @@
# @compress-threads: Set compression thread count to be used in live migration,
# the compression thread count is an integer between 1 and 255.
#
+# @compress-wait-thread: Controls behavior when all compression threads are
+# currently busy. If true (default), wait for a free
+# compression thread to become available; otherwise,
+# send the page uncompressed. (Since 3.1)
+#
# @decompress-threads: Set decompression thread count to be used in live
# migration, the decompression thread count is an integer between 1
# and 255. Usually, decompression is at least 4 times as fast as
@@ -526,11 +531,11 @@
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
- 'data': ['compress-level', 'compress-threads', 'decompress-threads',
- 'cpu-throttle-initial', 'cpu-throttle-increment',
- 'tls-creds', 'tls-hostname', 'max-bandwidth',
- 'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
- 'x-multifd-channels', 'x-multifd-page-count',
+ 'data': ['compress-level', 'compress-threads', 'compress-wait-thread',
+ 'decompress-threads', 'cpu-throttle-initial',
+ 'cpu-throttle-increment', 'tls-creds', 'tls-hostname',
+ 'max-bandwidth', 'downtime-limit', 'x-checkpoint-delay',
+ 'block-incremental', 'x-multifd-channels', 'x-multifd-page-count',
'xbzrle-cache-size', 'max-postcopy-bandwidth' ] }
##
@@ -540,6 +545,11 @@
#
# @compress-threads: compression thread count
#
+# @compress-wait-thread: Controls behavior when all compression threads are
+# currently busy. If true (default), wait for a free
+# compression thread to become available; otherwise,
+# send the page uncompressed. (Since 3.1)
+#
# @decompress-threads: decompression thread count
#
# @cpu-throttle-initial: Initial percentage of time guest cpus are
@@ -610,6 +620,7 @@
{ 'struct': 'MigrateSetParameters',
'data': { '*compress-level': 'int',
'*compress-threads': 'int',
+ '*compress-wait-thread': 'bool',
'*decompress-threads': 'int',
'*cpu-throttle-initial': 'int',
'*cpu-throttle-increment': 'int',
@@ -649,6 +660,11 @@
#
# @compress-threads: compression thread count
#
+# @compress-wait-thread: Controls behavior when all compression threads are
+# currently busy. If true (default), wait for a free
+# compression thread to become available; otherwise,
+# send the page uncompressed. (Since 3.1)
+#
# @decompress-threads: decompression thread count
#
# @cpu-throttle-initial: Initial percentage of time guest cpus are
@@ -714,6 +730,7 @@
{ 'struct': 'MigrationParameters',
'data': { '*compress-level': 'uint8',
'*compress-threads': 'uint8',
+ '*compress-wait-thread': 'bool',
'*decompress-threads': 'uint8',
'*cpu-throttle-initial': 'uint8',
'*cpu-throttle-increment': 'uint8',