@@ -282,6 +282,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
info->compression->compressed_size);
monitor_printf(mon, "compression rate: %0.2f\n",
info->compression->compression_rate);
+ monitor_printf(mon, "compress-no-wait-weight: %"PRIu64"\n",
+ info->compression->compress_no_wait_weight);
}
if (info->has_cpu_throttle_percentage) {
@@ -344,6 +346,11 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "%s: %s\n",
MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD),
params->compress_wait_thread ? "on" : "off");
+ assert(params->has_compress_wait_thread_adaptive);
+ monitor_printf(mon, "%s: %s\n",
+ MigrationParameter_str(
+ MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD_ADAPTIVE),
+ params->compress_wait_thread_adaptive ? "on" : "off");
assert(params->has_decompress_threads);
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_DECOMPRESS_THREADS),
@@ -1676,6 +1683,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_compress_wait_thread = true;
visit_type_bool(v, param, &p->compress_wait_thread, &err);
break;
+ case MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD_ADAPTIVE:
+ p->has_compress_wait_thread_adaptive = true;
+ visit_type_bool(v, param, &p->compress_wait_thread_adaptive, &err);
+ break;
case MIGRATION_PARAMETER_DECOMPRESS_THREADS:
p->has_decompress_threads = true;
visit_type_int(v, param, &p->decompress_threads, &err);
@@ -706,6 +706,9 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->compress_threads = s->parameters.compress_threads;
params->has_compress_wait_thread = true;
params->compress_wait_thread = s->parameters.compress_wait_thread;
+ params->has_compress_wait_thread_adaptive = true;
+ params->compress_wait_thread_adaptive =
+ s->parameters.compress_wait_thread_adaptive;
params->has_decompress_threads = true;
params->decompress_threads = s->parameters.decompress_threads;
params->has_cpu_throttle_initial = true;
@@ -799,6 +802,8 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
compression_counters.compressed_size;
info->compression->compression_rate =
compression_counters.compression_rate;
+ info->compression->compress_no_wait_weight =
+ compression_counters.compress_no_wait_weight;
}
if (cpu_throttle_active()) {
@@ -1132,6 +1137,11 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
dest->compress_wait_thread = params->compress_wait_thread;
}
+ if (params->has_compress_wait_thread_adaptive) {
+ dest->compress_wait_thread_adaptive =
+ params->compress_wait_thread_adaptive;
+ }
+
if (params->has_decompress_threads) {
dest->decompress_threads = params->decompress_threads;
}
@@ -1204,6 +1214,11 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
s->parameters.compress_wait_thread = params->compress_wait_thread;
}
+ if (params->has_compress_wait_thread_adaptive) {
+ s->parameters.compress_wait_thread_adaptive =
+ params->compress_wait_thread_adaptive;
+ }
+
if (params->has_decompress_threads) {
s->parameters.decompress_threads = params->decompress_threads;
}
@@ -1925,6 +1940,15 @@ bool migrate_postcopy_blocktime(void)
return s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_BLOCKTIME];
}
+int64_t migration_max_bandwidth(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.max_bandwidth;
+}
+
bool migrate_use_compression(void)
{
MigrationState *s;
@@ -1961,6 +1985,15 @@ int migrate_compress_wait_thread(void)
return s->parameters.compress_wait_thread;
}
+int migrate_compress_wait_thread_adaptive(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.compress_wait_thread_adaptive;
+}
+
int migrate_decompress_threads(void)
{
MigrationState *s;
@@ -2898,6 +2931,8 @@ static void migration_update_counters(MigrationState *s,
s->mbps = (((double) transferred * 8.0) /
((double) time_spent / 1000.0)) / 1000.0 / 1000.0;
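+ /* feed the freshly computed throughput into the adaptive compression logic */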
+ compress_adaptive_update(s->mbps);
+
/*
* if we haven't sent anything, we don't want to
* recalculate. 10000 is a small enough number for our purposes
@@ -3232,6 +3267,8 @@ static Property migration_properties[] = {
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
parameters.compress_wait_thread, true),
+ DEFINE_PROP_BOOL("x-ccompress-wait-thread-adaptive", MigrationState,
+ parameters.compress_wait_thread_adaptive, false),
DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
parameters.decompress_threads,
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
@@ -270,11 +270,15 @@ bool migrate_use_block(void);
bool migrate_use_block_incremental(void);
int migrate_max_cpu_throttle(void);
bool migrate_use_return_path(void);
+int64_t migration_max_bandwidth(void);
bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
int migrate_compress_wait_thread(void);
+int migrate_compress_wait_thread_adaptive(void);
+void compress_adaptive_update(double mbps);
+
int migrate_decompress_threads(void);
bool migrate_use_events(void);
bool migrate_postcopy_blocktime(void);
@@ -276,6 +276,8 @@ struct RAMSrcPageRequest {
QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
};
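+ /*
+ * period, counted in pages that found all compress threads busy, over
+ * which the no-wait budget is refilled from compress_no_wait_weight;
+ * also the maximum value of that weight
+ */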
+#define COMPRESS_BUSY_COUNT_PERIOD 100
+
/* State of RAM for migration */
struct RAMState {
/* QEMUFile used for this migration */
@@ -292,6 +294,19 @@ struct RAMState {
bool ram_bulk_stage;
/* How many times we have dirty too many pages */
int dirty_rate_high_cnt;
+
+ /* used by compress-wait-thread-adaptive */
+ /*
+ * how many pages have found all compress threads busy in the
+ * current period
+ */
+ uint8_t compress_busy_count;
+ /*
+ * how many more pages in the current period may be posted out directly
+ * as normal pages when all compress threads are busy
+ */
+ uint8_t compress_no_wait_left;
+
/* these variables are used for bitmap sync */
/* last time we did a full bitmap_sync */
int64_t time_last_bitmap_sync;
@@ -470,6 +485,8 @@ static void compress_threads_save_cleanup(void)
comp_param = NULL;
}
+static void compress_adaptive_init(void);
+
static int compress_threads_save_setup(void)
{
int i, thread_count;
@@ -477,6 +494,9 @@ static int compress_threads_save_setup(void)
if (!migrate_use_compression()) {
return 0;
}
+
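+ /* reset the adaptive compression state before the compress threads start */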
+ compress_adaptive_init();
+
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
comp_param = g_new0(CompressParam, thread_count);
@@ -1593,6 +1613,67 @@ uint64_t ram_pagesize_summary(void)
return summary;
}
+static void compress_adaptive_init(void)
+{
+ /* by default, always wait for a free compress thread. */
+ compression_counters.compress_no_wait_weight = 0;
+ ram_state->compress_no_wait_left = 0;
+ ram_state->compress_busy_count = 0;
+}
+
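+ /*
+ * Called from migration_update_counters() with the measured throughput:
+ * when the configured bandwidth is nearly saturated, shrink
+ * compress_no_wait_weight (compress more); when bandwidth is left over,
+ * grow it (post more busy pages out as normal pages).
+ */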
+void compress_adaptive_update(double mbps)
+{
+ int64_t rate_limit, remain_bw, max_bw = migration_max_bandwidth();
+
+ if (!migrate_use_compression() ||
+ !migrate_compress_wait_thread_adaptive()) {
+ return;
+ }
+
+ /* no bandwidth limit is set on the file, so adaptive adjustment is impossible */
+ rate_limit = qemu_file_get_rate_limit(migrate_get_current()->to_dst_file);
+ if (rate_limit == 0 || rate_limit == INT64_MAX) {
+ return;
+ }
+
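+ /* roughly convert max_bandwidth from bytes/s to Mbit/s to compare with mbps */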
+ max_bw = (max_bw >> 20) * 8;
+ remain_bw = abs(max_bw - (int64_t)(mbps));
+ if (remain_bw <= ((max_bw / 10) * 2)) {
+ /* less than ~20% of the bandwidth is left, compress more pages. */
+ if (compression_counters.compress_no_wait_weight) {
+ compression_counters.compress_no_wait_weight--;
+ }
+ goto exit;
+ }
+
+ /* there is bandwidth left over, no need to compress so aggressively */
+ if (compression_counters.compress_no_wait_weight !=
+ COMPRESS_BUSY_COUNT_PERIOD) {
+ compression_counters.compress_no_wait_weight++;
+ }
+
+exit:
+ ram_state->compress_busy_count = 0;
+ ram_state->compress_no_wait_left =
+ compression_counters.compress_no_wait_weight;
+}
+
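+ /*
+ * Called for each page that finds all compress threads busy: the no-wait
+ * budget is refilled from compress_no_wait_weight once every
+ * COMPRESS_BUSY_COUNT_PERIOD such pages, and the page may skip waiting
+ * while budget remains.
+ */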
+static bool compress_adaptive_need_wait(void)
+{
+ if (++ram_state->compress_busy_count == COMPRESS_BUSY_COUNT_PERIOD) {
+ ram_state->compress_busy_count = 0;
+ ram_state->compress_no_wait_left =
+ compression_counters.compress_no_wait_weight;
+ }
+
+ if (ram_state->compress_no_wait_left) {
+ ram_state->compress_no_wait_left--;
+ return false;
+ }
+
+ return true;
+}
+
static void migration_update_rates(RAMState *rs, int64_t end_time)
{
uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
@@ -1970,6 +2051,15 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
{
int idx, thread_count, bytes_xmit = -1, pages = -1;
bool wait = migrate_compress_wait_thread();
+ bool adaptive = migrate_compress_wait_thread_adaptive();
+
+ /*
+ * 'compress-wait-thread-adaptive' has higher priority than
+ * 'compress-wait-thread'.
+ */
+ if (adaptive) {
+ wait = false;
+ }
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
@@ -1984,20 +2074,29 @@ retry:
qemu_mutex_unlock(&comp_param[idx].mutex);
pages = 1;
update_compress_thread_counts(&comp_param[idx], bytes_xmit);
- break;
+ goto exit;
}
}
+ if (adaptive && !wait) {
+ /* consult the adaptive budget only on the first pass through the loop */
+ wait = compress_adaptive_need_wait();
+ }
+
/*
- * wait for the free thread if the user specifies 'compress-wait-thread',
- * otherwise we will post the page out in the main thread as normal page.
+ * wait for a free thread if the user specified 'compress-wait-thread',
+ * or if 'compress-wait-thread-adaptive' detected that the bandwidth is
+ * not sufficient; otherwise post the page out in the main thread as a
+ * normal page.
*/
- if (pages < 0 && wait) {
+ if (wait) {
qemu_cond_wait(&comp_done_cond, &comp_done_lock);
goto retry;
}
- qemu_mutex_unlock(&comp_done_lock);
+
+exit:
+ qemu_mutex_unlock(&comp_done_lock);
return pages;
}
@@ -3147,19 +3246,18 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
RAMState **rsp = opaque;
RAMBlock *block;
- if (compress_threads_save_setup()) {
- return -1;
- }
-
/* migration has already setup the bitmap, reuse it. */
if (!migration_in_colo_state()) {
if (ram_init_all(rsp) != 0) {
- compress_threads_save_cleanup();
return -1;
}
}
(*rsp)->f = f;
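+ /*
+ * compress_adaptive_init() touches ram_state, so compression is set up
+ * only after ram_init_all() has initialized it.
+ */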
+ if (compress_threads_save_setup()) {
+ return -1;
+ }
+
rcu_read_lock();
qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
@@ -90,11 +90,16 @@
#
# @compression-rate: rate of compressed size
#
+# @compress-no-wait-weight: how many pages out of every 100 that find all
+# compression threads busy are posted out directly as normal pages instead
+# of waiting for a free thread. Only meaningful when
+# compress-wait-thread-adaptive is on. (Since 3.2)
+#
# Since: 3.1
##
{ 'struct': 'CompressionStats',
'data': {'pages': 'int', 'busy': 'int', 'busy-rate': 'number',
- 'compressed-size': 'int', 'compression-rate': 'number' } }
+ 'compressed-size': 'int', 'compression-rate': 'number',
+ 'compress-no-wait-weight': 'int'} }
##
# @MigrationStatus:
@@ -490,6 +495,11 @@
# compression thread to become available; otherwise,
# send the page uncompressed. (Since 3.1)
#
+# @compress-wait-thread-adaptive: Adaptively control compress-wait-thread
+# based on the rate limit; off by default. While enough bandwidth is
+# left it behaves as if compress-wait-thread is off; once the bandwidth
+# is saturated it behaves as if compress-wait-thread is on. (Since 3.2)
+#
# @decompress-threads: Set decompression thread count to be used in live
# migration, the decompression thread count is an integer between 1
# and 255. Usually, decompression is at least 4 times as fast as
@@ -558,7 +568,7 @@
##
{ 'enum': 'MigrationParameter',
'data': ['compress-level', 'compress-threads', 'decompress-threads',
- 'compress-wait-thread',
+ 'compress-wait-thread', 'compress-wait-thread-adaptive',
'cpu-throttle-initial', 'cpu-throttle-increment',
'tls-creds', 'tls-hostname', 'max-bandwidth',
'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
@@ -578,6 +588,11 @@
# compression thread to become available; otherwise,
# send the page uncompressed. (Since 3.1)
#
+# @compress-wait-thread-adaptive: Adaptively control compress-wait-thread
+# based on the rate limit; off by default. While enough bandwidth is
+# left it behaves as if compress-wait-thread is off; once the bandwidth
+# is saturated it behaves as if compress-wait-thread is on. (Since 3.2)
+#
# @decompress-threads: decompression thread count
#
# @cpu-throttle-initial: Initial percentage of time guest cpus are
@@ -653,6 +668,7 @@
'data': { '*compress-level': 'int',
'*compress-threads': 'int',
'*compress-wait-thread': 'bool',
+ '*compress-wait-thread-adaptive': 'bool',
'*decompress-threads': 'int',
'*cpu-throttle-initial': 'int',
'*cpu-throttle-increment': 'int',
@@ -698,6 +714,11 @@
# compression thread to become available; otherwise,
# send the page uncompressed. (Since 3.1)
#
+# @compress-wait-thread-adaptive: Adaptively control compress-wait-thread
+# based on the rate limit; off by default. While enough bandwidth is
+# left it behaves as if compress-wait-thread is off; once the bandwidth
+# is saturated it behaves as if compress-wait-thread is on. (Since 3.2)
+#
# @decompress-threads: decompression thread count
#
# @cpu-throttle-initial: Initial percentage of time guest cpus are
@@ -769,6 +790,7 @@
'data': { '*compress-level': 'uint8',
'*compress-threads': 'uint8',
'*compress-wait-thread': 'bool',
+ '*compress-wait-thread-adaptive': 'bool',
'*decompress-threads': 'uint8',
'*cpu-throttle-initial': 'uint8',
'*cpu-throttle-increment': 'uint8',