From patchwork Thu Nov 22 07:20:24 2018
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10693603
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong, qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn
Date: Thu, 22 Nov 2018 15:20:24 +0800
Message-Id: <20181122072028.22819-2-xiaoguangrong@tencent.com>
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH v3 1/5] bitops: introduce change_bit_atomic

From: Xiao Guangrong

It will be used by the threaded workqueue.

Signed-off-by: Xiao Guangrong
Reviewed-by: Dr. David Alan Gilbert
Reviewed-by: Juan Quintela
---
 include/qemu/bitops.h | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/include/qemu/bitops.h b/include/qemu/bitops.h
index 3f0926cf40..c522958852 100644
--- a/include/qemu/bitops.h
+++ b/include/qemu/bitops.h
@@ -79,6 +79,19 @@ static inline void change_bit(long nr, unsigned long *addr)
     *p ^= mask;
 }
 
+/**
+ * change_bit_atomic - Toggle a bit in memory atomically
+ * @nr: Bit to change
+ * @addr: Address to start counting from
+ */
+static inline void change_bit_atomic(long nr, unsigned long *addr)
+{
+    unsigned long mask = BIT_MASK(nr);
+    unsigned long *p = addr + BIT_WORD(nr);
+
+    atomic_xor(p, mask);
+}
+
 /**
  * test_and_set_bit - Set a bit and return its old value
  * @nr: Bit to set
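A minimal sketch of what the atomic variant buys, assuming only the helpers shown above (the request_state word and toggle_my_bit() below are illustrative, not part of the patch): when several threads toggle different bits of the same word, plain change_bit() can lose updates because its read-modify-write is not atomic, while change_bit_atomic() performs the XOR atomically and needs no lock.

    #include "qemu/osdep.h"
    #include "qemu/bitops.h"

    /* one word of per-request state bits shared by several threads */
    static unsigned long request_state;

    /* each caller owns one bit index, but all bits live in the same word */
    static void toggle_my_bit(long my_bit)
    {
        /* atomic XOR of BIT_MASK(my_bit); concurrent toggles are not lost */
        change_bit_atomic(my_bit, &request_state);
    }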
From patchwork Thu Nov 22 07:20:25 2018
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10693605
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Date: Thu, 22 Nov 2018 15:20:25 +0800
Message-Id: <20181122072028.22819-3-xiaoguangrong@tencent.com>
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH v3 2/5] util: introduce threaded workqueue
Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong, qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn

From: Xiao Guangrong

This module implements a lockless and efficient threaded workqueue.

Three abstracted objects are used in this module:

- Request.
  It not only contains the data that the workqueue fetches to do the
  work but also offers the space to save the result after the workqueue
  handles the request.

  It flows between the user and the workqueue. The user fills in the
  request data while the request is owned by the user; after it is
  submitted, the workqueue fetches the data, handles the request and
  saves the result back into it.

  All requests are pre-allocated and carefully partitioned between
  threads, so there is no contention on a request; this lets the
  threads run in parallel as much as possible.

- User, i.e., the submitter.
  It fills in a request and submits it to the workqueue; the result is
  collected after the workqueue has handled the request.

  The user can submit requests back to back without waiting for the
  previous requests to be handled. Only a single submitter is
  supported; if you need more, serialize the submissions yourself,
  e.g. with a lock on your side.

- Workqueue, i.e., thread.
  Each workqueue is represented by a running thread that fetches the
  requests submitted by the user, does the specified work and saves
  the result back to the request.

Signed-off-by: Xiao Guangrong
---
 include/qemu/threaded-workqueue.h | 106 +++++++++
 util/Makefile.objs                |   1 +
 util/threaded-workqueue.c         | 463 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 570 insertions(+)
 create mode 100644 include/qemu/threaded-workqueue.h
 create mode 100644 util/threaded-workqueue.c

diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h
new file mode 100644
index 0000000000..e0ede496d0
--- /dev/null
+++ b/include/qemu/threaded-workqueue.h
@@ -0,0 +1,106 @@
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_THREADED_WORKQUEUE_H
+#define QEMU_THREADED_WORKQUEUE_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+
+/*
+ * This modules implements the lockless and efficient threaded workqueue.
+ *
+ * Three abstracted objects are used in this module:
+ * - Request.
+ *   It not only contains the data that the workqueue fetches out
+ *   to finish the request but also offers the space to save the result
+ *   after the workqueue handles the request.
+ *
+ *   It's flowed between user and workqueue. The user fills the request
+ *   data into it when it is owned by user. After it is submitted to the
+ *   workqueue, the workqueue fetched data out and save the result into
+ *   it after the request is handled.
+ * + * All the requests are pre-allocated and carefully partitioned between + * threads so there is no contention on the request, that make threads + * be parallel as much as possible. + * + * - User, i.e, the submitter + * It's the one fills the request and submits it to the workqueue, + * the result will be collected after it is handled by the work queue. + * + * The user can consecutively submit requests without waiting the previous + * requests been handled. + * It only supports one submitter, you should do serial submission by + * yourself if you want more, e.g, use lock on you side. + * + * - Workqueue, i.e, thread + * Each workqueue is represented by a running thread that fetches + * the request submitted by the user, do the specified work and save + * the result to the request. + */ + +typedef struct Threads Threads; + +struct ThreadedWorkqueueOps { + /* constructor of the request */ + int (*thread_request_init)(void *request); + /* destructor of the request */ + void (*thread_request_uninit)(void *request); + + /* the handler of the request that is called by the thread */ + void (*thread_request_handler)(void *request); + /* called by the user after the request has been handled */ + void (*thread_request_done)(void *request); + + size_t request_size; +}; +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; + +/* the default number of requests that thread need handle */ +#define DEFAULT_THREAD_REQUEST_NR 4 +/* the max number of requests that thread need handle */ +#define MAX_THREAD_REQUEST_NR (sizeof(uint64_t) * BITS_PER_BYTE) + +/* + * create a threaded queue. Other APIs will work on the Threads it returned + * + * @name: the identity of the workqueue which is used to construct the name + * of threads only + * @threads_nr: the number of threads that the workqueue will create + * @thread_requests_nr: the number of requests that each single thread will + * handle + * @ops: the handlers of the request + * + * Return NULL if it failed + */ +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops); +void threaded_workqueue_destroy(Threads *threads); + +/* + * find a free request where the user can store the data that is needed to + * finish the request + * + * If all requests are used up, return NULL + */ +void *threaded_workqueue_get_request(Threads *threads); +/* submit the request and notify the thread */ +void threaded_workqueue_submit_request(Threads *threads, void *request); + +/* + * wait all threads to complete the request to make sure there is no + * previous request exists + */ +void threaded_workqueue_wait_for_requests(Threads *threads); +#endif diff --git a/util/Makefile.objs b/util/Makefile.objs index 0820923c18..f26dfe5182 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -50,5 +50,6 @@ util-obj-y += range.o util-obj-y += stats64.o util-obj-y += systemd.o util-obj-y += iova-tree.o +util-obj-y += threaded-workqueue.o util-obj-$(CONFIG_LINUX) += vfio-helpers.o util-obj-$(CONFIG_OPENGL) += drm.o diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c new file mode 100644 index 0000000000..2ab37cee8d --- /dev/null +++ b/util/threaded-workqueue.c @@ -0,0 +1,463 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. 
+ */ + +#include "qemu/osdep.h" +#include "qemu/bitmap.h" +#include "qemu/threaded-workqueue.h" + +#define SMP_CACHE_BYTES 64 + +/* + * the request representation which contains the internally used mete data, + * it is the header of user-defined data. + * + * It should be aligned to the nature size of CPU. + */ +struct ThreadRequest { + /* + * the request has been handled by the thread and need the user + * to fetch result out. + */ + uint8_t done; + + /* + * the index to Thread::requests. + * Save it to the padding space although it can be calculated at runtime. + */ + uint8_t request_index; + + /* the index to Threads::per_thread_data */ + unsigned int thread_index; +} QEMU_ALIGNED(sizeof(unsigned long)); +typedef struct ThreadRequest ThreadRequest; + +struct ThreadLocal { + struct Threads *threads; + + /* the index of the thread */ + int self; + + /* thread is useless and needs to exit */ + bool quit; + + QemuThread thread; + + void *requests; + + /* + * the bit in these two bitmaps indicates the index of the ï¼ requests + * respectively. If it's the same, the corresponding request is free + * and owned by the user, i.e, where the user fills a request. Otherwise, + * it is valid and owned by the thread, i.e, where the thread fetches + * the request and write the result. + */ + + /* after the user fills the request, the bit is flipped. */ + uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + /* after handles the request, the thread flips the bit. */ + uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event used to wake up the thread whenever a valid request has + * been submitted + */ + QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event is notified whenever a request has been completed + * (i.e, become free), which is used to wake up the user + */ + QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES); +}; +typedef struct ThreadLocal ThreadLocal; + +/* + * the main data struct represents multithreads which is shared by + * all threads + */ +struct Threads { + /* the request header, ThreadRequest, is contained */ + unsigned int request_size; + unsigned int thread_requests_nr; + unsigned int threads_nr; + + /* the request is pushed to the thread with round-robin manner */ + unsigned int current_thread_index; + + const ThreadedWorkqueueOps *ops; + + ThreadLocal per_thread_data[0]; +}; +typedef struct Threads Threads; + +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index) +{ + ThreadRequest *request; + + request = thread->requests + request_index * thread->threads->request_size; + assert(request->request_index == request_index); + assert(request->thread_index == thread->self); + return request; +} + +static int request_to_index(ThreadRequest *request) +{ + return request->request_index; +} + +static int request_to_thread_index(ThreadRequest *request) +{ + return request->thread_index; +} + +/* + * free request: the request is not used by any thread, however, it might + * contain the result need the user to call thread_request_done() + * + * valid request: the request contains the request data and it's committed + * to the thread, i,e. it's owned by thread. 
+ */ +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread) +{ + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + + /* + * paired with smp_wmb() in mark_request_free() to make sure that we + * read request_done_bitmap before fetching the result out. + */ + smp_rmb(); + + return result_bitmap; +} + +static ThreadRequest +*find_thread_free_request(Threads *threads, ThreadLocal *thread) +{ + uint64_t result_bitmap = get_free_request_bitmap(threads, thread); + int index; + + index = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr); + if (index >= threads->thread_requests_nr) { + return NULL; + } + + return index_to_request(thread, index); +} + +static ThreadRequest *threads_find_free_request(Threads *threads) +{ + ThreadLocal *thread; + ThreadRequest *request; + int cur_thread, thread_index; + + cur_thread = threads->current_thread_index % threads->threads_nr; + thread_index = cur_thread; + do { + thread = threads->per_thread_data + thread_index++; + request = find_thread_free_request(threads, thread); + if (request) { + break; + } + thread_index %= threads->threads_nr; + } while (thread_index != cur_thread); + + return request; +} + +/* + * the change bit operation combined with READ_ONCE and WRITE_ONCE which + * only works on single uint64_t width + */ +static void change_bit_once(long nr, uint64_t *addr) +{ + uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr); + + atomic_rcu_set(addr, value); +} + +static void mark_request_valid(Threads *threads, ThreadRequest *request) +{ + int thread_index = request_to_thread_index(request); + int request_index = request_to_index(request); + ThreadLocal *thread = threads->per_thread_data + thread_index; + + /* + * paired with smp_rmb() in find_first_valid_request_index() to make + * sure the request has been filled before the bit is flipped that + * will make the request be visible to the thread + */ + smp_wmb(); + + change_bit_once(request_index, &thread->request_fill_bitmap); + qemu_event_set(&thread->request_valid_ev); +} + +static int thread_find_first_valid_request_index(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + int index; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + /* + * paired with smp_wmb() in mark_request_valid() to make sure that + * we read request_fill_bitmap before fetch the request out. + */ + smp_rmb(); + + index = find_first_bit(&result_bitmap, threads->thread_requests_nr); + return index >= threads->thread_requests_nr ? -1 : index; +} + +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) +{ + int index = request_to_index(request); + + /* + * smp_wmb() is implied in change_bit_atomic() that is paired with + * smp_rmb() in get_free_request_bitmap() to make sure the result + * has been saved before the bit is flipped. + */ + change_bit_atomic(index, &thread->request_done_bitmap); + qemu_event_set(&thread->request_free_ev); +} + +/* retry to see if there is available request before actually go to wait. 
*/ +#define BUSY_WAIT_COUNT 1000 + +static ThreadRequest * +thread_busy_wait_for_request(ThreadLocal *thread) +{ + int index, count = 0; + + for (count = 0; count < BUSY_WAIT_COUNT; count++) { + index = thread_find_first_valid_request_index(thread); + if (index >= 0) { + return index_to_request(thread, index); + } + + cpu_relax(); + } + + return NULL; +} + +static void *thread_run(void *opaque) +{ + ThreadLocal *self_data = (ThreadLocal *)opaque; + Threads *threads = self_data->threads; + void (*handler)(void *request) = threads->ops->thread_request_handler; + ThreadRequest *request; + + for ( ; !atomic_read(&self_data->quit); ) { + qemu_event_reset(&self_data->request_valid_ev); + + request = thread_busy_wait_for_request(self_data); + if (!request) { + qemu_event_wait(&self_data->request_valid_ev); + continue; + } + + assert(!request->done); + + handler(request + 1); + request->done = true; + mark_request_free(self_data, request); + } + + return NULL; +} + +static void uninit_thread_requests(ThreadLocal *thread, int free_nr) +{ + Threads *threads = thread->threads; + ThreadRequest *request = thread->requests; + int i; + + for (i = 0; i < free_nr; i++) { + threads->ops->thread_request_uninit(request + 1); + request = (void *)request + threads->request_size; + } + g_free(thread->requests); +} + +static int init_thread_requests(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + ThreadRequest *request; + int ret, i, thread_reqs_size; + + thread_reqs_size = threads->thread_requests_nr * threads->request_size; + thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES); + thread->requests = g_malloc0(thread_reqs_size); + + request = thread->requests; + for (i = 0; i < threads->thread_requests_nr; i++) { + ret = threads->ops->thread_request_init(request + 1); + if (ret < 0) { + goto exit; + } + + request->request_index = i; + request->thread_index = thread->self; + request = (void *)request + threads->request_size; + } + return 0; + +exit: + uninit_thread_requests(thread, i); + return -1; +} + +static void uninit_thread_data(Threads *threads, int free_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + int i; + + for (i = 0; i < free_nr; i++) { + thread_local[i].quit = true; + qemu_event_set(&thread_local[i].request_valid_ev); + qemu_thread_join(&thread_local[i].thread); + qemu_event_destroy(&thread_local[i].request_valid_ev); + qemu_event_destroy(&thread_local[i].request_free_ev); + uninit_thread_requests(&thread_local[i], threads->thread_requests_nr); + } +} + +static int +init_thread_data(Threads *threads, const char *thread_name, int thread_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + char *name; + int i; + + for (i = 0; i < thread_nr; i++) { + thread_local[i].threads = threads; + thread_local[i].self = i; + + if (init_thread_requests(&thread_local[i]) < 0) { + goto exit; + } + + qemu_event_init(&thread_local[i].request_free_ev, false); + qemu_event_init(&thread_local[i].request_valid_ev, false); + + name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self); + qemu_thread_create(&thread_local[i].thread, name, + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); + g_free(name); + } + return 0; + +exit: + uninit_thread_data(threads, i); + return -1; +} + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops) +{ + Threads *threads; + + if (threads_nr > MAX_THREAD_REQUEST_NR) { + return NULL; + } + + threads = g_malloc0(sizeof(*threads) 
+ threads_nr * sizeof(ThreadLocal));
+    threads->ops = ops;
+    threads->threads_nr = threads_nr;
+    threads->thread_requests_nr = thread_requests_nr;
+
+    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
+    threads->request_size = threads->ops->request_size;
+    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
+    threads->request_size += sizeof(ThreadRequest);
+
+    if (init_thread_data(threads, name, threads_nr) < 0) {
+        g_free(threads);
+        return NULL;
+    }
+
+    return threads;
+}
+
+void threaded_workqueue_destroy(Threads *threads)
+{
+    uninit_thread_data(threads, threads->threads_nr);
+    g_free(threads);
+}
+
+static void request_done(Threads *threads, ThreadRequest *request)
+{
+    if (!request->done) {
+        return;
+    }
+
+    threads->ops->thread_request_done(request + 1);
+    request->done = false;
+}
+
+void *threaded_workqueue_get_request(Threads *threads)
+{
+    ThreadRequest *request;
+
+    request = threads_find_free_request(threads);
+    if (!request) {
+        return NULL;
+    }
+
+    request_done(threads, request);
+    return request + 1;
+}
+
+void threaded_workqueue_submit_request(Threads *threads, void *request)
+{
+    ThreadRequest *req = request - sizeof(ThreadRequest);
+    int thread_index = request_to_thread_index(request);
+
+    assert(!req->done);
+    mark_request_valid(threads, req);
+    threads->current_thread_index = thread_index + 1;
+}
+
+void threaded_workqueue_wait_for_requests(Threads *threads)
+{
+    ThreadLocal *thread;
+    uint64_t result_bitmap;
+    int thread_index, index = 0;
+
+    for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) {
+        thread = threads->per_thread_data + thread_index;
+        index = 0;
+retry:
+        qemu_event_reset(&thread->request_free_ev);
+        result_bitmap = get_free_request_bitmap(threads, thread);
+
+        for (; index < threads->thread_requests_nr; index++) {
+            if (test_bit(index, &result_bitmap)) {
+                qemu_event_wait(&thread->request_free_ev);
+                goto retry;
+            }
+
+            request_done(threads, index_to_request(thread, index));
+        }
+    }
+}
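To make the API above concrete, here is a minimal, hypothetical user of the workqueue written only against the interface declared in include/qemu/threaded-workqueue.h; the DoubleData type, the double_*() callbacks and the numbers chosen below are illustrative and not part of this series:

    #include "qemu/osdep.h"
    #include "qemu/threaded-workqueue.h"

    /* per-request data: the input value and the space for the result */
    typedef struct {
        int input;
        int result;
    } DoubleData;

    static int double_init(void *request) { return 0; }
    static void double_uninit(void *request) { }

    /* runs in a workqueue thread */
    static void double_handler(void *request)
    {
        DoubleData *d = request;
        d->result = d->input * 2;
    }

    /* runs in the submitter when the result is collected */
    static void double_done(void *request)
    {
        DoubleData *d = request;
        printf("%d -> %d\n", d->input, d->result);
    }

    static const ThreadedWorkqueueOps double_ops = {
        .thread_request_init    = double_init,
        .thread_request_uninit  = double_uninit,
        .thread_request_handler = double_handler,
        .thread_request_done    = double_done,
        .request_size           = sizeof(DoubleData),
    };

    static void double_numbers(void)
    {
        Threads *threads;
        int i;

        threads = threaded_workqueue_create("double", 4,
                                            DEFAULT_THREAD_REQUEST_NR,
                                            &double_ops);
        if (!threads) {
            return;
        }

        for (i = 0; i < 100; i++) {
            DoubleData *d;

            /* only one submitter thread may do this */
            while (!(d = threaded_workqueue_get_request(threads))) {
                ; /* all requests are in flight: spin or do other work */
            }
            d->input = i;
            threaded_workqueue_submit_request(threads, d);
        }

        /* collect any results still in flight, then tear down */
        threaded_workqueue_wait_for_requests(threads);
        threaded_workqueue_destroy(threads);
    }

The same pattern, get a free request, fill it, submit it, and flush with threaded_workqueue_wait_for_requests() before tearing down, is what the migration compression and decompression patches later in this series follow.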
From patchwork Thu Nov 22 07:20:26 2018
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10693609
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Date: Thu, 22 Nov 2018 15:20:26 +0800
Message-Id: <20181122072028.22819-4-xiaoguangrong@tencent.com>
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>
X-Received-From: 2607:f8b0:4864:20::442 Subject: [Qemu-devel] [PATCH v3 3/5] migration: use threaded workqueue for compression X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn Errors-To: qemu-devel-bounces+patchwork-qemu-devel=patchwork.kernel.org@nongnu.org Sender: "Qemu-devel" X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong Adapt the compression code to the threaded workqueue Signed-off-by: Xiao Guangrong --- migration/ram.c | 308 ++++++++++++++++++++------------------------------------ 1 file changed, 110 insertions(+), 198 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 7e7deec4d8..254c08f27b 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -57,6 +57,7 @@ #include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "qemu/threaded-workqueue.h" /***********************************************************/ /* ram save/restore */ @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct CompressParam { - bool done; - bool quit; - bool zero_page; - QEMUFile *file; - QemuMutex mutex; - QemuCond cond; - RAMBlock *block; - ram_addr_t offset; - - /* internally used fields */ - z_stream stream; - uint8_t *originbuf; -}; -typedef struct CompressParam CompressParam; - struct DecompressParam { bool done; bool quit; @@ -377,15 +362,6 @@ struct DecompressParam { }; typedef struct DecompressParam DecompressParam; -static CompressParam *comp_param; -static QemuThread *compress_threads; -/* comp_done_cond is used to wake up the migration thread when - * one of the compression threads has finished the compression. - * comp_done_lock is used to co-work with comp_done_cond. 
- */ -static QemuMutex comp_done_lock; -static QemuCond comp_done_cond; -/* The empty QEMUFileOps will be used by file in CompressParam */ static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf); - -static void *do_data_compress(void *opaque) -{ - CompressParam *param = opaque; - RAMBlock *block; - ram_addr_t offset; - bool zero_page; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->block) { - block = param->block; - offset = param->offset; - param->block = NULL; - qemu_mutex_unlock(¶m->mutex); - - zero_page = do_compress_ram_page(param->file, ¶m->stream, - block, offset, param->originbuf); - - qemu_mutex_lock(&comp_done_lock); - param->done = true; - param->zero_page = zero_page; - qemu_cond_signal(&comp_done_cond); - qemu_mutex_unlock(&comp_done_lock); - - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } - } - qemu_mutex_unlock(¶m->mutex); - - return NULL; -} - -static void compress_threads_save_cleanup(void) -{ - int i, thread_count; - - if (!migrate_use_compression() || !comp_param) { - return; - } - - thread_count = migrate_compress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!comp_param[i].file) { - break; - } - - qemu_mutex_lock(&comp_param[i].mutex); - comp_param[i].quit = true; - qemu_cond_signal(&comp_param[i].cond); - qemu_mutex_unlock(&comp_param[i].mutex); - - qemu_thread_join(compress_threads + i); - qemu_mutex_destroy(&comp_param[i].mutex); - qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); - g_free(comp_param[i].originbuf); - qemu_fclose(comp_param[i].file); - comp_param[i].file = NULL; - } - qemu_mutex_destroy(&comp_done_lock); - qemu_cond_destroy(&comp_done_cond); - g_free(compress_threads); - g_free(comp_param); - compress_threads = NULL; - comp_param = NULL; -} - -static int compress_threads_save_setup(void) -{ - int i, thread_count; - - if (!migrate_use_compression()) { - return 0; - } - thread_count = migrate_compress_threads(); - compress_threads = g_new0(QemuThread, thread_count); - comp_param = g_new0(CompressParam, thread_count); - qemu_cond_init(&comp_done_cond); - qemu_mutex_init(&comp_done_lock); - for (i = 0; i < thread_count; i++) { - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); - if (!comp_param[i].originbuf) { - goto exit; - } - - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { - g_free(comp_param[i].originbuf); - goto exit; - } - - /* comp_param[i].file is just used as a dummy buffer to save data, - * set its ops to empty. - */ - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); - comp_param[i].done = true; - comp_param[i].quit = false; - qemu_mutex_init(&comp_param[i].mutex); - qemu_cond_init(&comp_param[i].cond); - qemu_thread_create(compress_threads + i, "compress", - do_data_compress, comp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; - -exit: - compress_threads_save_cleanup(); - return -1; -} - /* Multiple fd's */ #define MULTIFD_MAGIC 0x11223344U @@ -1909,12 +1766,25 @@ exit: return zero_page; } +struct CompressData { + /* filled by migration thread.*/ + RAMBlock *block; + ram_addr_t offset; + + /* filled by compress thread. 
*/ + QEMUFile *file; + z_stream stream; + uint8_t *originbuf; + bool zero_page; +}; +typedef struct CompressData CompressData; + static void -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) +update_compress_thread_counts(CompressData *cd, int bytes_xmit) { ram_counters.transferred += bytes_xmit; - if (param->zero_page) { + if (cd->zero_page) { ram_counters.duplicate++; return; } @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) compression_counters.pages++; } +static int compress_thread_data_init(void *request) +{ + CompressData *cd = request; + + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!cd->originbuf) { + return -1; + } + + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { + g_free(cd->originbuf); + return -1; + } + + cd->file = qemu_fopen_ops(NULL, &empty_ops); + return 0; +} + +static void compress_thread_data_fini(void *request) +{ + CompressData *cd = request; + + qemu_fclose(cd->file); + deflateEnd(&cd->stream); + g_free(cd->originbuf); +} + +static void compress_thread_data_handler(void *request) +{ + CompressData *cd = request; + + /* + * if compression fails, it will be indicated by + * migrate_get_current()->to_dst_file. + */ + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, + cd->offset, cd->originbuf); +} + +static void compress_thread_data_done(void *request) +{ + CompressData *cd = request; + RAMState *rs = ram_state; + int bytes_xmit; + + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); + update_compress_thread_counts(cd, bytes_xmit); +} + +static const ThreadedWorkqueueOps compress_ops = { + .thread_request_init = compress_thread_data_init, + .thread_request_uninit = compress_thread_data_fini, + .thread_request_handler = compress_thread_data_handler, + .thread_request_done = compress_thread_data_done, + .request_size = sizeof(CompressData), +}; + +static Threads *compress_threads; + static bool save_page_use_compression(RAMState *rs); static void flush_compressed_data(RAMState *rs) { - int idx, len, thread_count; - if (!save_page_use_compression(rs)) { return; } - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!comp_param[idx].done) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } - } - qemu_mutex_unlock(&comp_done_lock); + threaded_workqueue_wait_for_requests(compress_threads); +} - for (idx = 0; idx < thread_count; idx++) { - qemu_mutex_lock(&comp_param[idx].mutex); - if (!comp_param[idx].quit) { - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); - /* - * it's safe to fetch zero_page without holding comp_done_lock - * as there is no further request submitted to the thread, - * i.e, the thread should be waiting for a request at this point. - */ - update_compress_thread_counts(&comp_param[idx], len); - } - qemu_mutex_unlock(&comp_param[idx].mutex); +static void compress_threads_save_cleanup(void) +{ + if (!compress_threads) { + return; } + + threaded_workqueue_destroy(compress_threads); + compress_threads = NULL; } -static inline void set_compress_params(CompressParam *param, RAMBlock *block, - ram_addr_t offset) +static int compress_threads_save_setup(void) { - param->block = block; - param->offset = offset; + if (!migrate_use_compression()) { + return 0; + } + + compress_threads = threaded_workqueue_create("compress", + migrate_compress_threads(), + DEFAULT_THREAD_REQUEST_NR, &compress_ops); + return compress_threads ? 
0 : -1; } static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - int idx, thread_count, bytes_xmit = -1, pages = -1; + CompressData *cd; bool wait = migrate_compress_wait_thread(); - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); retry: - for (idx = 0; idx < thread_count; idx++) { - if (comp_param[idx].done) { - comp_param[idx].done = false; - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); - qemu_mutex_lock(&comp_param[idx].mutex); - set_compress_params(&comp_param[idx], block, offset); - qemu_cond_signal(&comp_param[idx].cond); - qemu_mutex_unlock(&comp_param[idx].mutex); - pages = 1; - update_compress_thread_counts(&comp_param[idx], bytes_xmit); - break; + cd = threaded_workqueue_get_request(compress_threads); + if (!cd) { + /* + * wait for the free thread if the user specifies + * 'compress-wait-thread', otherwise we will post + * the page out in the main thread as normal page. + */ + if (wait) { + cpu_relax(); + goto retry; } - } - /* - * wait for the free thread if the user specifies 'compress-wait-thread', - * otherwise we will post the page out in the main thread as normal page. - */ - if (pages < 0 && wait) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - goto retry; - } - qemu_mutex_unlock(&comp_done_lock); - - return pages; + return -1; + } + cd->block = block; + cd->offset = offset; + threaded_workqueue_submit_request(compress_threads, cd); + return 1; } /** From patchwork Thu Nov 22 07:20:27 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10693607 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 4166415A7 for ; Thu, 22 Nov 2018 07:22:42 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 21F3B2CBBB for ; Thu, 22 Nov 2018 07:22:42 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id 15FB62CBD5; Thu, 22 Nov 2018 07:22:42 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-7.7 required=2.0 tests=BAYES_00,DKIM_ADSP_CUSTOM_MED, DKIM_INVALID,DKIM_SIGNED,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=unavailable version=3.3.1 Received: from lists.gnu.org (lists.gnu.org [208.118.235.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (No client certificate requested) by mail.wl.linuxfoundation.org (Postfix) with ESMTPS id 61B5D2CBBB for ; Thu, 22 Nov 2018 07:22:41 +0000 (UTC) Received: from localhost ([::1]:44381 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gPjJw-00046V-Kl for patchwork-qemu-devel@patchwork.kernel.org; Thu, 22 Nov 2018 02:22:40 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:51431) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gPjIK-0002rK-LN for qemu-devel@nongnu.org; Thu, 22 Nov 2018 02:21:07 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1gPjIF-0002EH-GR for qemu-devel@nongnu.org; Thu, 22 Nov 2018 02:21:00 -0500 Received: from mail-pl1-x644.google.com ([2607:f8b0:4864:20::644]:34479) by eggs.gnu.org with esmtps (TLS1.0:RSA_AES_128_CBC_SHA1:16) (Exim 4.71) (envelope-from ) id 1gPjIF-0002Cs-5z for qemu-devel@nongnu.org; Thu, 
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Date: Thu, 22 Nov 2018 15:20:27 +0800
Message-Id: <20181122072028.22819-5-xiaoguangrong@tencent.com>
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>
X-Received-From: 2607:f8b0:4864:20::644 Subject: [Qemu-devel] [PATCH v3 4/5] migration: use threaded workqueue for decompression X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn Errors-To: qemu-devel-bounces+patchwork-qemu-devel=patchwork.kernel.org@nongnu.org Sender: "Qemu-devel" X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong Adapt the compression code to the threaded workqueue Signed-off-by: Xiao Guangrong --- migration/ram.c | 222 ++++++++++++++++++++------------------------------------ 1 file changed, 77 insertions(+), 145 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 254c08f27b..ccec59c35e 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct DecompressParam { - bool done; - bool quit; - QemuMutex mutex; - QemuCond cond; - void *des; - uint8_t *compbuf; - int len; - z_stream stream; -}; -typedef struct DecompressParam DecompressParam; - static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; -static DecompressParam *decomp_param; -static QemuThread *decompress_threads; -static QemuMutex decomp_done_lock; -static QemuCond decomp_done_cond; /* Multiple fd's */ @@ -3399,6 +3383,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } + /* return the size after decompression, or negative value on error */ static int qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, @@ -3424,166 +3409,113 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, return stream->total_out; } -static void *do_data_decompress(void *opaque) -{ - DecompressParam *param = opaque; - unsigned long pagesize; - uint8_t *des; - int len, ret; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->des) { - des = param->des; - len = param->len; - param->des = 0; - qemu_mutex_unlock(¶m->mutex); - - pagesize = TARGET_PAGE_SIZE; - - ret = qemu_uncompress_data(¶m->stream, des, pagesize, - param->compbuf, len); - if (ret < 0 && migrate_get_current()->decompress_error_check) { - error_report("decompress data failed"); - qemu_file_set_error(decomp_file, ret); - } +struct DecompressData { + /* filled by migration thread.*/ + void *des; + uint8_t *compbuf; + size_t len; - qemu_mutex_lock(&decomp_done_lock); - param->done = true; - qemu_cond_signal(&decomp_done_cond); - qemu_mutex_unlock(&decomp_done_lock); + z_stream stream; +}; +typedef struct DecompressData DecompressData; - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } +static Threads *decompress_threads; + +static int decompress_thread_data_init(void *request) +{ + DecompressData *dd = request; + + if (inflateInit(&dd->stream) != Z_OK) { + return -1; } - qemu_mutex_unlock(¶m->mutex); - return NULL; + dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + return 0; } -static int wait_for_decompress_done(void) +static void decompress_thread_data_fini(void *request) { - int idx, thread_count; + DecompressData *dd = request; - if (!migrate_use_compression()) { - return 0; - } + inflateEnd(&dd->stream); + g_free(dd->compbuf); +} - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - for 
(idx = 0; idx < thread_count; idx++) { - while (!decomp_param[idx].done) { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +static void decompress_thread_data_handler(void *request) +{ + DecompressData *dd = request; + unsigned long pagesize = TARGET_PAGE_SIZE; + int ret; + + ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize, + dd->compbuf, dd->len); + if (ret < 0 && migrate_get_current()->decompress_error_check) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); } - qemu_mutex_unlock(&decomp_done_lock); - return qemu_file_get_error(decomp_file); } -static void compress_threads_load_cleanup(void) +static void decompress_thread_data_done(void *request) { - int i, thread_count; +} + +static const ThreadedWorkqueueOps decompress_ops = { + .thread_request_init = decompress_thread_data_init, + .thread_request_uninit = decompress_thread_data_fini, + .thread_request_handler = decompress_thread_data_handler, + .thread_request_done = decompress_thread_data_done, + .request_size = sizeof(DecompressData), +}; +static int decompress_init(QEMUFile *f) +{ if (!migrate_use_compression()) { - return; + return 0; } - thread_count = migrate_decompress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!decomp_param[i].compbuf) { - break; - } - qemu_mutex_lock(&decomp_param[i].mutex); - decomp_param[i].quit = true; - qemu_cond_signal(&decomp_param[i].cond); - qemu_mutex_unlock(&decomp_param[i].mutex); - } - for (i = 0; i < thread_count; i++) { - if (!decomp_param[i].compbuf) { - break; - } + decomp_file = f; + decompress_threads = threaded_workqueue_create("decompress", + migrate_decompress_threads(), + DEFAULT_THREAD_REQUEST_NR, &decompress_ops); + return decompress_threads ? 
0 : -1; +} - qemu_thread_join(decompress_threads + i); - qemu_mutex_destroy(&decomp_param[i].mutex); - qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); - g_free(decomp_param[i].compbuf); - decomp_param[i].compbuf = NULL; +static void decompress_fini(void) +{ + if (!decompress_threads) { + return; } - g_free(decompress_threads); - g_free(decomp_param); + + threaded_workqueue_destroy(decompress_threads); decompress_threads = NULL; - decomp_param = NULL; decomp_file = NULL; } -static int compress_threads_load_setup(QEMUFile *f) +static int flush_decompressed_data(void) { - int i, thread_count; - if (!migrate_use_compression()) { return 0; } - thread_count = migrate_decompress_threads(); - decompress_threads = g_new0(QemuThread, thread_count); - decomp_param = g_new0(DecompressParam, thread_count); - qemu_mutex_init(&decomp_done_lock); - qemu_cond_init(&decomp_done_cond); - decomp_file = f; - for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { - goto exit; - } - - decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); - qemu_mutex_init(&decomp_param[i].mutex); - qemu_cond_init(&decomp_param[i].cond); - decomp_param[i].done = true; - decomp_param[i].quit = false; - qemu_thread_create(decompress_threads + i, "decompress", - do_data_decompress, decomp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; -exit: - compress_threads_load_cleanup(); - return -1; + threaded_workqueue_wait_for_requests(decompress_threads); + return qemu_file_get_error(decomp_file); } static void decompress_data_with_multi_threads(QEMUFile *f, - void *host, int len) + void *host, size_t len) { - int idx, thread_count; + DecompressData *dd; - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - while (true) { - for (idx = 0; idx < thread_count; idx++) { - if (decomp_param[idx].done) { - decomp_param[idx].done = false; - qemu_mutex_lock(&decomp_param[idx].mutex); - qemu_get_buffer(f, decomp_param[idx].compbuf, len); - decomp_param[idx].des = host; - decomp_param[idx].len = len; - qemu_cond_signal(&decomp_param[idx].cond); - qemu_mutex_unlock(&decomp_param[idx].mutex); - break; - } - } - if (idx < thread_count) { - break; - } else { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +retry: + dd = threaded_workqueue_get_request(decompress_threads); + if (!dd) { + goto retry; } - qemu_mutex_unlock(&decomp_done_lock); + + dd->des = host; + dd->len = len; + qemu_get_buffer(f, dd->compbuf, len); + threaded_workqueue_submit_request(decompress_threads, dd); } /* @@ -3678,7 +3610,7 @@ void colo_release_ram_cache(void) */ static int ram_load_setup(QEMUFile *f, void *opaque) { - if (compress_threads_load_setup(f)) { + if (decompress_init(f)) { return -1; } @@ -3699,7 +3631,7 @@ static int ram_load_cleanup(void *opaque) } xbzrle_load_cleanup(); - compress_threads_load_cleanup(); + decompress_fini(); RAMBLOCK_FOREACH_MIGRATABLE(rb) { g_free(rb->receivedmap); @@ -4101,7 +4033,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } } - ret |= wait_for_decompress_done(); + ret |= flush_decompressed_data(); rcu_read_unlock(); trace_ram_load_complete(ret, seq_iter); From patchwork Thu Nov 22 07:20:28 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10693611 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by 
From patchwork Thu Nov 22 07:20:28 2018
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10693611
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong ,
 qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com,
 wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn
Date: Thu, 22 Nov 2018 15:20:28 +0800
Message-Id: <20181122072028.22819-6-xiaoguangrong@tencent.com>
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH v3 5/5] tests: add threaded-workqueue-bench

From: Xiao Guangrong 

This is the benchmark for threaded-workqueue; it is also a good example
of how threaded-workqueue is used (a sample invocation is shown after
the patch).

Signed-off-by: Xiao Guangrong 
---
 tests/Makefile.include           |   5 +-
 tests/threaded-workqueue-bench.c | 255 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 259 insertions(+), 1 deletion(-)
 create mode 100644 tests/threaded-workqueue-bench.c

diff --git a/tests/Makefile.include b/tests/Makefile.include
index 613242bc6e..05ad27e75d 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -500,7 +500,8 @@ test-obj-y = tests/check-qnum.o tests/check-qstring.o tests/check-qdict.o \
 	tests/test-rcu-tailq.o \
 	tests/test-qdist.o tests/test-shift128.o \
 	tests/test-qht.o tests/qht-bench.o tests/test-qht-par.o \
-	tests/atomic_add-bench.o tests/atomic64-bench.o
+	tests/atomic_add-bench.o tests/atomic64-bench.o \
+	tests/threaded-workqueue-bench.o
 
 $(test-obj-y): QEMU_INCLUDES += -Itests
 QEMU_CFLAGS += -I$(SRC_PATH)/tests
@@ -557,6 +558,8 @@ tests/qht-bench$(EXESUF): tests/qht-bench.o $(test-util-obj-y)
 tests/test-bufferiszero$(EXESUF): tests/test-bufferiszero.o $(test-util-obj-y)
 tests/atomic_add-bench$(EXESUF): tests/atomic_add-bench.o $(test-util-obj-y)
 tests/atomic64-bench$(EXESUF): tests/atomic64-bench.o $(test-util-obj-y)
+tests/threaded-workqueue-bench$(EXESUF): tests/threaded-workqueue-bench.o migration/qemu-file.o \
+	$(test-util-obj-y)
 
 tests/fp/%:
 	$(MAKE) -C $(dir $@) $(notdir $@)
diff --git a/tests/threaded-workqueue-bench.c b/tests/threaded-workqueue-bench.c
new file mode 100644
index 0000000000..0d04948ed3
--- /dev/null
+++ b/tests/threaded-workqueue-bench.c
@@ -0,0 +1,255 @@
+/*
+ * Threaded Workqueue Benchmark
+ *
+ * Author:
+ *   Xiao Guangrong
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+#include <zlib.h>
+
+#include "qemu/osdep.h"
+#include "exec/cpu-common.h"
+#include "qemu/error-report.h"
+#include "migration/qemu-file.h"
+#include "qemu/threaded-workqueue.h"
+
+#define PAGE_SHIFT 12
+#define PAGE_SIZE (1 << PAGE_SHIFT)
+#define DEFAULT_THREAD_NR 2
+#define DEFAULT_MEM_SIZE 1
+#define DEFAULT_REPEATED_COUNT 3
+
+static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
+                                  int64_t pos)
+{
+    int i, size = 0;
+
+    for (i = 0; i < iovcnt; i++) {
+        size += iov[i].iov_len;
+    }
+    return size;
+}
+
+static int test_fclose(void *opaque)
+{
+    return 0;
+}
+
+static const QEMUFileOps test_write_ops = {
+    .writev_buffer = test_writev_buffer,
+    .close         = test_fclose
+};
+
+static QEMUFile *dest_file;
+
+static const QEMUFileOps empty_ops = { };
+
+struct CompressData {
+    uint8_t *ram_addr;
+    QEMUFile *file;
+    z_stream stream;
+};
+typedef struct CompressData CompressData;
+
+static int compress_request_init(void *request)
+{
+    CompressData *cd = request;
+
+    if (deflateInit(&cd->stream, 1) != Z_OK) {
+        return -1;
+    }
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return 0;
+}
+
+static void compress_request_uninit(void *request)
+{
+    CompressData *cd = request;
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+}
+
+static void compress_thread_data_handler(void *request)
+{
+    CompressData *cd = request;
+    int blen;
+
+    blen = qemu_put_compression_data(cd->file, &cd->stream, cd->ram_addr,
+                                     PAGE_SIZE);
+    if (blen < 0) {
+        error_report("compressed data failed!");
+        qemu_file_set_error(dest_file, blen);
+    }
+}
+
+struct CompressStats {
+    unsigned long pages;
+    unsigned long compressed_size;
+};
+typedef struct CompressStats CompressStats;
+
+static CompressStats comp_stats;
+
+static void compress_thread_data_done(void *request)
+{
+    CompressData *cd = request;
+    int bytes_xmit;
+
+    bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
+
+    comp_stats.pages++;
+    comp_stats.compressed_size += bytes_xmit;
+}
+
+static const ThreadedWorkqueueOps ops = {
+    .thread_request_init = compress_request_init,
+    .thread_request_uninit = compress_request_uninit,
+    .thread_request_handler = compress_thread_data_handler,
+    .thread_request_done = compress_thread_data_done,
+    .request_size = sizeof(CompressData),
+};
+
+static void compress_threads_save_cleanup(Threads *threads)
+{
+    threaded_workqueue_destroy(threads);
+    qemu_fclose(dest_file);
+}
+
+static Threads *compress_threads_save_setup(int threads_nr, int requests_nr)
+{
+    Threads *compress_threads;
+
+    dest_file = qemu_fopen_ops(NULL, &test_write_ops);
+    compress_threads = threaded_workqueue_create("compress", threads_nr,
+                                                 requests_nr, &ops);
+    assert(compress_threads);
+    return compress_threads;
+}
+
+static void compress_page_with_multi_thread(Threads *threads, uint8_t *addr)
+{
+    CompressData *cd;
+
+retry:
+    cd = threaded_workqueue_get_request(threads);
+    if (!cd) {
+        goto retry;
+    }
+
+    cd->ram_addr = addr;
+    threaded_workqueue_submit_request(threads, cd);
+}
+
+static void run(Threads *threads, uint8_t *mem, unsigned long mem_size,
+                int repeated_count)
+{
+    uint8_t *ptr = mem, *end = mem + mem_size;
+    uint64_t start_ts, spend, total_ts = 0, pages = mem_size >> PAGE_SHIFT;
+    double rate;
+    int i;
+
+    for (i = 0; i < repeated_count; i++) {
+        ptr = mem;
+        memset(&comp_stats, 0, sizeof(comp_stats));
+
+        start_ts = g_get_monotonic_time();
+        for (ptr = mem; ptr < end; ptr += PAGE_SIZE) {
+            *ptr = 0x10;
+            compress_page_with_multi_thread(threads, ptr);
+        }
+        threaded_workqueue_wait_for_requests(threads);
+        spend = g_get_monotonic_time() - start_ts;
+        total_ts += spend;
+
+        if (comp_stats.pages != pages) {
+            printf("ERROR: pages are compressed %ld, expect %ld.\n",
+                   comp_stats.pages, pages);
+            exit(-1);
+        }
+
+        rate = (double)(comp_stats.pages * PAGE_SIZE) /
+                        comp_stats.compressed_size;
+        printf("RUN %d: Request # %ld Cost %ld, Compression Rate %f.\n", i,
+               comp_stats.pages, spend, rate);
+    }
+
+    printf("AVG: Time Cost %ld\n", total_ts / repeated_count);
+    printf("AVG Throughput: %f GB/s\n",
+           (double)(mem_size >> 30) * repeated_count * 1e6 / total_ts);
+}
+
+static void usage(const char *arg0)
+{
+    printf("\nThreaded Workqueue Benchmark.\n");
+    printf("Usage:\n");
+    printf("  %s [OPTIONS]\n", arg0);
+    printf("Options:\n");
+    printf("  -t  the number of threads (default %d).\n",
+           DEFAULT_THREAD_NR);
+    printf("  -r: the number of requests handled by each thread (default %d).\n",
+           DEFAULT_THREAD_REQUEST_NR);
+    printf("  -m: the size of the memory (G) used to test (default %dG).\n",
+           DEFAULT_MEM_SIZE);
+    printf("  -c: the repeated count (default %d).\n",
+           DEFAULT_REPEATED_COUNT);
+    printf("  -h  show this help info.\n");
+}
+
+int main(int argc, char *argv[])
+{
+    int c, threads_nr, requests_nr, repeated_count;
+    unsigned long mem_size;
+    uint8_t *mem;
+    Threads *threads;
+
+    threads_nr = DEFAULT_THREAD_NR;
+    requests_nr = DEFAULT_THREAD_REQUEST_NR;
+    mem_size = DEFAULT_MEM_SIZE;
+    repeated_count = DEFAULT_REPEATED_COUNT;
+
+    for (;;) {
+        c = getopt(argc, argv, "t:r:m:c:h");
+        if (c < 0) {
+            break;
+        }
+
+        switch (c) {
+        case 't':
+            threads_nr = atoi(optarg);
+            break;
+        case 'r':
+            requests_nr = atoi(optarg);
+            break;
+        case 'm':
+            mem_size = atol(optarg);
+            break;
+        case 'c':
+            repeated_count = atoi(optarg);
+            break;
+        default:
+            printf("Unknown option: %c.\n", c);
+        case 'h':
+            usage(argv[0]);
+            return -1;
+        }
+    }
+
+    printf("Run the benchmark: threads %d requests-per-thread: %d memory %ldG repeat %d.\n",
+           threads_nr, requests_nr, mem_size, repeated_count);
+
+    mem_size = mem_size << 30;
+    mem = qemu_memalign(PAGE_SIZE, mem_size);
+    memset(mem, 0, mem_size);
+
+    threads = compress_threads_save_setup(threads_nr, requests_nr);
+    run(threads, mem, mem_size, repeated_count);
+    compress_threads_save_cleanup(threads);
+
+    qemu_vfree(mem);
+    return 0;
+}
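
For reference, a sample invocation of the benchmark above. This assumes the binary is built through the tests/threaded-workqueue-bench$(EXESUF) rule added to tests/Makefile.include by this patch; the option letters come straight from usage() in the file (-t worker threads, -r requests per thread, -m memory size in GB, -c repeat count), and only the first, deterministic output line printed by main() is shown.

    $ make tests/threaded-workqueue-bench
    $ ./tests/threaded-workqueue-bench -t 8 -r 64 -m 1 -c 3
    Run the benchmark: threads 8 requests-per-thread: 64 memory 1G repeat 3.

Each repeat touches every page of the test memory, submits it to the compression workqueue, waits for all requests to drain, and reports the per-run cost and compression rate plus the averaged time and throughput at the end.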