From patchwork Tue Oct 16 11:10:03 2018
X-Patchwork-Submitter: Xiao Guangrong <xiaoguangrong@tencent.com>
X-Patchwork-Id: 10643443
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong <xiaoguangrong@tencent.com>, qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn
Date: Tue, 16 Oct 2018 19:10:03 +0800
Message-Id: <20181016111006.629-2-xiaoguangrong@tencent.com>
In-Reply-To: <20181016111006.629-1-xiaoguangrong@tencent.com>
References: <20181016111006.629-1-xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH 1/4] ptr_ring: port ptr_ring from linux kernel to QEMU

From: Xiao Guangrong <xiaoguangrong@tencent.com>

ptr_ring is good at minimizing cache contention and has a simple
memory-barrier model; it will be used by the lockless threads model
to pass requests between the main migration thread and the
compression threads.

Some changes are made relative to the kernel version:
1) unnecessary APIs are dropped, e.g. the _irq and _bh variants
2) the resize APIs have not been ported
3) the locks are dropped
4) some comments are adjusted
5) a new API, ptr_ring_disable_batch, is introduced

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/ptr_ring.h | 235 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 235 insertions(+)
 create mode 100644 include/qemu/ptr_ring.h
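
As a usage illustration (not part of the patch), a single producer and a
single consumer could drive the ring as sketched below; the 'Request'
type and the spin policy are hypothetical:

    /* SPSC sketch; 'Request' is a hypothetical user-defined type. */
    static struct ptr_ring ring;

    /* producer side: spin until the consumer has freed an entry */
    static void push(Request *req)
    {
        while (ptr_ring_produce(&ring, req) == -ENOSPC) {
            cpu_relax();
        }
    }

    /* consumer side: returns NULL when the ring is empty */
    static Request *pop(void)
    {
        return ptr_ring_consume(&ring);
    }

Note that ptr_ring_produce() carries an smp_wmb() so that the consumer
always observes fully initialized data behind the pointer it consumes.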
diff --git a/include/qemu/ptr_ring.h b/include/qemu/ptr_ring.h
new file mode 100644
index 0000000000..d8266d45f6
--- /dev/null
+++ b/include/qemu/ptr_ring.h
@@ -0,0 +1,235 @@
+/*
+ * Definitions for the 'struct ptr_ring' data structure.
+ *
+ * Author:
+ *  Michael S. Tsirkin <mst@redhat.com>
+ *  Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ * Copyright (C) 2018 Tencent, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This is a limited-size FIFO maintaining pointers in FIFO order, with
+ * one CPU producing entries and another consuming entries from a FIFO.
+ *
+ * This implementation tries to minimize cache-contention when there is a
+ * single producer and a single consumer CPU.
+ */
+
+#ifndef _QEMU_PTR_RING_H
+#define _QEMU_PTR_RING_H 1
+
+#include "qemu/compiler.h"
+#include "qemu/atomic.h"
+
+#define SMP_CACHE_BYTES      64
+#define ____cacheline_aligned_in_smp \
+        __attribute__((__aligned__(SMP_CACHE_BYTES)))
+
+#define WRITE_ONCE(ptr, val) \
+    (*((volatile typeof(ptr) *)(&(ptr))) = (val))
+#define READ_ONCE(ptr) (*((volatile typeof(ptr) *)(&(ptr))))
+
+struct ptr_ring {
+    int producer ____cacheline_aligned_in_smp;
+    int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
+    int consumer_tail; /* next entry to invalidate */
+    /* Shared consumer/producer data */
+    /* Read-only by both the producer and the consumer */
+    int size ____cacheline_aligned_in_smp; /* max entries in queue */
+    int batch; /* number of entries to consume in a batch */
+    void **queue;
+};
+typedef struct ptr_ring Ptr_Ring;
+
+/*
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
+static inline bool ptr_ring_full(struct ptr_ring *r)
+{
+    return r->queue[r->producer];
+}
+
+/*
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ *
+ * Callers are responsible for making sure the pointer that is being
+ * queued points to valid data.
+ */
+static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+    if (unlikely(!r->size) || r->queue[r->producer])
+        return -ENOSPC;
+
+    /* Make sure the pointer we are storing points to valid data. */
+    /* Pairs with READ_ONCE in ptr_ring_consume. */
+    smp_wmb();
+
+    WRITE_ONCE(r->queue[r->producer++], ptr);
+    if (unlikely(r->producer >= r->size))
+        r->producer = 0;
+    return 0;
+}
+
+static inline void *__ptr_ring_peek(struct ptr_ring *r)
+{
+    if (likely(r->size))
+        return READ_ONCE(r->queue[r->consumer_head]);
+    return NULL;
+}
+
+/*
+ * Test ring empty status.
+ *
+ * However, if some other CPU consumes ring entries at the same time,
+ * the value returned is not guaranteed to be correct.
+ *
+ * In this case - to avoid incorrectly detecting the ring
+ * as empty - the CPU consuming the ring entries is responsible
+ * for either consuming all ring entries until the ring is empty,
+ * or synchronizing with some other CPU and causing it to
+ * re-test ptr_ring_empty and/or consume the ring entries
+ * after the synchronization point.
+ *
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
+static inline bool ptr_ring_empty(struct ptr_ring *r)
+{
+    if (likely(r->size))
+        return !r->queue[READ_ONCE(r->consumer_head)];
+    return true;
+}
+
+/* Must only be called after __ptr_ring_peek returned !NULL */
+static inline void __ptr_ring_discard_one(struct ptr_ring *r)
+{
+    /* Fundamentally, what we want to do is update consumer
+     * index and zero out the entry so producer can reuse it.
+     * Doing it naively at each consume would be as simple as:
+     *     consumer = r->consumer;
+     *     r->queue[consumer++] = NULL;
+     *     if (unlikely(consumer >= r->size))
+     *         consumer = 0;
+     *     r->consumer = consumer;
+     * but that is suboptimal when the ring is full as producer is writing
+     * out new entries in the same cache line.  Defer these updates until a
+     * batch of entries has been consumed.
+     */
+    /* Note: we must keep consumer_head valid at all times for ptr_ring_empty
+     * to work correctly.
+     */
+    int consumer_head = r->consumer_head;
+    int head = consumer_head++;
+
+    /* Once we have processed enough entries invalidate them in
+     * the ring all at once so producer can reuse their space in the ring.
+     * We also do this when we reach end of the ring - not mandatory
+     * but helps keep the implementation simple.
+     */
+    if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+                 consumer_head >= r->size)) {
+        /* Zero out entries in the reverse order: this way we touch the
+         * cache line that producer might currently be reading the last;
+         * producer won't make progress and touch other cache lines
+         * besides the first one until we write out all entries.
+         */
+        while (likely(head >= r->consumer_tail))
+            r->queue[head--] = NULL;
+        r->consumer_tail = consumer_head;
+    }
+    if (unlikely(consumer_head >= r->size)) {
+        consumer_head = 0;
+        r->consumer_tail = 0;
+    }
+    /* matching READ_ONCE in ptr_ring_empty for lockless tests */
+    WRITE_ONCE(r->consumer_head, consumer_head);
+}
+
+static inline void *ptr_ring_consume(struct ptr_ring *r)
+{
+    void *ptr;
+
+    /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
+     * accessing data through the pointer is up to date.  Pairs
+     * with smp_wmb in ptr_ring_produce.
+     */
+    ptr = __ptr_ring_peek(r);
+    if (ptr)
+        __ptr_ring_discard_one(r);
+
+    return ptr;
+}
+
+static inline int ptr_ring_consume_batched(struct ptr_ring *r,
+                                           void **array, int n)
+{
+    void *ptr;
+    int i;
+
+    for (i = 0; i < n; i++) {
+        ptr = ptr_ring_consume(r);
+        if (!ptr)
+            break;
+        array[i] = ptr;
+    }
+
+    return i;
+}
+
+static inline void **__ptr_ring_init_queue_alloc(unsigned int size)
+{
+    return g_try_malloc0_n(size, sizeof(void *));
+}
+
+static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
+{
+    r->size = size;
+    r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
+    /* We need to set batch at least to 1 to make logic
+     * in __ptr_ring_discard_one work correctly.
+     * Batching too much (because ring is small) would cause a lot of
+     * burstiness.  Needs tuning, for now disable batching.
+     */
+    if (r->batch > r->size / 2 || !r->batch)
+        r->batch = 1;
+}
+
+/*
+ * Disable batching, so that no consumed entry is left in the ring.
+ *
+ * This is convenient when the caller makes sure the ring is large enough
+ * to contain all requests, i.e. when ptr_ring_produce cannot fail.
+ */
+static inline void ptr_ring_disable_batch(struct ptr_ring *r)
+{
+    r->batch = 1;
+}
+
+static inline int ptr_ring_init(struct ptr_ring *r, int size)
+{
+    r->queue = __ptr_ring_init_queue_alloc(size);
+    if (!r->queue)
+        return -ENOMEM;
+
+    __ptr_ring_set_size(r, size);
+    r->producer = r->consumer_head = r->consumer_tail = 0;
+    return 0;
+}
+
+static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
+{
+    void *ptr;
+
+    if (destroy)
+        while ((ptr = ptr_ring_consume(r)))
+            destroy(ptr);
+    g_free(r->queue);
+}
+#endif
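
As a closing note on this API (an illustrative sketch, not part of the
patch): a ring is created with ptr_ring_init(), optionally switched to
unbatched mode, and torn down with a destructor for any entries still
queued:

    struct ptr_ring ring;

    if (ptr_ring_init(&ring, 64) < 0) {   /* allocates the queue array */
        return -ENOMEM;
    }
    ptr_ring_disable_batch(&ring);        /* consumed slots are NULLed
                                           * immediately, so a non-full
                                           * ring always accepts produce */
    ...
    ptr_ring_cleanup(&ring, g_free);      /* g_free as the destructor is
                                           * only an example */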
From patchwork Tue Oct 16 11:10:04 2018
X-Patchwork-Submitter: Xiao Guangrong <xiaoguangrong@tencent.com>
X-Patchwork-Id: 10643447
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong <xiaoguangrong@tencent.com>, qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn
Date: Tue, 16 Oct 2018 19:10:04 +0800
Message-Id: <20181016111006.629-3-xiaoguangrong@tencent.com>
In-Reply-To: <20181016111006.629-1-xiaoguangrong@tencent.com>
References: <20181016111006.629-1-xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH 2/4] migration: introduce lockless multithreads model

From: Xiao Guangrong <xiaoguangrong@tencent.com>

The current implementation of compression and decompression is very
hard to enable in production.
We noticed that too many wait-wake operations go through kernel space
and CPU usage is very low even when the system is otherwise idle.

The reasons are:
1) too many locks are used for synchronization: there is a global lock
   and each single thread has its own lock, so the migration thread and
   the worker threads need to go to sleep whenever these locks are busy
2) the migration thread submits requests to each thread separately, and
   only one request can be pending per thread; that means a thread has
   to go to sleep after finishing its request

To make it work better, we introduce a lockless multithread model: the
user (currently the migration thread) submits requests to each thread,
each of which has its own ring with a capacity of 4 requests. The
threads push results onto a global ring, from which the user fetches
them and performs the remaining work for each request, e.g. posting the
compressed data out for migration on the source QEMU.

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/lockless-threads.h |  63 +++++++
 util/Makefile.objs              |   1 +
 util/lockless-threads.c         | 373 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 437 insertions(+)
 create mode 100644 include/qemu/lockless-threads.h
 create mode 100644 util/lockless-threads.c

diff --git a/include/qemu/lockless-threads.h b/include/qemu/lockless-threads.h
new file mode 100644
index 0000000000..9340d3a748
--- /dev/null
+++ b/include/qemu/lockless-threads.h
@@ -0,0 +1,63 @@
+/*
+ * Lockless Multithreads Abstraction
+ *
+ * This is the abstraction layer for lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *  Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_LOCKLESS_THREAD_H
+#define QEMU_LOCKLESS_THREAD_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+#include "qemu/ptr_ring.h"
+
+/*
+ * the request representation, which contains the internally used metadata;
+ * it can be embedded into the user's self-defined data struct and the user
+ * can use container_of() to get the self-defined data
+ */
+struct ThreadRequest {
+    QSLIST_ENTRY(ThreadRequest) node;
+    unsigned int thread_index;
+};
+typedef struct ThreadRequest ThreadRequest;
+
+typedef struct Threads Threads;
+
+/* the default size of the thread-local request ring */
+#define DEFAULT_THREAD_RING_SIZE 4
+
+Threads *threads_create(unsigned int threads_nr, const char *name,
+                        int thread_ring_size,
+                        ThreadRequest *(*thread_request_init)(void),
+                        void (*thread_request_uninit)(ThreadRequest *request),
+                        void (*thread_request_handler)(ThreadRequest *request),
+                        void (*thread_request_done)(ThreadRequest *request));
+void threads_destroy(Threads *threads);
+
+/*
+ * find a free request and associate it with a free thread.
+ * If no request or no thread is free, return NULL
+ */
+ThreadRequest *threads_submit_request_prepare(Threads *threads);
+/*
+ * push the request to its thread's local ring and notify the thread
+ */
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request);
+
+/*
+ * wait for all threads to complete the requests in their local rings,
+ * to make sure no previous request is still pending.
+ */
+void threads_wait_done(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..deb5c972d5 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@ util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += lockless-threads.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/lockless-threads.c b/util/lockless-threads.c
new file mode 100644
index 0000000000..50cf143c03
--- /dev/null
+++ b/util/lockless-threads.c
@@ -0,0 +1,373 @@
+/*
+ * Lockless Multithreads Implementation
+ *
+ * Implement lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *  Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/host-utils.h"
+#include "qemu/lockless-threads.h"
+
+struct ThreadLocal {
+    QemuThread thread;
+
+    /* the event used to wake up the thread */
+    QemuEvent ev;
+
+    struct Threads *threads;
+
+    /* local request ring which is filled by the user */
+    Ptr_Ring request_ring;
+
+    /* the index of the thread */
+    int self;
+
+    /* thread is useless and needs to exit */
+    bool quit;
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data struct that represents the multithreads and is shared by
+ * all threads
+ */
+struct Threads {
+    const char *name;
+    unsigned int threads_nr;
+    /* requests are pushed to the threads in a round-robin manner */
+    unsigned int current_thread_index;
+
+    int thread_ring_size;
+    int total_requests;
+
+    /* the requests are pre-allocated and linked in the list */
+    int free_requests_nr;
+    QSLIST_HEAD(, ThreadRequest) free_requests;
+
+    /* the constructor of request */
+    ThreadRequest *(*thread_request_init)(void);
+    /* the destructor of request */
+    void (*thread_request_uninit)(ThreadRequest *request);
+    /* the handler of the request which is called in the thread */
+    void (*thread_request_handler)(ThreadRequest *request);
+    /*
+     * the handler to process the result which is called in the
+     * user's context
+     */
+    void (*thread_request_done)(ThreadRequest *request);
+
+    /* the threads push results to this ring, so it has multiple producers */
+    QemuSpin done_ring_lock;
+    Ptr_Ring request_done_ring;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static void put_done_request(Threads *threads, ThreadRequest *request)
+{
+    int ret;
+
+    qemu_spin_lock(&threads->done_ring_lock);
+    ret = ptr_ring_produce(&threads->request_done_ring, request);
+    /* there should be enough room to save all requests. */
+    assert(!ret);
+    qemu_spin_unlock(&threads->done_ring_lock);
+}
+
+/* retry to see if there is an available request before actually going to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    ThreadRequest *request;
+    int count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        request = ptr_ring_consume(&thread->request_ring);
+        if (request) {
+            return request;
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(ThreadRequest *data) = threads->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
+        qemu_event_reset(&self_data->ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->ev);
+            continue;
+        }
+        handler(request);
+        put_done_request(threads, request);
+    }
+
+    return NULL;
+}
+
+static void add_free_request(Threads *threads, ThreadRequest *request)
+{
+    QSLIST_INSERT_HEAD(&threads->free_requests, request, node);
+    threads->free_requests_nr++;
+}
+
+static ThreadRequest *get_and_remove_first_free_request(Threads *threads)
+{
+    ThreadRequest *request;
+
+    if (QSLIST_EMPTY(&threads->free_requests)) {
+        return NULL;
+    }
+
+    request = QSLIST_FIRST(&threads->free_requests);
+    QSLIST_REMOVE_HEAD(&threads->free_requests, node);
+    threads->free_requests_nr--;
+    return request;
+}
+
+static void uninit_requests(Threads *threads, int free_nr)
+{
+    ThreadRequest *request;
+
+    /*
+     * all requests should have been released to the list if the threads
+     * are being destroyed, i.e. threads_wait_done() should be called first.
+     */
+    assert(threads->free_requests_nr == free_nr);
+
+    while ((request = get_and_remove_first_free_request(threads))) {
+        threads->thread_request_uninit(request);
+    }
+
+    assert(ptr_ring_empty(&threads->request_done_ring));
+    ptr_ring_cleanup(&threads->request_done_ring, NULL);
+}
+
+static int init_requests(Threads *threads, int total_requests)
+{
+    ThreadRequest *request;
+    int i, free_nr = 0;
+
+    if (ptr_ring_init(&threads->request_done_ring, total_requests) < 0) {
+        return -1;
+    }
+    ptr_ring_disable_batch(&threads->request_done_ring);
+
+    QSLIST_INIT(&threads->free_requests);
+    for (i = 0; i < total_requests; i++) {
+        request = threads->thread_request_init();
+        if (!request) {
+            goto cleanup;
+        }
+
+        free_nr++;
+        add_free_request(threads, request);
+    }
+    return 0;
+
+cleanup:
+    uninit_requests(threads, free_nr);
+    return -1;
+}
+
+static void uninit_thread_data(Threads *threads, int num)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < num; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].ev);
+        assert(ptr_ring_empty(&thread_local[i].request_ring));
+
+        /* nothing is left in the ring.
*/ + ptr_ring_cleanup(&thread_local[i].request_ring, NULL); + } +} + +static int init_thread_data(Threads *threads, int threads_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + char *name; + int i; + + for (i = 0; i < threads_nr; i++) { + if (ptr_ring_init(&thread_local[i].request_ring, + threads->thread_ring_size) < 0) { + goto exit; + } + ptr_ring_disable_batch(&thread_local[i].request_ring); + + qemu_event_init(&thread_local[i].ev, false); + thread_local[i].threads = threads; + thread_local[i].self = i; + name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self); + qemu_thread_create(&thread_local[i].thread, name, + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); + g_free(name); + } + return 0; + + exit: + uninit_thread_data(threads, i); + return -1; +} + +Threads *threads_create(unsigned int threads_nr, const char *name, + int thread_ring_size, + ThreadRequest *(*thread_request_init)(void), + void (*thread_request_uninit)(ThreadRequest *request), + void (*thread_request_handler)(ThreadRequest *request), + void (*thread_request_done)(ThreadRequest *request)) +{ + Threads *threads; + int total_requests; + + threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal)); + threads->name = name; + threads->thread_request_init = thread_request_init; + threads->thread_request_uninit = thread_request_uninit; + threads->thread_request_handler = thread_request_handler; + threads->thread_request_done = thread_request_done; + qemu_spin_init(&threads->done_ring_lock); + + threads->thread_ring_size = thread_ring_size; + total_requests = threads->thread_ring_size * threads_nr; + if (init_requests(threads, total_requests) < 0) { + goto exit; + } + threads->total_requests = total_requests; + + if (init_thread_data(threads, threads_nr) < 0) { + goto exit; + } + threads->threads_nr = threads_nr; + return threads; + +exit: + threads_destroy(threads); + return NULL; +} + +void threads_destroy(Threads *threads) +{ + uninit_thread_data(threads, threads->threads_nr); + uninit_requests(threads, threads->total_requests); + g_free(threads); +} + +static int find_free_thread(Threads *threads, int range) +{ + int current_index, index, try = 0; + + current_index = threads->current_thread_index % threads->threads_nr; + index = current_index; + + do { + index = index % threads->threads_nr; + if (!ptr_ring_full(&threads->per_thread_data[index].request_ring)) { + threads->current_thread_index = index; + return index; + } + + if (++try > range) { + return -1; + } + } while (++index != current_index); + + return -1; +} + +ThreadRequest *threads_submit_request_prepare(Threads *threads) +{ + ThreadRequest *request; + int index; + + /* seek a free one in all threads. 
 */
+    index = find_free_thread(threads, threads->threads_nr);
+    if (index < 0) {
+        return NULL;
+    }
+
+    /* try to get the request from the list */
+    request = get_and_remove_first_free_request(threads);
+    if (request) {
+        goto got_request;
+    }
+
+    /* get a request that has already been handled by the threads */
+    request = ptr_ring_consume(&threads->request_done_ring);
+    if (request) {
+        threads->thread_request_done(request);
+        goto got_request;
+    }
+
+    return NULL;
+
+got_request:
+    request->thread_index = index;
+    return request;
+}
+
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request)
+{
+    int ret, index = request->thread_index;
+    ThreadLocal *thread_local = &threads->per_thread_data[index];
+
+    ret = ptr_ring_produce(&thread_local->request_ring, request);
+
+    /*
+     * we have detected that the thread's ring is not full in
+     * threads_submit_request_prepare(), so there should be free
+     * room in the ring
+     */
+    assert(!ret);
+    /* new request arrived, notify the thread */
+    qemu_event_set(&thread_local->ev);
+
+    /* we have used this entry, search from the next one. */
+    threads->current_thread_index = ++index;
+}
+
+void threads_wait_done(Threads *threads)
+{
+    ThreadRequest *requests[DEFAULT_THREAD_RING_SIZE * 2];
+    int nr;
+
+retry:
+    nr = ptr_ring_consume_batched(&threads->request_done_ring,
+                                  (void **)requests, ARRAY_SIZE(requests));
+    while (nr--) {
+        threads->thread_request_done(requests[nr]);
+        add_free_request(threads, requests[nr]);
+    }
+
+    if (threads->free_requests_nr != threads->total_requests) {
+        cpu_relax();
+        goto retry;
+    }
+}
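
To see how the pieces fit together, here is a minimal consumer of this
API (the MyWork type and its callbacks are hypothetical, for
illustration only; error handling is elided):

    struct MyWork {
        int payload;              /* user data processed by the thread */
        ThreadRequest request;    /* embedded, recovered via container_of() */
    };

    static ThreadRequest *my_init(void)
    {
        struct MyWork *w = g_new0(struct MyWork, 1);
        return &w->request;
    }
    static void my_uninit(ThreadRequest *r)
    {
        g_free(container_of(r, struct MyWork, request));
    }
    static void my_handler(ThreadRequest *r)   /* runs in a worker thread */
    {
        container_of(r, struct MyWork, request)->payload *= 2;
    }
    static void my_done(ThreadRequest *r)      /* runs in the user's context */
    {
    }

    Threads *threads = threads_create(4, "my-work", DEFAULT_THREAD_RING_SIZE,
                                      my_init, my_uninit,
                                      my_handler, my_done);
    ThreadRequest *r;
    while (!(r = threads_submit_request_prepare(threads))) {
        cpu_relax();              /* all rings full: wait for a free slot */
    }
    container_of(r, struct MyWork, request)->payload = 21;
    threads_submit_request_commit(threads, r);
    threads_wait_done(threads);   /* results flow back through my_done() */
    threads_destroy(threads);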
From patchwork Tue Oct 16 11:10:05 2018
X-Patchwork-Submitter: Xiao Guangrong <xiaoguangrong@tencent.com>
X-Patchwork-Id: 10643441
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong <xiaoguangrong@tencent.com>, qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn
Date: Tue, 16 Oct 2018 19:10:05 +0800
Message-Id: <20181016111006.629-4-xiaoguangrong@tencent.com>
In-Reply-To: <20181016111006.629-1-xiaoguangrong@tencent.com>
References: <20181016111006.629-1-xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH 3/4] migration: use lockless Multithread model for compression

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Adapt the compression code to the lockless multithread model.

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 312 +++++++++++++++++++++-----------------------------------
 1 file changed, 115 insertions(+), 197 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index bc38d98cc3..2356bc255c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -57,6 +57,7 @@
 #include "qemu/uuid.h"
 #include "savevm.h"
 #include "qemu/iov.h"
+#include "qemu/lockless-threads.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct CompressParam {
-    bool done;
-    bool quit;
-    bool zero_page;
-    QEMUFile *file;
-    QemuMutex mutex;
-    QemuCond cond;
-    RAMBlock *block;
-    ram_addr_t offset;
-
-    /* internally used fields */
-    z_stream stream;
-    uint8_t *originbuf;
-};
-typedef struct CompressParam CompressParam;
-
 struct DecompressParam {
     bool done;
     bool quit;
@@ -377,15 +362,6 @@ struct DecompressParam {
 };
 typedef struct DecompressParam DecompressParam;
 
-static CompressParam *comp_param;
-static QemuThread *compress_threads;
-/* comp_done_cond is used to wake up the migration thread when
- * one of the compression threads has finished the compression.
- * comp_done_lock is used to co-work with comp_done_cond.
- */ -static QemuMutex comp_done_lock; -static QemuCond comp_done_cond; -/* The empty QEMUFileOps will be used by file in CompressParam */ static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf); - -static void *do_data_compress(void *opaque) -{ - CompressParam *param = opaque; - RAMBlock *block; - ram_addr_t offset; - bool zero_page; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->block) { - block = param->block; - offset = param->offset; - param->block = NULL; - qemu_mutex_unlock(¶m->mutex); - - zero_page = do_compress_ram_page(param->file, ¶m->stream, - block, offset, param->originbuf); - - qemu_mutex_lock(&comp_done_lock); - param->done = true; - param->zero_page = zero_page; - qemu_cond_signal(&comp_done_cond); - qemu_mutex_unlock(&comp_done_lock); - - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } - } - qemu_mutex_unlock(¶m->mutex); - - return NULL; -} - -static void compress_threads_save_cleanup(void) -{ - int i, thread_count; - - if (!migrate_use_compression() || !comp_param) { - return; - } - - thread_count = migrate_compress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!comp_param[i].file) { - break; - } - - qemu_mutex_lock(&comp_param[i].mutex); - comp_param[i].quit = true; - qemu_cond_signal(&comp_param[i].cond); - qemu_mutex_unlock(&comp_param[i].mutex); - - qemu_thread_join(compress_threads + i); - qemu_mutex_destroy(&comp_param[i].mutex); - qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); - g_free(comp_param[i].originbuf); - qemu_fclose(comp_param[i].file); - comp_param[i].file = NULL; - } - qemu_mutex_destroy(&comp_done_lock); - qemu_cond_destroy(&comp_done_cond); - g_free(compress_threads); - g_free(comp_param); - compress_threads = NULL; - comp_param = NULL; -} - -static int compress_threads_save_setup(void) -{ - int i, thread_count; - - if (!migrate_use_compression()) { - return 0; - } - thread_count = migrate_compress_threads(); - compress_threads = g_new0(QemuThread, thread_count); - comp_param = g_new0(CompressParam, thread_count); - qemu_cond_init(&comp_done_cond); - qemu_mutex_init(&comp_done_lock); - for (i = 0; i < thread_count; i++) { - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); - if (!comp_param[i].originbuf) { - goto exit; - } - - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { - g_free(comp_param[i].originbuf); - goto exit; - } - - /* comp_param[i].file is just used as a dummy buffer to save data, - * set its ops to empty. - */ - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); - comp_param[i].done = true; - comp_param[i].quit = false; - qemu_mutex_init(&comp_param[i].mutex); - qemu_cond_init(&comp_param[i].cond); - qemu_thread_create(compress_threads + i, "compress", - do_data_compress, comp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; - -exit: - compress_threads_save_cleanup(); - return -1; -} - /* Multiple fd's */ #define MULTIFD_MAGIC 0x11223344U @@ -1909,12 +1766,27 @@ exit: return zero_page; } +struct CompressData { + /* filled by migration thread.*/ + RAMBlock *block; + ram_addr_t offset; + + /* filled by compress thread. 
*/ + QEMUFile *file; + z_stream stream; + uint8_t *originbuf; + bool zero_page; + + ThreadRequest request; +}; +typedef struct CompressData CompressData; + static void -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) +update_compress_thread_counts(CompressData *cd, int bytes_xmit) { ram_counters.transferred += bytes_xmit; - if (param->zero_page) { + if (cd->zero_page) { ram_counters.duplicate++; return; } @@ -1924,81 +1796,127 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) compression_counters.pages++; } +static ThreadRequest *compress_thread_data_init(void) +{ + CompressData *cd = g_new0(CompressData, 1); + + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!cd->originbuf) { + goto exit; + } + + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { + g_free(cd->originbuf); + goto exit; + } + + cd->file = qemu_fopen_ops(NULL, &empty_ops); + return &cd->request; + +exit: + g_free(cd); + return NULL; +} + +static void compress_thread_data_fini(ThreadRequest *request) +{ + CompressData *cd = container_of(request, CompressData, request); + + qemu_fclose(cd->file); + deflateEnd(&cd->stream); + g_free(cd->originbuf); + g_free(cd); +} + +static void compress_thread_data_handler(ThreadRequest *request) +{ + CompressData *cd = container_of(request, CompressData, request); + + /* + * if compression fails, it will be indicated by + * migrate_get_current()->to_dst_file. + */ + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, + cd->offset, cd->originbuf); +} + +static void compress_thread_data_done(ThreadRequest *request) +{ + CompressData *cd = container_of(request, CompressData, request); + RAMState *rs = ram_state; + int bytes_xmit; + + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); + update_compress_thread_counts(cd, bytes_xmit); +} + +static Threads *compress_threads; + static bool save_page_use_compression(RAMState *rs); static void flush_compressed_data(RAMState *rs) { - int idx, len, thread_count; - if (!save_page_use_compression(rs)) { return; } - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!comp_param[idx].done) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } - } - qemu_mutex_unlock(&comp_done_lock); + threads_wait_done(compress_threads); +} - for (idx = 0; idx < thread_count; idx++) { - qemu_mutex_lock(&comp_param[idx].mutex); - if (!comp_param[idx].quit) { - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); - /* - * it's safe to fetch zero_page without holding comp_done_lock - * as there is no further request submitted to the thread, - * i.e, the thread should be waiting for a request at this point. - */ - update_compress_thread_counts(&comp_param[idx], len); - } - qemu_mutex_unlock(&comp_param[idx].mutex); +static void compress_threads_save_cleanup(void) +{ + if (!compress_threads) { + return; } + + threads_destroy(compress_threads); + compress_threads = NULL; } -static inline void set_compress_params(CompressParam *param, RAMBlock *block, - ram_addr_t offset) +static int compress_threads_save_setup(void) { - param->block = block; - param->offset = offset; + if (!migrate_use_compression()) { + return 0; + } + + compress_threads = threads_create(migrate_compress_threads(), + "compress", + DEFAULT_THREAD_RING_SIZE, + compress_thread_data_init, + compress_thread_data_fini, + compress_thread_data_handler, + compress_thread_data_done); + return compress_threads ? 
0 : -1;
 }
 
 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
                                            ram_addr_t offset)
 {
-    int idx, thread_count, bytes_xmit = -1, pages = -1;
+    CompressData *cd;
+    ThreadRequest *request;
     bool wait = migrate_compress_wait_thread();
 
-    thread_count = migrate_compress_threads();
-    qemu_mutex_lock(&comp_done_lock);
 retry:
-    for (idx = 0; idx < thread_count; idx++) {
-        if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
-            pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
-            break;
+    request = threads_submit_request_prepare(compress_threads);
+    if (!request) {
+        /*
+         * wait for a free thread if the user specifies
+         * 'compress-wait-thread', otherwise post the page out
+         * in the main thread as a normal page.
+         */
+        if (wait) {
+            cpu_relax();
+            goto retry;
         }
-    }
 
-    /*
-     * wait for the free thread if the user specifies 'compress-wait-thread',
-     * otherwise we will post the page out in the main thread as normal page.
-     */
-    if (pages < 0 && wait) {
-        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        goto retry;
-    }
-    qemu_mutex_unlock(&comp_done_lock);
+        return -1;
+    }
 
-    return pages;
+    cd = container_of(request, CompressData, request);
+    cd->block = block;
+    cd->offset = offset;
+    threads_submit_request_commit(compress_threads, request);
+    return 1;
 }
 
 /**
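
For context on how this is driven (a simplified sketch of the caller's
contract, not the literal QEMU code): a positive return means the page
was queued to a compression thread, while -1 tells the caller to fall
back to the normal page path:

    res = compress_page_with_multi_thread(rs, block, offset);
    if (res > 0) {
        return res;    /* page queued; its result is flushed later */
    }
    /* no free thread and 'compress-wait-thread' is off: send it as a
     * normal page from the migration thread instead */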
From patchwork Tue Oct 16 11:10:06 2018
X-Patchwork-Submitter: Xiao Guangrong <xiaoguangrong@tencent.com>
X-Patchwork-Id: 10643445
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong <xiaoguangrong@tencent.com>, qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn
Date: Tue, 16 Oct 2018 19:10:06 +0800
Message-Id: <20181016111006.629-5-xiaoguangrong@tencent.com>
In-Reply-To: <20181016111006.629-1-xiaoguangrong@tencent.com>
References: <20181016111006.629-1-xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH 4/4] migration: use lockless Multithread model for decompression

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Adapt the decompression code to the lockless multithread model.

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 223 ++++++++++++++++++++------------------------------
 1 file changed, 78 insertions(+), 145 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 2356bc255c..99dc9d1911 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct DecompressParam {
-    bool done;
-    bool quit;
-    QemuMutex mutex;
-    QemuCond cond;
-    void *des;
-    uint8_t *compbuf;
-    int len;
-    z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
 
 /* Multiple fd's */
 
@@ -3382,6 +3366,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+
 /* return the size after decompression, or negative value on error */
 static int
 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
@@ -3407,166 +3392,114 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
     return stream->total_out;
 }
 
-static void *do_data_decompress(void *opaque)
-{
-    DecompressParam *param = opaque;
-    unsigned long pagesize;
-    uint8_t *des;
-    int len, ret;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->des) {
-            des = param->des;
-            len = param->len;
-            param->des = 0;
-            qemu_mutex_unlock(&param->mutex);
-
-            pagesize = TARGET_PAGE_SIZE;
-
-            ret = qemu_uncompress_data(&param->stream, des, pagesize,
-                                       param->compbuf, len);
-            if (ret < 0 && migrate_get_current()->decompress_error_check) {
-                error_report("decompress data failed");
-                qemu_file_set_error(decomp_file, ret);
-            }
+struct DecompressData {
+    /* filled by the migration thread. */
+    void *des;
+    uint8_t *compbuf;
+    size_t len;
 
-            qemu_mutex_lock(&decomp_done_lock);
-            param->done = true;
-            qemu_cond_signal(&decomp_done_cond);
-            qemu_mutex_unlock(&decomp_done_lock);
+    z_stream stream;
+    ThreadRequest request;
+};
+typedef struct DecompressData DecompressData;
 
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
+static Threads *decompress_threads;
+
+static ThreadRequest *decompress_thread_data_init(void)
+{
+    DecompressData *dd = g_new0(DecompressData, 1);
+
+    if (inflateInit(&dd->stream) != Z_OK) {
+        g_free(dd);
+        return NULL;
     }
-    qemu_mutex_unlock(&param->mutex);
 
-    return NULL;
+    dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    return &dd->request;
 }
 
-static int wait_for_decompress_done(void)
+static void decompress_thread_data_fini(ThreadRequest *request)
 {
-    int idx, thread_count;
+    DecompressData *dd = container_of(request, DecompressData, request);
 
-    if (!migrate_use_compression()) {
-        return 0;
-    }
+    inflateEnd(&dd->stream);
+ g_free(dd->compbuf); + g_free(dd); +} - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!decomp_param[idx].done) { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +static void decompress_thread_data_handler(ThreadRequest *request) +{ + DecompressData *dd = container_of(request, DecompressData, request); + unsigned long pagesize = TARGET_PAGE_SIZE; + int ret; + + ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize, + dd->compbuf, dd->len); + if (ret < 0 && migrate_get_current()->decompress_error_check) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); } - qemu_mutex_unlock(&decomp_done_lock); - return qemu_file_get_error(decomp_file); } -static void compress_threads_load_cleanup(void) +static void decompress_thread_data_done(ThreadRequest *data) { - int i, thread_count; +} +static int decompress_init(QEMUFile *f) +{ if (!migrate_use_compression()) { - return; + return 0; } - thread_count = migrate_decompress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!decomp_param[i].compbuf) { - break; - } - qemu_mutex_lock(&decomp_param[i].mutex); - decomp_param[i].quit = true; - qemu_cond_signal(&decomp_param[i].cond); - qemu_mutex_unlock(&decomp_param[i].mutex); - } - for (i = 0; i < thread_count; i++) { - if (!decomp_param[i].compbuf) { - break; - } + decomp_file = f; + decompress_threads = threads_create(migrate_decompress_threads(), + "decompress", + DEFAULT_THREAD_RING_SIZE, + decompress_thread_data_init, + decompress_thread_data_fini, + decompress_thread_data_handler, + decompress_thread_data_done); + return decompress_threads ? 
0 : -1; +} - qemu_thread_join(decompress_threads + i); - qemu_mutex_destroy(&decomp_param[i].mutex); - qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); - g_free(decomp_param[i].compbuf); - decomp_param[i].compbuf = NULL; +static void decompress_fini(void) +{ + if (!decompress_threads) { + return; } - g_free(decompress_threads); - g_free(decomp_param); + + threads_destroy(decompress_threads); decompress_threads = NULL; - decomp_param = NULL; decomp_file = NULL; } -static int compress_threads_load_setup(QEMUFile *f) +static int flush_decompressed_data(void) { - int i, thread_count; - if (!migrate_use_compression()) { return 0; } - thread_count = migrate_decompress_threads(); - decompress_threads = g_new0(QemuThread, thread_count); - decomp_param = g_new0(DecompressParam, thread_count); - qemu_mutex_init(&decomp_done_lock); - qemu_cond_init(&decomp_done_cond); - decomp_file = f; - for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { - goto exit; - } - - decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); - qemu_mutex_init(&decomp_param[i].mutex); - qemu_cond_init(&decomp_param[i].cond); - decomp_param[i].done = true; - decomp_param[i].quit = false; - qemu_thread_create(decompress_threads + i, "decompress", - do_data_decompress, decomp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; -exit: - compress_threads_load_cleanup(); - return -1; + threads_wait_done(decompress_threads); + return qemu_file_get_error(decomp_file); } static void decompress_data_with_multi_threads(QEMUFile *f, - void *host, int len) + void *host, size_t len) { - int idx, thread_count; + ThreadRequest *request; + DecompressData *dd; - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - while (true) { - for (idx = 0; idx < thread_count; idx++) { - if (decomp_param[idx].done) { - decomp_param[idx].done = false; - qemu_mutex_lock(&decomp_param[idx].mutex); - qemu_get_buffer(f, decomp_param[idx].compbuf, len); - decomp_param[idx].des = host; - decomp_param[idx].len = len; - qemu_cond_signal(&decomp_param[idx].cond); - qemu_mutex_unlock(&decomp_param[idx].mutex); - break; - } - } - if (idx < thread_count) { - break; - } else { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +retry: + request = threads_submit_request_prepare(decompress_threads); + if (!request) { + goto retry; } - qemu_mutex_unlock(&decomp_done_lock); + + dd = container_of(request, DecompressData, request); + dd->des = host; + dd->len = len; + qemu_get_buffer(f, dd->compbuf, len); + threads_submit_request_commit(decompress_threads, request); } /** @@ -3579,7 +3512,7 @@ static void decompress_data_with_multi_threads(QEMUFile *f, */ static int ram_load_setup(QEMUFile *f, void *opaque) { - if (compress_threads_load_setup(f)) { + if (decompress_init(f)) { return -1; } @@ -3599,7 +3532,7 @@ static int ram_load_cleanup(void *opaque) } xbzrle_load_cleanup(); - compress_threads_load_cleanup(); + decompress_fini(); RAMBLOCK_FOREACH_MIGRATABLE(rb) { g_free(rb->receivedmap); @@ -3949,7 +3882,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } } - ret |= wait_for_decompress_done(); + ret |= flush_decompressed_data(); rcu_read_unlock(); trace_ram_load_complete(ret, seq_iter); return ret;
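
Taken together, the destination side now follows the same prepare/commit
pattern as the save side; schematically (a simplified contract, not the
literal ram_load() code):

    /* for each compressed page read from the stream */
    decompress_data_with_multi_threads(f, host, len); /* retries until a
                                                       * worker ring has room */
    ...
    /* after the last section, before ram_load() returns */
    ret |= flush_decompressed_data();  /* threads_wait_done() plus any
                                        * deferred decompress error */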