From patchwork Thu Nov 22 07:20:25 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10693593 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 9DBD415A7 for ; Thu, 22 Nov 2018 07:20:49 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 795E72CBBB for ; Thu, 22 Nov 2018 07:20:49 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id 6D4EA2CBD5; Thu, 22 Nov 2018 07:20:49 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=ham version=3.3.1 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 2871D2CBBB for ; Thu, 22 Nov 2018 07:20:48 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S2392554AbeKVR6z (ORCPT ); Thu, 22 Nov 2018 12:58:55 -0500 Received: from mail-pf1-f193.google.com ([209.85.210.193]:38347 "EHLO mail-pf1-f193.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S2392552AbeKVR6y (ORCPT ); Thu, 22 Nov 2018 12:58:54 -0500 Received: by mail-pf1-f193.google.com with SMTP id q1so1500288pfi.5 for ; Wed, 21 Nov 2018 23:20:46 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references :mime-version:content-transfer-encoding; bh=rax0SfV08nU6hzgMl3qgHqkzDVovsOb8w02R0HHxB98=; b=lUrOS14dg65z7Ciy/jCUX3X8N0I/zT5JLoqXduK+Icvi2qzqy1c9i6WMbVma41v9I4 3vW8FE9akuA7FaIeFtRfje8VhuC3WIU8Nh+gAiBKxI87j3TwLMHC8s4q/YZw4oHno0l0 thgGBPHToUV0/g7o8VxN0QfQlo70nh8b5NStLPzvtY9L0Sm7VCKUGLy7E40Otqh6Jwtv yL/5p3aMK0iztyl6OXLoyAKnYClby+AXTg/l3IaJXTnXQzganyfqo+IeS5t9aejPVsBE CqFlEXOmrTWWgnuRZANSALuRHjVUBmrAOK9DH8oSeQK5Omk1xRP13nXyFgKhoBa8fm9t 0kKw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references:mime-version:content-transfer-encoding; bh=rax0SfV08nU6hzgMl3qgHqkzDVovsOb8w02R0HHxB98=; b=b4AxBzVsDWR7YfxQQ10YJYlIaxa6GJgB19q4TARTtTBufuDyRi5ZJZxrxxwF8EI9x0 C5UWDWSaTRj4dY4J5XpJiHnHN/hNPGJ4oIAPEqcn/ZjF+Lp4w+v3A26yvnBDjhu+dKIM 5UT1hgZe7fkU4naW965ZFELadpBGmzNF8vvIBObYFM5t+T/8CB2cNVO/dXXBAEwBvMnP BMxp2XAhgeNJlwsohaVa6bts3MEwUIg7Gvol508Ml1F4WcDw5/XkHpNsvCbtwkAGy6NU 1QasB78TuAAkEJh5G3GVUh1EFE+SDRO0tSQsu9znL93q8dNzpzb2ondjFEaU5c6+Gt6i NvKg== X-Gm-Message-State: AA+aEWZdRWNTMri8k2F/h90S008MnvoG8xqrtT5tcgMuMwf1339YXP2V mLP0VfMEgfGt8VCR7m8pA+g= X-Google-Smtp-Source: AFSGD/WvgefpkXSrjAyWYYTfxGNyCNJ9cjKKVFveBLbmg2vFTI/ABvhzyhkW2/oDLAceTPS7ZXoFKw== X-Received: by 2002:a63:104d:: with SMTP id 13mr9007481pgq.303.1542871245669; Wed, 21 Nov 2018 23:20:45 -0800 (PST) Received: from localhost.localdomain ([203.205.141.36]) by smtp.gmail.com with ESMTPSA id 19sm52731312pfs.108.2018.11.21.23.20.41 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Wed, 21 Nov 2018 23:20:45 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong Subject: [PATCH v3 2/5] util: introduce threaded workqueue Date: Thu, 22 Nov 2018 15:20:25 +0800 Message-Id: <20181122072028.22819-3-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com> References: <20181122072028.22819-1-xiaoguangrong@tencent.com> MIME-Version: 1.0 Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong This modules implements the lockless and efficient threaded workqueue. Three abstracted objects are used in this module: - Request. It not only contains the data that the workqueue fetches out to finish the request but also offers the space to save the result after the workqueue handles the request. It's flowed between user and workqueue. The user fills the request data into it when it is owned by user. After it is submitted to the workqueue, the workqueue fetched data out and save the result into it after the request is handled. All the requests are pre-allocated and carefully partitioned between threads so there is no contention on the request, that make threads be parallel as much as possible. - User, i.e, the submitter It's the one fills the request and submits it to the workqueue, the result will be collected after it is handled by the work queue. The user can consecutively submit requests without waiting the previous requests been handled. It only supports one submitter, you should do serial submission by yourself if you want more, e.g, use lock on you side. - Workqueue, i.e, thread Each workqueue is represented by a running thread that fetches the request submitted by the user, do the specified work and save the result to the request. Signed-off-by: Xiao Guangrong --- include/qemu/threaded-workqueue.h | 106 +++++++++ util/Makefile.objs | 1 + util/threaded-workqueue.c | 463 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 570 insertions(+) create mode 100644 include/qemu/threaded-workqueue.h create mode 100644 util/threaded-workqueue.c diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h new file mode 100644 index 0000000000..e0ede496d0 --- /dev/null +++ b/include/qemu/threaded-workqueue.h @@ -0,0 +1,106 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#ifndef QEMU_THREADED_WORKQUEUE_H +#define QEMU_THREADED_WORKQUEUE_H + +#include "qemu/queue.h" +#include "qemu/thread.h" + +/* + * This modules implements the lockless and efficient threaded workqueue. + * + * Three abstracted objects are used in this module: + * - Request. + * It not only contains the data that the workqueue fetches out + * to finish the request but also offers the space to save the result + * after the workqueue handles the request. + * + * It's flowed between user and workqueue. The user fills the request + * data into it when it is owned by user. After it is submitted to the + * workqueue, the workqueue fetched data out and save the result into + * it after the request is handled. + * + * All the requests are pre-allocated and carefully partitioned between + * threads so there is no contention on the request, that make threads + * be parallel as much as possible. + * + * - User, i.e, the submitter + * It's the one fills the request and submits it to the workqueue, + * the result will be collected after it is handled by the work queue. + * + * The user can consecutively submit requests without waiting the previous + * requests been handled. + * It only supports one submitter, you should do serial submission by + * yourself if you want more, e.g, use lock on you side. + * + * - Workqueue, i.e, thread + * Each workqueue is represented by a running thread that fetches + * the request submitted by the user, do the specified work and save + * the result to the request. + */ + +typedef struct Threads Threads; + +struct ThreadedWorkqueueOps { + /* constructor of the request */ + int (*thread_request_init)(void *request); + /* destructor of the request */ + void (*thread_request_uninit)(void *request); + + /* the handler of the request that is called by the thread */ + void (*thread_request_handler)(void *request); + /* called by the user after the request has been handled */ + void (*thread_request_done)(void *request); + + size_t request_size; +}; +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; + +/* the default number of requests that thread need handle */ +#define DEFAULT_THREAD_REQUEST_NR 4 +/* the max number of requests that thread need handle */ +#define MAX_THREAD_REQUEST_NR (sizeof(uint64_t) * BITS_PER_BYTE) + +/* + * create a threaded queue. Other APIs will work on the Threads it returned + * + * @name: the identity of the workqueue which is used to construct the name + * of threads only + * @threads_nr: the number of threads that the workqueue will create + * @thread_requests_nr: the number of requests that each single thread will + * handle + * @ops: the handlers of the request + * + * Return NULL if it failed + */ +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops); +void threaded_workqueue_destroy(Threads *threads); + +/* + * find a free request where the user can store the data that is needed to + * finish the request + * + * If all requests are used up, return NULL + */ +void *threaded_workqueue_get_request(Threads *threads); +/* submit the request and notify the thread */ +void threaded_workqueue_submit_request(Threads *threads, void *request); + +/* + * wait all threads to complete the request to make sure there is no + * previous request exists + */ +void threaded_workqueue_wait_for_requests(Threads *threads); +#endif diff --git a/util/Makefile.objs b/util/Makefile.objs index 0820923c18..f26dfe5182 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -50,5 +50,6 @@ util-obj-y += range.o util-obj-y += stats64.o util-obj-y += systemd.o util-obj-y += iova-tree.o +util-obj-y += threaded-workqueue.o util-obj-$(CONFIG_LINUX) += vfio-helpers.o util-obj-$(CONFIG_OPENGL) += drm.o diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c new file mode 100644 index 0000000000..2ab37cee8d --- /dev/null +++ b/util/threaded-workqueue.c @@ -0,0 +1,463 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/bitmap.h" +#include "qemu/threaded-workqueue.h" + +#define SMP_CACHE_BYTES 64 + +/* + * the request representation which contains the internally used mete data, + * it is the header of user-defined data. + * + * It should be aligned to the nature size of CPU. + */ +struct ThreadRequest { + /* + * the request has been handled by the thread and need the user + * to fetch result out. + */ + uint8_t done; + + /* + * the index to Thread::requests. + * Save it to the padding space although it can be calculated at runtime. + */ + uint8_t request_index; + + /* the index to Threads::per_thread_data */ + unsigned int thread_index; +} QEMU_ALIGNED(sizeof(unsigned long)); +typedef struct ThreadRequest ThreadRequest; + +struct ThreadLocal { + struct Threads *threads; + + /* the index of the thread */ + int self; + + /* thread is useless and needs to exit */ + bool quit; + + QemuThread thread; + + void *requests; + + /* + * the bit in these two bitmaps indicates the index of the ï¼ requests + * respectively. If it's the same, the corresponding request is free + * and owned by the user, i.e, where the user fills a request. Otherwise, + * it is valid and owned by the thread, i.e, where the thread fetches + * the request and write the result. + */ + + /* after the user fills the request, the bit is flipped. */ + uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + /* after handles the request, the thread flips the bit. */ + uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event used to wake up the thread whenever a valid request has + * been submitted + */ + QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event is notified whenever a request has been completed + * (i.e, become free), which is used to wake up the user + */ + QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES); +}; +typedef struct ThreadLocal ThreadLocal; + +/* + * the main data struct represents multithreads which is shared by + * all threads + */ +struct Threads { + /* the request header, ThreadRequest, is contained */ + unsigned int request_size; + unsigned int thread_requests_nr; + unsigned int threads_nr; + + /* the request is pushed to the thread with round-robin manner */ + unsigned int current_thread_index; + + const ThreadedWorkqueueOps *ops; + + ThreadLocal per_thread_data[0]; +}; +typedef struct Threads Threads; + +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index) +{ + ThreadRequest *request; + + request = thread->requests + request_index * thread->threads->request_size; + assert(request->request_index == request_index); + assert(request->thread_index == thread->self); + return request; +} + +static int request_to_index(ThreadRequest *request) +{ + return request->request_index; +} + +static int request_to_thread_index(ThreadRequest *request) +{ + return request->thread_index; +} + +/* + * free request: the request is not used by any thread, however, it might + * contain the result need the user to call thread_request_done() + * + * valid request: the request contains the request data and it's committed + * to the thread, i,e. it's owned by thread. + */ +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread) +{ + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + + /* + * paired with smp_wmb() in mark_request_free() to make sure that we + * read request_done_bitmap before fetching the result out. + */ + smp_rmb(); + + return result_bitmap; +} + +static ThreadRequest +*find_thread_free_request(Threads *threads, ThreadLocal *thread) +{ + uint64_t result_bitmap = get_free_request_bitmap(threads, thread); + int index; + + index = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr); + if (index >= threads->thread_requests_nr) { + return NULL; + } + + return index_to_request(thread, index); +} + +static ThreadRequest *threads_find_free_request(Threads *threads) +{ + ThreadLocal *thread; + ThreadRequest *request; + int cur_thread, thread_index; + + cur_thread = threads->current_thread_index % threads->threads_nr; + thread_index = cur_thread; + do { + thread = threads->per_thread_data + thread_index++; + request = find_thread_free_request(threads, thread); + if (request) { + break; + } + thread_index %= threads->threads_nr; + } while (thread_index != cur_thread); + + return request; +} + +/* + * the change bit operation combined with READ_ONCE and WRITE_ONCE which + * only works on single uint64_t width + */ +static void change_bit_once(long nr, uint64_t *addr) +{ + uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr); + + atomic_rcu_set(addr, value); +} + +static void mark_request_valid(Threads *threads, ThreadRequest *request) +{ + int thread_index = request_to_thread_index(request); + int request_index = request_to_index(request); + ThreadLocal *thread = threads->per_thread_data + thread_index; + + /* + * paired with smp_rmb() in find_first_valid_request_index() to make + * sure the request has been filled before the bit is flipped that + * will make the request be visible to the thread + */ + smp_wmb(); + + change_bit_once(request_index, &thread->request_fill_bitmap); + qemu_event_set(&thread->request_valid_ev); +} + +static int thread_find_first_valid_request_index(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + int index; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + /* + * paired with smp_wmb() in mark_request_valid() to make sure that + * we read request_fill_bitmap before fetch the request out. + */ + smp_rmb(); + + index = find_first_bit(&result_bitmap, threads->thread_requests_nr); + return index >= threads->thread_requests_nr ? -1 : index; +} + +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) +{ + int index = request_to_index(request); + + /* + * smp_wmb() is implied in change_bit_atomic() that is paired with + * smp_rmb() in get_free_request_bitmap() to make sure the result + * has been saved before the bit is flipped. + */ + change_bit_atomic(index, &thread->request_done_bitmap); + qemu_event_set(&thread->request_free_ev); +} + +/* retry to see if there is available request before actually go to wait. */ +#define BUSY_WAIT_COUNT 1000 + +static ThreadRequest * +thread_busy_wait_for_request(ThreadLocal *thread) +{ + int index, count = 0; + + for (count = 0; count < BUSY_WAIT_COUNT; count++) { + index = thread_find_first_valid_request_index(thread); + if (index >= 0) { + return index_to_request(thread, index); + } + + cpu_relax(); + } + + return NULL; +} + +static void *thread_run(void *opaque) +{ + ThreadLocal *self_data = (ThreadLocal *)opaque; + Threads *threads = self_data->threads; + void (*handler)(void *request) = threads->ops->thread_request_handler; + ThreadRequest *request; + + for ( ; !atomic_read(&self_data->quit); ) { + qemu_event_reset(&self_data->request_valid_ev); + + request = thread_busy_wait_for_request(self_data); + if (!request) { + qemu_event_wait(&self_data->request_valid_ev); + continue; + } + + assert(!request->done); + + handler(request + 1); + request->done = true; + mark_request_free(self_data, request); + } + + return NULL; +} + +static void uninit_thread_requests(ThreadLocal *thread, int free_nr) +{ + Threads *threads = thread->threads; + ThreadRequest *request = thread->requests; + int i; + + for (i = 0; i < free_nr; i++) { + threads->ops->thread_request_uninit(request + 1); + request = (void *)request + threads->request_size; + } + g_free(thread->requests); +} + +static int init_thread_requests(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + ThreadRequest *request; + int ret, i, thread_reqs_size; + + thread_reqs_size = threads->thread_requests_nr * threads->request_size; + thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES); + thread->requests = g_malloc0(thread_reqs_size); + + request = thread->requests; + for (i = 0; i < threads->thread_requests_nr; i++) { + ret = threads->ops->thread_request_init(request + 1); + if (ret < 0) { + goto exit; + } + + request->request_index = i; + request->thread_index = thread->self; + request = (void *)request + threads->request_size; + } + return 0; + +exit: + uninit_thread_requests(thread, i); + return -1; +} + +static void uninit_thread_data(Threads *threads, int free_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + int i; + + for (i = 0; i < free_nr; i++) { + thread_local[i].quit = true; + qemu_event_set(&thread_local[i].request_valid_ev); + qemu_thread_join(&thread_local[i].thread); + qemu_event_destroy(&thread_local[i].request_valid_ev); + qemu_event_destroy(&thread_local[i].request_free_ev); + uninit_thread_requests(&thread_local[i], threads->thread_requests_nr); + } +} + +static int +init_thread_data(Threads *threads, const char *thread_name, int thread_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + char *name; + int i; + + for (i = 0; i < thread_nr; i++) { + thread_local[i].threads = threads; + thread_local[i].self = i; + + if (init_thread_requests(&thread_local[i]) < 0) { + goto exit; + } + + qemu_event_init(&thread_local[i].request_free_ev, false); + qemu_event_init(&thread_local[i].request_valid_ev, false); + + name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self); + qemu_thread_create(&thread_local[i].thread, name, + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); + g_free(name); + } + return 0; + +exit: + uninit_thread_data(threads, i); + return -1; +} + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops) +{ + Threads *threads; + + if (threads_nr > MAX_THREAD_REQUEST_NR) { + return NULL; + } + + threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal)); + threads->ops = ops; + threads->threads_nr = threads_nr; + threads->thread_requests_nr = thread_requests_nr; + + QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long))); + threads->request_size = threads->ops->request_size; + threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long)); + threads->request_size += sizeof(ThreadRequest); + + if (init_thread_data(threads, name, threads_nr) < 0) { + g_free(threads); + return NULL; + } + + return threads; +} + +void threaded_workqueue_destroy(Threads *threads) +{ + uninit_thread_data(threads, threads->threads_nr); + g_free(threads); +} + +static void request_done(Threads *threads, ThreadRequest *request) +{ + if (!request->done) { + return; + } + + threads->ops->thread_request_done(request + 1); + request->done = false; +} + +void *threaded_workqueue_get_request(Threads *threads) +{ + ThreadRequest *request; + + request = threads_find_free_request(threads); + if (!request) { + return NULL; + } + + request_done(threads, request); + return request + 1; +} + +void threaded_workqueue_submit_request(Threads *threads, void *request) +{ + ThreadRequest *req = request - sizeof(ThreadRequest); + int thread_index = request_to_thread_index(request); + + assert(!req->done); + mark_request_valid(threads, req); + threads->current_thread_index = thread_index + 1; +} + +void threaded_workqueue_wait_for_requests(Threads *threads) +{ + ThreadLocal *thread; + uint64_t result_bitmap; + int thread_index, index = 0; + + for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) { + thread = threads->per_thread_data + thread_index; + index = 0; +retry: + qemu_event_reset(&thread->request_free_ev); + result_bitmap = get_free_request_bitmap(threads, thread); + + for (; index < threads->thread_requests_nr; index++) { + if (test_bit(index, &result_bitmap)) { + qemu_event_wait(&thread->request_free_ev); + goto retry; + } + + request_done(threads, index_to_request(thread, index)); + } + } +}