From patchwork Mon Jun 4 09:55:17 2018
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10446243
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: kvm@vger.kernel.org, Xiao Guangrong, qemu-devel@nongnu.org,
    peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com,
    jiang.biao2@zte.com.cn
Date: Mon, 4 Jun 2018 17:55:17 +0800
Message-Id: <20180604095520.8563-10-xiaoguangrong@tencent.com>
In-Reply-To: <20180604095520.8563-1-xiaoguangrong@tencent.com>
References: <20180604095520.8563-1-xiaoguangrong@tencent.com>
X-Mailer: git-send-email 2.14.4
Subject: [Qemu-devel] [PATCH 09/12] ring: introduce lockless ring buffer

From: Xiao Guangrong

This is a simple lockless ring buffer implementation which supports both
the single producer / single consumer case and the multiple producers /
single consumer case.

Many lessons were learned from the Linux kernel's kfifo (1) and DPDK's
rte_ring (2) before writing this implementation. It corrects some
memory-barrier bugs in kfifo and it is a simpler lockless version of
rte_ring, as multiple access is currently only allowed on the producer
side.

With a single producer and a single consumer it is a traditional FIFO.
With multiple producers it uses the following algorithm.

The producer updates the ring in two steps:

- first step, occupy an entry in the ring:

      retry:
         in = ring->in
         if (cmpxchg(&ring->in, in, in + 1) != in)
             goto retry;

  After that, the entry pointed to by ring->data[in] is owned by this
  producer.

      assert(ring->data[in] == NULL);

  Note: no other producer can touch this entry, so the entry should
  always be in the initialized (NULL) state.

- second step, write the data to the entry:

      ring->data[in] = data;

The consumer first checks whether an entry is available in the ring and
fetches it:

      if (!ring_is_empty(ring))
          entry = &ring->data[ring->out];

Note: ring->out has not been updated yet, so the entry pointed to by
ring->out is completely owned by the consumer.

Then it checks whether the data is ready:

      retry:
         if (*entry == NULL)
             goto retry;

If *entry is still NULL, the producer has updated the index but has not
written the data yet.

Finally, it fetches the valid data out, resets the entry to the
initialized state and updates ring->out to make the entry usable to the
producer again:

      data = *entry;
      *entry = NULL;
      ring->out++;

Memory barriers are omitted here; please refer to the comments in the
code.

(1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
(2) http://dpdk.org/doc/api/rte__ring_8h.html
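For illustration only (this snippet is not part of the patch): a
multiple-producer user is expected to drive the API roughly as sketched
below. The names req_ring, setup, producer_push and consumer_pop, and
the ring size, are made up for the example, and it assumes the usual
QEMU environment (qemu/osdep.h) that provides glib and the atomic
helpers used by ring.h:

    #include "qemu/osdep.h"
    #include "migration/ring.h"   /* the header added by this patch */

    static Ring *req_ring;

    static void setup(void)
    {
        /* 16 slots; RING_MULTI_PRODUCER allows concurrent producers */
        req_ring = ring_alloc(16, RING_MULTI_PRODUCER);
    }

    /* producer side: may be called from several threads concurrently */
    static void producer_push(void *req)
    {
        /* ring_put() returns -ENOBUFS while the ring is full */
        while (ring_put(req_ring, req) < 0) {
            /* back off briefly; the consumer has to drain entries first */
            g_usleep(1);
        }
    }

    /* consumer side: must be a single thread */
    static void *consumer_pop(void)
    {
        /* returns NULL when no entry is ready yet */
        return ring_get(req_ring);
    }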
Signed-off-by: Xiao Guangrong
---
 migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 265 insertions(+)
 create mode 100644 migration/ring.h

diff --git a/migration/ring.h b/migration/ring.h
new file mode 100644
index 0000000000..da9b8bdcbb
--- /dev/null
+++ b/migration/ring.h
@@ -0,0 +1,265 @@
+/*
+ * Ring Buffer
+ *
+ * Multiple producers and a single consumer are supported, lock free.
+ *
+ * Copyright (c) 2018 Tencent Inc
+ *
+ * Authors:
+ *  Xiao Guangrong
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#ifndef _RING__
+#define _RING__
+
+#define CACHE_LINE 64
+#define cache_aligned __attribute__((__aligned__(CACHE_LINE)))
+
+#define RING_MULTI_PRODUCER 0x1
+
+struct Ring {
+    unsigned int flags;
+    unsigned int size;
+    unsigned int mask;
+
+    unsigned int in cache_aligned;
+
+    unsigned int out cache_aligned;
+
+    void *data[0] cache_aligned;
+};
+typedef struct Ring Ring;
+
+/*
+ * allocate and initialize the ring
+ *
+ * @size: the number of elements, it should be a power of 2
+ * @flags: set to RING_MULTI_PRODUCER if the ring has multiple producers,
+ *         otherwise set it to 0, i.e. single producer and single consumer.
+ *
+ * return the ring.
+ */
+static inline Ring *ring_alloc(unsigned int size, unsigned int flags)
+{
+    Ring *ring;
+
+    assert(is_power_of_2(size));
+
+    ring = g_malloc0(sizeof(*ring) + size * sizeof(void *));
+    ring->size = size;
+    ring->mask = ring->size - 1;
+    ring->flags = flags;
+    return ring;
+}
+
+static inline void ring_free(Ring *ring)
+{
+    g_free(ring);
+}
+
+static inline bool __ring_is_empty(unsigned int in, unsigned int out)
+{
+    return in == out;
+}
+
+static inline bool ring_is_empty(Ring *ring)
+{
+    return ring->in == ring->out;
+}
+
+static inline unsigned int ring_len(unsigned int in, unsigned int out)
+{
+    return in - out;
+}
+
+static inline bool
+__ring_is_full(Ring *ring, unsigned int in, unsigned int out)
+{
+    return ring_len(in, out) > ring->mask;
+}
+
+static inline bool ring_is_full(Ring *ring)
+{
+    return __ring_is_full(ring, ring->in, ring->out);
+}
+
+static inline unsigned int ring_index(Ring *ring, unsigned int pos)
+{
+    return pos & ring->mask;
+}
+
+static inline int __ring_put(Ring *ring, void *data)
+{
+    unsigned int index, out;
+
+    out = atomic_load_acquire(&ring->out);
+    /*
+     * smp_mb()
+     *
+     * should read ring->out before updating the entry, see the comments in
+     * __ring_get().
+     */
+
+    if (__ring_is_full(ring, ring->in, out)) {
+        return -ENOBUFS;
+    }
+
+    index = ring_index(ring, ring->in);
+
+    atomic_set(&ring->data[index], data);
+
+    /*
+     * should make sure the entry is updated before increasing ring->in,
+     * otherwise the consumer will get an entry whose content is useless.
+     */
+    smp_wmb();
+    atomic_set(&ring->in, ring->in + 1);
+    return 0;
+}
+
+static inline void *__ring_get(Ring *ring)
+{
+    unsigned int index, in;
+    void *data;
+
+    in = atomic_read(&ring->in);
+
+    /*
+     * should read ring->in first to make sure the entry pointed to by this
+     * index is available, see the comments in __ring_put().
+     */
+    smp_rmb();
+    if (__ring_is_empty(in, ring->out)) {
+        return NULL;
+    }
+
+    index = ring_index(ring, ring->out);
+
+    data = atomic_read(&ring->data[index]);
+
+    /*
+     * smp_mb()
+     *
+     * once ring->out is updated, the entry originally indicated by the
+     * index is visible and usable to the producer, so we should make
+     * sure the entry is read out before updating ring->out to avoid
+     * the entry being overwritten by the producer.
+     */
+    atomic_store_release(&ring->out, ring->out + 1);
+
+    return data;
+}
+
+static inline int ring_mp_put(Ring *ring, void *data)
+{
+    unsigned int index, in, in_next, out;
+
+    do {
+        in = atomic_read(&ring->in);
+        out = atomic_read(&ring->out);
+
+        if (__ring_is_full(ring, in, out)) {
+            if (atomic_read(&ring->in) == in &&
+                atomic_read(&ring->out) == out) {
+                return -ENOBUFS;
+            }
+
+            /* an entry has been fetched out, retry. */
+            continue;
+        }
+
+        in_next = in + 1;
+    } while (atomic_cmpxchg(&ring->in, in, in_next) != in);
+
+    index = ring_index(ring, in);
+
+    /*
+     * smp_rmb(), paired with the memory barrier of (A) in ring_mp_get(),
+     * is implied in atomic_cmpxchg() as we should read ring->out first
+     * before fetching the entry, otherwise this assert will fail.
+     */
+    assert(!atomic_read(&ring->data[index]));
+
+    /*
+     * smp_mb(), paired with the memory barrier of (B) in ring_mp_get(), is
+     * implied in atomic_cmpxchg(); it is needed here as we should read
+     * ring->out before updating the entry, the same as we did in
+     * __ring_put().
+     *
+     * smp_wmb(), paired with the memory barrier of (C) in ring_mp_get(),
+     * is implied in atomic_cmpxchg(); it is needed as we should increase
+     * ring->in before updating the entry.
+     */
+    atomic_set(&ring->data[index], data);
+
+    return 0;
+}
+
+static inline void *ring_mp_get(Ring *ring)
+{
+    unsigned int index, in;
+    void *data;
+
+    do {
+        in = atomic_read(&ring->in);
+
+        /*
+         * (C) should read ring->in first to make sure the entry pointed to
+         * by this index is available
+         */
+        smp_rmb();
+
+        if (!__ring_is_empty(in, ring->out)) {
+            break;
+        }
+
+        if (atomic_read(&ring->in) == in) {
+            return NULL;
+        }
+        /* a new entry has been added in, retry. */
+    } while (1);
+
+    index = ring_index(ring, ring->out);
+
+    do {
+        data = atomic_read(&ring->data[index]);
+        if (data) {
+            break;
+        }
+        /* the producer is still updating the entry, retry */
+        cpu_relax();
+    } while (1);
+
+    atomic_set(&ring->data[index], NULL);
+
+    /*
+     * (B) smp_mb() is needed as we should read the entry out before
+     * updating ring->out, as we did in __ring_get().
+     *
+     * (A) smp_wmb() is needed as we should make the entry NULL before
+     * updating ring->out (which will make the entry visible and usable).
+     */
+    atomic_store_release(&ring->out, ring->out + 1);
+
+    return data;
+}
+
+static inline int ring_put(Ring *ring, void *data)
+{
+    if (ring->flags & RING_MULTI_PRODUCER) {
+        return ring_mp_put(ring, data);
+    }
+    return __ring_put(ring, data);
+}
+
+static inline void *ring_get(Ring *ring)
+{
+    if (ring->flags & RING_MULTI_PRODUCER) {
+        return ring_mp_get(ring);
+    }
+    return __ring_get(ring);
+}
+#endif
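
(Not part of the patch; a review aid only.) A minimal single-producer
smoke test could look like the sketch below, assuming it is built inside
the QEMU tree so that g_malloc0() and the atomic helpers used by ring.h
are available; the function name test_ring_basic is made up:

    #include "qemu/osdep.h"
    #include "migration/ring.h"

    static void test_ring_basic(void)
    {
        /* flags == 0: single producer and single consumer */
        Ring *ring = ring_alloc(4, 0);
        int values[4] = { 1, 2, 3, 4 };
        int i;

        for (i = 0; i < 4; i++) {
            assert(ring_put(ring, &values[i]) == 0);
        }
        /* the ring holds at most 'size' entries, so the next put fails */
        assert(ring_put(ring, &values[0]) == -ENOBUFS);

        for (i = 0; i < 4; i++) {
            /* entries come out in FIFO order */
            assert(ring_get(ring) == &values[i]);
        }
        /* empty again: ring_get() reports that by returning NULL */
        assert(!ring_get(ring));

        ring_free(ring);
    }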