Message ID | 20181122072028.22819-3-xiaoguangrong@tencent.com (mailing list archive) |
---|---|
State | New, archived |
Series | migration: improve multithreads |
* guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote: > From: Xiao Guangrong <xiaoguangrong@tencent.com> > > This modules implements the lockless and efficient threaded workqueue. > > Three abstracted objects are used in this module: > - Request. > It not only contains the data that the workqueue fetches out > to finish the request but also offers the space to save the result > after the workqueue handles the request. > > It's flowed between user and workqueue. The user fills the request > data into it when it is owned by user. After it is submitted to the > workqueue, the workqueue fetched data out and save the result into > it after the request is handled. > > All the requests are pre-allocated and carefully partitioned between > threads so there is no contention on the request, that make threads > be parallel as much as possible. > > - User, i.e, the submitter > It's the one fills the request and submits it to the workqueue, > the result will be collected after it is handled by the work queue. > > The user can consecutively submit requests without waiting the previous > requests been handled. > It only supports one submitter, you should do serial submission by > yourself if you want more, e.g, use lock on you side. > > - Workqueue, i.e, thread > Each workqueue is represented by a running thread that fetches > the request submitted by the user, do the specified work and save > the result to the request. 
> > Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com> > --- > include/qemu/threaded-workqueue.h | 106 +++++++++ > util/Makefile.objs | 1 + > util/threaded-workqueue.c | 463 ++++++++++++++++++++++++++++++++++++++ > 3 files changed, 570 insertions(+) > create mode 100644 include/qemu/threaded-workqueue.h > create mode 100644 util/threaded-workqueue.c > > diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h > new file mode 100644 > index 0000000000..e0ede496d0 > --- /dev/null > +++ b/include/qemu/threaded-workqueue.h > @@ -0,0 +1,106 @@ > +/* > + * Lockless and Efficient Threaded Workqueue Abstraction > + * > + * Author: > + * Xiao Guangrong <xiaoguangrong@tencent.com> > + * > + * Copyright(C) 2018 Tencent Corporation. > + * > + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. > + * See the COPYING.LIB file in the top-level directory. > + */ > + > +#ifndef QEMU_THREADED_WORKQUEUE_H > +#define QEMU_THREADED_WORKQUEUE_H > + > +#include "qemu/queue.h" > +#include "qemu/thread.h" > + > +/* > + * This modules implements the lockless and efficient threaded workqueue. > + * > + * Three abstracted objects are used in this module: > + * - Request. > + * It not only contains the data that the workqueue fetches out > + * to finish the request but also offers the space to save the result > + * after the workqueue handles the request. > + * > + * It's flowed between user and workqueue. The user fills the request > + * data into it when it is owned by user. After it is submitted to the > + * workqueue, the workqueue fetched data out and save the result into > + * it after the request is handled. > + * > + * All the requests are pre-allocated and carefully partitioned between > + * threads so there is no contention on the request, that make threads > + * be parallel as much as possible. 
> + * > + * - User, i.e, the submitter > + * It's the one fills the request and submits it to the workqueue, > + * the result will be collected after it is handled by the work queue. > + * > + * The user can consecutively submit requests without waiting the previous > + * requests been handled. > + * It only supports one submitter, you should do serial submission by > + * yourself if you want more, e.g, use lock on you side. > + * > + * - Workqueue, i.e, thread > + * Each workqueue is represented by a running thread that fetches > + * the request submitted by the user, do the specified work and save > + * the result to the request. > + */ > + > +typedef struct Threads Threads; > + > +struct ThreadedWorkqueueOps { > + /* constructor of the request */ > + int (*thread_request_init)(void *request); > + /* destructor of the request */ > + void (*thread_request_uninit)(void *request); > + > + /* the handler of the request that is called by the thread */ > + void (*thread_request_handler)(void *request); > + /* called by the user after the request has been handled */ > + void (*thread_request_done)(void *request); > + > + size_t request_size; > +}; > +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; > + > +/* the default number of requests that thread need handle */ > +#define DEFAULT_THREAD_REQUEST_NR 4 > +/* the max number of requests that thread need handle */ > +#define MAX_THREAD_REQUEST_NR (sizeof(uint64_t) * BITS_PER_BYTE) > + > +/* > + * create a threaded queue. 
Other APIs will work on the Threads it returned > + * > + * @name: the identity of the workqueue which is used to construct the name > + * of threads only > + * @threads_nr: the number of threads that the workqueue will create > + * @thread_requests_nr: the number of requests that each single thread will > + * handle > + * @ops: the handlers of the request > + * > + * Return NULL if it failed > + */ > +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, > + unsigned int thread_requests_nr, > + const ThreadedWorkqueueOps *ops); > +void threaded_workqueue_destroy(Threads *threads); > + > +/* > + * find a free request where the user can store the data that is needed to > + * finish the request > + * > + * If all requests are used up, return NULL > + */ > +void *threaded_workqueue_get_request(Threads *threads); > +/* submit the request and notify the thread */ > +void threaded_workqueue_submit_request(Threads *threads, void *request); > + > +/* > + * wait all threads to complete the request to make sure there is no > + * previous request exists > + */ > +void threaded_workqueue_wait_for_requests(Threads *threads); > +#endif > diff --git a/util/Makefile.objs b/util/Makefile.objs > index 0820923c18..f26dfe5182 100644 > --- a/util/Makefile.objs > +++ b/util/Makefile.objs > @@ -50,5 +50,6 @@ util-obj-y += range.o > util-obj-y += stats64.o > util-obj-y += systemd.o > util-obj-y += iova-tree.o > +util-obj-y += threaded-workqueue.o > util-obj-$(CONFIG_LINUX) += vfio-helpers.o > util-obj-$(CONFIG_OPENGL) += drm.o > diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c > new file mode 100644 > index 0000000000..2ab37cee8d > --- /dev/null > +++ b/util/threaded-workqueue.c > @@ -0,0 +1,463 @@ > +/* > + * Lockless and Efficient Threaded Workqueue Abstraction > + * > + * Author: > + * Xiao Guangrong <xiaoguangrong@tencent.com> > + * > + * Copyright(C) 2018 Tencent Corporation. 
> + * > + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. > + * See the COPYING.LIB file in the top-level directory. > + */ > + > +#include "qemu/osdep.h" > +#include "qemu/bitmap.h" > +#include "qemu/threaded-workqueue.h" > + > +#define SMP_CACHE_BYTES 64 That's architecture dependent isn't it? > + > +/* > + * the request representation which contains the internally used mete data, > + * it is the header of user-defined data. > + * > + * It should be aligned to the nature size of CPU. > + */ > +struct ThreadRequest { > + /* > + * the request has been handled by the thread and need the user > + * to fetch result out. > + */ > + uint8_t done; > + > + /* > + * the index to Thread::requests. > + * Save it to the padding space although it can be calculated at runtime. > + */ > + uint8_t request_index; > + > + /* the index to Threads::per_thread_data */ > + unsigned int thread_index; > +} QEMU_ALIGNED(sizeof(unsigned long)); > +typedef struct ThreadRequest ThreadRequest; > + > +struct ThreadLocal { > + struct Threads *threads; > + > + /* the index of the thread */ > + int self; > + > + /* thread is useless and needs to exit */ > + bool quit; > + > + QemuThread thread; > + > + void *requests; > + > + /* > + * the bit in these two bitmaps indicates the index of the @requests > + * respectively. If it's the same, the corresponding request is free > + * and owned by the user, i.e, where the user fills a request. Otherwise, > + * it is valid and owned by the thread, i.e, where the thread fetches > + * the request and write the result. > + */ > + > + /* after the user fills the request, the bit is flipped. */ > + uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); > + /* after handles the request, the thread flips the bit. 
*/ > + uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); Patchew complained about some type mismatches; I think those are because you're using the bitmap_* functions on these; those functions always operate on 'long' not on uint64_t - and on some platforms they're unfortunately not the same. Dave > + /* > + * the event used to wake up the thread whenever a valid request has > + * been submitted > + */ > + QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES); > + > + /* > + * the event is notified whenever a request has been completed > + * (i.e, become free), which is used to wake up the user > + */ > + QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES); > +}; > +typedef struct ThreadLocal ThreadLocal; > + > +/* > + * the main data struct represents multithreads which is shared by > + * all threads > + */ > +struct Threads { > + /* the request header, ThreadRequest, is contained */ > + unsigned int request_size; > + unsigned int thread_requests_nr; > + unsigned int threads_nr; > + > + /* the request is pushed to the thread with round-robin manner */ > + unsigned int current_thread_index; > + > + const ThreadedWorkqueueOps *ops; > + > + ThreadLocal per_thread_data[0]; > +}; > +typedef struct Threads Threads; > + > +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index) > +{ > + ThreadRequest *request; > + > + request = thread->requests + request_index * thread->threads->request_size; > + assert(request->request_index == request_index); > + assert(request->thread_index == thread->self); > + return request; > +} > + > +static int request_to_index(ThreadRequest *request) > +{ > + return request->request_index; > +} > + > +static int request_to_thread_index(ThreadRequest *request) > +{ > + return request->thread_index; > +} > + > +/* > + * free request: the request is not used by any thread, however, it might > + * contain the result need the user to call thread_request_done() > + * > + * valid request: the request contains the 
request data and it's committed > + * to the thread, i,e. it's owned by thread. > + */ > +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread) > +{ > + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; > + > + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); > + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); > + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, > + threads->thread_requests_nr); > + > + /* > + * paired with smp_wmb() in mark_request_free() to make sure that we > + * read request_done_bitmap before fetching the result out. > + */ > + smp_rmb(); > + > + return result_bitmap; > +} > + > +static ThreadRequest > +*find_thread_free_request(Threads *threads, ThreadLocal *thread) > +{ > + uint64_t result_bitmap = get_free_request_bitmap(threads, thread); > + int index; > + > + index = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr); > + if (index >= threads->thread_requests_nr) { > + return NULL; > + } > + > + return index_to_request(thread, index); > +} > + > +static ThreadRequest *threads_find_free_request(Threads *threads) > +{ > + ThreadLocal *thread; > + ThreadRequest *request; > + int cur_thread, thread_index; > + > + cur_thread = threads->current_thread_index % threads->threads_nr; > + thread_index = cur_thread; > + do { > + thread = threads->per_thread_data + thread_index++; > + request = find_thread_free_request(threads, thread); > + if (request) { > + break; > + } > + thread_index %= threads->threads_nr; > + } while (thread_index != cur_thread); > + > + return request; > +} > + > +/* > + * the change bit operation combined with READ_ONCE and WRITE_ONCE which > + * only works on single uint64_t width > + */ > +static void change_bit_once(long nr, uint64_t *addr) > +{ > + uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr); > + > + atomic_rcu_set(addr, value); > +} > + > +static void mark_request_valid(Threads *threads, 
ThreadRequest *request) > +{ > + int thread_index = request_to_thread_index(request); > + int request_index = request_to_index(request); > + ThreadLocal *thread = threads->per_thread_data + thread_index; > + > + /* > + * paired with smp_rmb() in find_first_valid_request_index() to make > + * sure the request has been filled before the bit is flipped that > + * will make the request be visible to the thread > + */ > + smp_wmb(); > + > + change_bit_once(request_index, &thread->request_fill_bitmap); > + qemu_event_set(&thread->request_valid_ev); > +} > + > +static int thread_find_first_valid_request_index(ThreadLocal *thread) > +{ > + Threads *threads = thread->threads; > + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; > + int index; > + > + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); > + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); > + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, > + threads->thread_requests_nr); > + /* > + * paired with smp_wmb() in mark_request_valid() to make sure that > + * we read request_fill_bitmap before fetch the request out. > + */ > + smp_rmb(); > + > + index = find_first_bit(&result_bitmap, threads->thread_requests_nr); > + return index >= threads->thread_requests_nr ? -1 : index; > +} > + > +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) > +{ > + int index = request_to_index(request); > + > + /* > + * smp_wmb() is implied in change_bit_atomic() that is paired with > + * smp_rmb() in get_free_request_bitmap() to make sure the result > + * has been saved before the bit is flipped. > + */ > + change_bit_atomic(index, &thread->request_done_bitmap); > + qemu_event_set(&thread->request_free_ev); > +} > + > +/* retry to see if there is available request before actually go to wait. 
*/ > +#define BUSY_WAIT_COUNT 1000 > + > +static ThreadRequest * > +thread_busy_wait_for_request(ThreadLocal *thread) > +{ > + int index, count = 0; > + > + for (count = 0; count < BUSY_WAIT_COUNT; count++) { > + index = thread_find_first_valid_request_index(thread); > + if (index >= 0) { > + return index_to_request(thread, index); > + } > + > + cpu_relax(); > + } > + > + return NULL; > +} > + > +static void *thread_run(void *opaque) > +{ > + ThreadLocal *self_data = (ThreadLocal *)opaque; > + Threads *threads = self_data->threads; > + void (*handler)(void *request) = threads->ops->thread_request_handler; > + ThreadRequest *request; > + > + for ( ; !atomic_read(&self_data->quit); ) { > + qemu_event_reset(&self_data->request_valid_ev); > + > + request = thread_busy_wait_for_request(self_data); > + if (!request) { > + qemu_event_wait(&self_data->request_valid_ev); > + continue; > + } > + > + assert(!request->done); > + > + handler(request + 1); > + request->done = true; > + mark_request_free(self_data, request); > + } > + > + return NULL; > +} > + > +static void uninit_thread_requests(ThreadLocal *thread, int free_nr) > +{ > + Threads *threads = thread->threads; > + ThreadRequest *request = thread->requests; > + int i; > + > + for (i = 0; i < free_nr; i++) { > + threads->ops->thread_request_uninit(request + 1); > + request = (void *)request + threads->request_size; > + } > + g_free(thread->requests); > +} > + > +static int init_thread_requests(ThreadLocal *thread) > +{ > + Threads *threads = thread->threads; > + ThreadRequest *request; > + int ret, i, thread_reqs_size; > + > + thread_reqs_size = threads->thread_requests_nr * threads->request_size; > + thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES); > + thread->requests = g_malloc0(thread_reqs_size); > + > + request = thread->requests; > + for (i = 0; i < threads->thread_requests_nr; i++) { > + ret = threads->ops->thread_request_init(request + 1); > + if (ret < 0) { > + goto exit; > + } > + > + 
request->request_index = i; > + request->thread_index = thread->self; > + request = (void *)request + threads->request_size; > + } > + return 0; > + > +exit: > + uninit_thread_requests(thread, i); > + return -1; > +} > + > +static void uninit_thread_data(Threads *threads, int free_nr) > +{ > + ThreadLocal *thread_local = threads->per_thread_data; > + int i; > + > + for (i = 0; i < free_nr; i++) { > + thread_local[i].quit = true; > + qemu_event_set(&thread_local[i].request_valid_ev); > + qemu_thread_join(&thread_local[i].thread); > + qemu_event_destroy(&thread_local[i].request_valid_ev); > + qemu_event_destroy(&thread_local[i].request_free_ev); > + uninit_thread_requests(&thread_local[i], threads->thread_requests_nr); > + } > +} > + > +static int > +init_thread_data(Threads *threads, const char *thread_name, int thread_nr) > +{ > + ThreadLocal *thread_local = threads->per_thread_data; > + char *name; > + int i; > + > + for (i = 0; i < thread_nr; i++) { > + thread_local[i].threads = threads; > + thread_local[i].self = i; > + > + if (init_thread_requests(&thread_local[i]) < 0) { > + goto exit; > + } > + > + qemu_event_init(&thread_local[i].request_free_ev, false); > + qemu_event_init(&thread_local[i].request_valid_ev, false); > + > + name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self); > + qemu_thread_create(&thread_local[i].thread, name, > + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); > + g_free(name); > + } > + return 0; > + > +exit: > + uninit_thread_data(threads, i); > + return -1; > +} > + > +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, > + unsigned int thread_requests_nr, > + const ThreadedWorkqueueOps *ops) > +{ > + Threads *threads; > + > + if (threads_nr > MAX_THREAD_REQUEST_NR) { > + return NULL; > + } > + > + threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal)); > + threads->ops = ops; > + threads->threads_nr = threads_nr; > + threads->thread_requests_nr = thread_requests_nr; > + > 
+ QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long))); > + threads->request_size = threads->ops->request_size; > + threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long)); > + threads->request_size += sizeof(ThreadRequest); > + > + if (init_thread_data(threads, name, threads_nr) < 0) { > + g_free(threads); > + return NULL; > + } > + > + return threads; > +} > + > +void threaded_workqueue_destroy(Threads *threads) > +{ > + uninit_thread_data(threads, threads->threads_nr); > + g_free(threads); > +} > + > +static void request_done(Threads *threads, ThreadRequest *request) > +{ > + if (!request->done) { > + return; > + } > + > + threads->ops->thread_request_done(request + 1); > + request->done = false; > +} > + > +void *threaded_workqueue_get_request(Threads *threads) > +{ > + ThreadRequest *request; > + > + request = threads_find_free_request(threads); > + if (!request) { > + return NULL; > + } > + > + request_done(threads, request); > + return request + 1; > +} > + > +void threaded_workqueue_submit_request(Threads *threads, void *request) > +{ > + ThreadRequest *req = request - sizeof(ThreadRequest); > + int thread_index = request_to_thread_index(request); > + > + assert(!req->done); > + mark_request_valid(threads, req); > + threads->current_thread_index = thread_index + 1; > +} > + > +void threaded_workqueue_wait_for_requests(Threads *threads) > +{ > + ThreadLocal *thread; > + uint64_t result_bitmap; > + int thread_index, index = 0; > + > + for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) { > + thread = threads->per_thread_data + thread_index; > + index = 0; > +retry: > + qemu_event_reset(&thread->request_free_ev); > + result_bitmap = get_free_request_bitmap(threads, thread); > + > + for (; index < threads->thread_requests_nr; index++) { > + if (test_bit(index, &result_bitmap)) { > + qemu_event_wait(&thread->request_free_ev); > + goto retry; > + } > + > + request_done(threads, index_to_request(thread, 
index)); > + } > + } > +} > -- > 2.14.5 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
> +    /*
> +     * the bit in these two bitmaps indicates the index of the @requests

This @ is not ASCII, is it?

> +     * respectively. If it's the same, the corresponding request is free
> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> +     * it is valid and owned by the thread, i.e, where the thread fetches
> +     * the request and write the result.
> +     */
> +
> +    /* after the user fills the request, the bit is flipped. */
> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> +    /* after handles the request, the thread flips the bit. */
> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

Use DECLARE_BITMAP, otherwise you'll get type errors as David
pointed out.

Thanks,

Emilio
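For reference, a bitmap declared in units of unsigned long already has the type that long-based bitmap helpers expect. The following is a minimal standalone sketch rather than QEMU's header: the SKETCH_* macros, struct ThreadLocalSketch and the two helper functions are hypothetical stand-ins modelled on the usual DECLARE_BITMAP pattern, and the limit of 64 requests is taken from MAX_THREAD_REQUEST_NR in the patch.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

/* Hypothetical stand-ins, modelled on the usual DECLARE_BITMAP pattern. */
#define SKETCH_BITS_PER_LONG (sizeof(unsigned long) * CHAR_BIT)
#define SKETCH_BITS_TO_LONGS(bits) \
    (((bits) + SKETCH_BITS_PER_LONG - 1) / SKETCH_BITS_PER_LONG)
#define SKETCH_DECLARE_BITMAP(name, bits) \
    unsigned long name[SKETCH_BITS_TO_LONGS(bits)]

#define SKETCH_MAX_REQUESTS 64 /* same limit as MAX_THREAD_REQUEST_NR */

struct ThreadLocalSketch {
    /* one word where long is 64 bits wide, two words where it is 32 */
    SKETCH_DECLARE_BITMAP(request_fill_bitmap, SKETCH_MAX_REQUESTS);
    SKETCH_DECLARE_BITMAP(request_done_bitmap, SKETCH_MAX_REQUESTS);
};

/* Flip bit nr of a long-based bitmap (non-atomic, illustration only). */
static void sketch_change_bit(unsigned long *map, size_t nr)
{
    assert(nr < SKETCH_MAX_REQUESTS);
    map[nr / SKETCH_BITS_PER_LONG] ^= 1UL << (nr % SKETCH_BITS_PER_LONG);
}

/* Return 1 if bit nr is set, 0 otherwise. */
static int sketch_test_bit(const unsigned long *map, size_t nr)
{
    assert(nr < SKETCH_MAX_REQUESTS);
    return (map[nr / SKETCH_BITS_PER_LONG] >> (nr % SKETCH_BITS_PER_LONG)) & 1;
}
```

In this sketch the bitmaps are arrays embedded in the struct, so a field such as request_fill_bitmap can be passed wherever an unsigned long pointer is expected without a cast.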
On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);

This is not wrong, but it's a bit ugly. Instead, I would:

- Introduce bitmap_xor_atomic in a previous patch
- Use bitmap_xor_atomic here, getting rid of the rcu reads

Thanks,

Emilio
On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
>> +#include "qemu/osdep.h"
>> +#include "qemu/bitmap.h"
>> +#include "qemu/threaded-workqueue.h"
>> +
>> +#define SMP_CACHE_BYTES 64
>
> That's architecture dependent isn't it?
>

Yes, it's arch dependent indeed.

I just used 64 for simplification and I think it is <= 64 on most CPU arch-es
so that can work.

Should I introduce statically defined CACHE LINE SIZE for all arch-es? :(

>> +    /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>> +    /* after handles the request, the thread flips the bit. */
>> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>
> Patchew complained about some type mismatches; I think those are because
> you're using the bitmap_* functions on these; those functions always
> operate on 'long' not on uint64_t - and on some platforms they're
> unfortunately not the same.

I guess you were talking about this error:
ERROR: externs should be avoided in .c files
#233: FILE: util/threaded-workqueue.c:65:
+    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

The complained thing is "QEMU_ALIGNED(SMP_CACHE_BYTES)" as it is gone
when the aligned thing is removed...

The issue you pointed out can be avoided by using type-casting, like:
bitmap_xor(..., (void *)&thread->request_fill_bitmap)
cannot we?

Thanks!
On 11/24/18 8:12 AM, Emilio G. Cota wrote:
> On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
>> +    /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
>
> This @ is not ASCII, is it?
>

Good eyes. :) Will fix it.

>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>> +    /* after handles the request, the thread flips the bit. */
>> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>
> Use DECLARE_BITMAP, otherwise you'll get type errors as David
> pointed out.

If we do it, the field becomes a pointer... that complicates things.

Hmm, I am using the same trick applied by the kvm module when it handles
vcpu->requests:
static inline bool kvm_test_request(int req, struct kvm_vcpu *vcpu)
{
        return test_bit(req & KVM_REQUEST_MASK, (void *)&vcpu->requests);
}

Is it good?
On 11/24/18 8:17 AM, Emilio G. Cota wrote:
> On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
>> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
>> +{
>> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
>> +
>> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
>> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
>> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
>> +               threads->thread_requests_nr);
>
> This is not wrong, but it's a bit ugly. Instead, I would:
>
> - Introduce bitmap_xor_atomic in a previous patch
> - Use bitmap_xor_atomic here, getting rid of the rcu reads

Hmm, however, we do not need atomic xor operation here... that should be
slower than just two READ_ONCE calls.
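The two-bitmap ownership scheme being discussed can be illustrated in isolation. This is a minimal sketch, assuming C11 <stdatomic.h> in place of QEMU's atomic helpers; SlotState and the slot_* functions are hypothetical names, and the memory ordering is simplified relative to the explicit barriers in the patch. Each bitmap has exactly one writer here, which is why a plain load followed by a store is used instead of an atomic read-modify-write.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for the per-thread bitmaps in the patch. */
typedef struct {
    _Atomic uint64_t fill; /* written only by the submitter */
    _Atomic uint64_t done; /* written only by the worker */
} SlotState;

/*
 * Two independent loads followed by an XOR. Bits that differ mark
 * requests currently owned by the worker; equal bits mark free slots.
 */
static uint64_t slot_busy_mask(SlotState *s)
{
    uint64_t fill = atomic_load_explicit(&s->fill, memory_order_acquire);
    uint64_t done = atomic_load_explicit(&s->done, memory_order_acquire);

    return fill ^ done;
}

/* Submitter side: flip the fill bit to hand slot nr to the worker. */
static void slot_submit(SlotState *s, unsigned nr)
{
    uint64_t value;

    assert(nr < 64);
    value = atomic_load_explicit(&s->fill, memory_order_relaxed);
    atomic_store_explicit(&s->fill, value ^ (UINT64_C(1) << nr),
                          memory_order_release);
}

/* Worker side: flip the done bit to hand slot nr back to the submitter. */
static void slot_complete(SlotState *s, unsigned nr)
{
    uint64_t value;

    assert(nr < 64);
    value = atomic_load_explicit(&s->done, memory_order_relaxed);
    atomic_store_explicit(&s->done, value ^ (UINT64_C(1) << nr),
                          memory_order_release);
}
```

Because the bits are flipped rather than set and cleared, a slot that has gone through one full cycle ends with both bits at 1 and is free again, and the next submission flips the fill bit back to 0.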
On 26/11/18 09:18, Xiao Guangrong wrote:
>>
>>> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
>>> +{
>>> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
>>> +
>>> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
>>> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
>>> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
>>> +               threads->thread_requests_nr);
>>
>> This is not wrong, but it's a bit ugly. Instead, I would:
>>
>> - Introduce bitmap_xor_atomic in a previous patch
>> - Use bitmap_xor_atomic here, getting rid of the rcu reads
>
> Hmm, however, we do not need atomic xor operation here... that should be
> slower than just two READ_ONCE calls.

Yeah, I'd just go with Guangrong's version. Alternatively, add
find_{first,next}_{same,different}_bit functions (whatever subset of the
4 you need).

Paolo
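One of the four helpers suggested above could be sketched as follows. This is purely illustrative: the name sketch_find_first_different_bit and its signature are assumptions made for this example, not an existing QEMU function, and the scan is not atomic with respect to concurrent writers.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

#define SKETCH_LONG_BITS (sizeof(unsigned long) * CHAR_BIT)

/*
 * Hypothetical helper: return the index of the first bit position below
 * nbits at which a and b differ, or nbits if they agree on all of them.
 */
static size_t sketch_find_first_different_bit(const unsigned long *a,
                                              const unsigned long *b,
                                              size_t nbits)
{
    size_t word, bit;

    assert(a && b);
    for (word = 0; word * SKETCH_LONG_BITS < nbits; word++) {
        unsigned long diff = a[word] ^ b[word];

        if (!diff) {
            continue; /* the whole word matches, skip it */
        }
        for (bit = 0; bit < SKETCH_LONG_BITS; bit++) {
            size_t index = word * SKETCH_LONG_BITS + bit;

            if (index >= nbits) {
                return nbits; /* differences beyond nbits do not count */
            }
            if ((diff >> bit) & 1UL) {
                return index;
            }
        }
    }
    return nbits;
}
```

With the fill and done bitmaps as arguments, the first differing bit would correspond to the first request owned by the worker; a "same" variant would locate the first free slot instead.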
* Xiao Guangrong (guangrong.xiao@gmail.com) wrote:
>
>
> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
>
> > > +#include "qemu/osdep.h"
> > > +#include "qemu/bitmap.h"
> > > +#include "qemu/threaded-workqueue.h"
> > > +
> > > +#define SMP_CACHE_BYTES 64
> >
> > That's architecture dependent isn't it?
> >
>
> Yes, it's arch dependent indeed.
>
> I just used 64 for simplification and I think it is <= 64 on most CPU arch-es
> so that can work.
>
> Should I introduce statically defined CACHE LINE SIZE for all arch-es? :(

I think it depends why you need it; but we shouldn't have a constant
that is wrong, and we shouldn't define something architecture dependent
in here.

> > > +    /*
> > > +     * the bit in these two bitmaps indicates the index of the @requests
> > > +     * respectively. If it's the same, the corresponding request is free
> > > +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> > > +     * it is valid and owned by the thread, i.e, where the thread fetches
> > > +     * the request and write the result.
> > > +     */
> > > +
> > > +    /* after the user fills the request, the bit is flipped. */
> > > +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> > > +    /* after handles the request, the thread flips the bit. */
> > > +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> >
> > Patchew complained about some type mismatches; I think those are because
> > you're using the bitmap_* functions on these; those functions always
> > operate on 'long' not on uint64_t - and on some platforms they're
> > unfortunately not the same.
>
> I guess you were talking about this error:
> ERROR: externs should be avoided in .c files
> #233: FILE: util/threaded-workqueue.c:65:
> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>
> The complained thing is "QEMU_ALIGNED(SMP_CACHE_BYTES)" as it is gone
> when the aligned thing is removed...
>
> The issue you pointed out can be avoided by using type-casting, like:
> bitmap_xor(..., (void *)&thread->request_fill_bitmap)
> cannot we?

I thought the error was just due to long vs uint64_t rather than the
qemu_aligned. I don't think it's just a casting problem, since I don't
think the long's are always 64bit.

Dave

> Thanks!
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On Mon, Nov 26, 2018 at 15:57:25 +0800, Xiao Guangrong wrote:
>
>
> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
>
> > > +#include "qemu/osdep.h"
> > > +#include "qemu/bitmap.h"
> > > +#include "qemu/threaded-workqueue.h"
> > > +
> > > +#define SMP_CACHE_BYTES 64
> >
> > That's architecture dependent isn't it?
> >
>
> Yes, it's arch dependent indeed.
>
> I just used 64 for simplification and I think it is <= 64 on most CPU arch-es
> so that can work.
>
> Should I introduce statically defined CACHE LINE SIZE for all arch-es? :(

No, at compile-time this is impossible to know.

We do query this info at run-time though (see util/cacheinfo.c),
but using that info here would complicate things too much.

You can just give it a different name, and perhaps add a comment.
See for instance what we do in qht.c with QHT_BUCKET_ALIGN.

Thanks,

Emilio
On 11/26/18 6:56 PM, Dr. David Alan Gilbert wrote:
> * Xiao Guangrong (guangrong.xiao@gmail.com) wrote:
>>
>>
>> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
>>
>>>> +#include "qemu/osdep.h"
>>>> +#include "qemu/bitmap.h"
>>>> +#include "qemu/threaded-workqueue.h"
>>>> +
>>>> +#define SMP_CACHE_BYTES 64
>>>
>>> That's architecture dependent isn't it?
>>>
>>
>> Yes, it's arch dependent indeed.
>>
>> I just used 64 for simplification and i think it is <= 64 on most CPU arch-es
>> so that can work.
>>
>> Should i introduce statically defined CACHE LINE SIZE for all arch-es? :(
>
> I think it depends why you need it; but we shouldn't have a constant
> that is wrong, and we shouldn't define something architecture dependent
> in here.
>

I see. I will follow Emilio's suggestion to rename SMP_CACHE_BYTES to
THREAD_QUEUE_ALIGN and add comments.

>>>> +    /*
>>>> +     * the bit in these two bitmaps indicates the index of the @requests
>>>> +     * respectively. If it's the same, the corresponding request is free
>>>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>>>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>>>> +     * the request and write the result.
>>>> +     */
>>>> +
>>>> +    /* after the user fills the request, the bit is flipped. */
>>>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>>>> +    /* after handles the request, the thread flips the bit. */
>>>> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>>>
>>> Patchew complained about some type mismatches; I think those are because
>>> you're using the bitmap_* functions on these; those functions always
>>> operate on 'long' not on uint64_t - and on some platforms they're
>>> unfortunately not the same.
>>
>> I guess you were talking about this error:
>> ERROR: externs should be avoided in .c files
>> #233: FILE: util/threaded-workqueue.c:65:
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>>
>> The thing it complains about is "QEMU_ALIGNED(SMP_CACHE_BYTES)", as the
>> error is gone when the aligned thing is removed...
>>
>> The issue you pointed out can be avoided by using type-casting, like:
>> bitmap_xor(..., (void *)&thread->request_fill_bitmap)
>> can't it?
>
> I thought the error was just due to long vs uint64_t rather than the
> qemu_aligned.  I don't think it's just a casting problem, since I don't
> think the long's are always 64bit.

Well, I made some adjustments that make checkpatch.pl really happy :), as
follows:

$ git diff util/
diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
index 2ab37cee8d..e34c65a8eb 100644
--- a/util/threaded-workqueue.c
+++ b/util/threaded-workqueue.c
@@ -62,21 +62,30 @@ struct ThreadLocal {
      */
 
     /* after the user fills the request, the bit is flipped. */
-    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        uint64_t request_fill_bitmap;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);
+
     /* after handles the request, the thread flips the bit. */
-    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        uint64_t request_done_bitmap;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);
 
     /*
      * the event used to wake up the thread whenever a valid request has
      * been submitted
      */
-    QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        QemuEvent request_valid_ev;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);
 
     /*
      * the event is notified whenever a request has been completed
      * (i.e, become free), which is used to wake up the user
      */
-    QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        QemuEvent request_free_ev;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);
 };
 typedef struct ThreadLocal ThreadLocal;
 

$ ./scripts/checkpatch.pl -f util/threaded-workqueue.c
total: 0 errors, 0 warnings, 472 lines checked

util/threaded-workqueue.c has no obvious style problems and is ready for submission.

checkpatch.pl somehow treated QEMU_ALIGNED as a function before the
modification.

And yes, u64 is not a long type on a 32-bit arch; it's long[2] instead.
That's fine when we pass the &(u64) to a function whose parameter is
(long *). I think this trick is widely used, e.g. the example in kvm that
I replied to Emilio with:

static inline bool kvm_test_request(int req, struct kvm_vcpu *vcpu)
{
    return test_bit(req & KVM_REQUEST_MASK, (void *)&vcpu->requests);
}
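For illustration of the long vs. uint64_t point discussed above, here is a minimal sketch, assuming the bitmaps stay limited to one 64-bit word, of helpers that work on uint64_t directly so that no cast to (long *) is needed. The names test_bit_u64 and change_bit_u64 are hypothetical and are not part of QEMU's bitops API; the flip is a plain read-modify-write, so it only stands in for the case where a single writer owns the word.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper (not a QEMU API): test bit nr (0..63) of a 64-bit word. */
static inline bool test_bit_u64(unsigned int nr, const uint64_t *word)
{
    return (*word >> nr) & 1;
}

/*
 * Hypothetical helper (not a QEMU API): flip bit nr (0..63).
 * Not atomic; assumes only one writer ever touches this word.
 */
static inline void change_bit_u64(unsigned int nr, uint64_t *word)
{
    *word ^= UINT64_C(1) << nr;
}
```

Because the shift is done on a 64-bit value, a given bit number refers to the same bit on 32-bit and 64-bit hosts, whatever the size of long.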
On 11/27/18 2:55 AM, Emilio G. Cota wrote:
> On Mon, Nov 26, 2018 at 15:57:25 +0800, Xiao Guangrong wrote:
>>
>>
>> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
>>
>>>> +#include "qemu/osdep.h"
>>>> +#include "qemu/bitmap.h"
>>>> +#include "qemu/threaded-workqueue.h"
>>>> +
>>>> +#define SMP_CACHE_BYTES 64
>>>
>>> That's architecture dependent isn't it?
>>>
>>
>> Yes, it's arch dependent indeed.
>>
>> I just used 64 for simplification and i think it is <= 64 on most CPU arch-es
>> so that can work.
>>
>> Should i introduce statically defined CACHE LINE SIZE for all arch-es? :(
>
> No, at compile-time this is impossible to know.
>
> We do query this info at run-time though (see util/cacheinfo.c),
> but using that info here would complicate things too much.

I see.

>
> You can just give it a different name, and perhaps add a comment.
> See for instance what we do in qht.c with QHT_BUCKET_ALIGN.

That's really a good lesson for me; I will follow it. :)
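A minimal sketch of the renaming suggested here, assuming the constant ends up being called THREAD_QUEUE_ALIGN as mentioned elsewhere in the thread. It uses C11 alignas instead of QEMU_ALIGNED so that it builds on its own; the struct name is hypothetical, and 64 remains a compile-time guess rather than a measured cache line size.

```c
#include <stdalign.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Alignment used to keep data written by the user apart from data written
 * by the worker thread. This is NOT the real cache line size, which is only
 * known at run time; 64 is simply assumed to be large enough on common hosts.
 */
#define THREAD_QUEUE_ALIGN 64

/* Hypothetical cut-down version of the per-thread data, for illustration. */
struct ThreadBitmapsSketch {
    /* flipped by the user after it fills a request */
    alignas(THREAD_QUEUE_ALIGN) uint64_t request_fill_bitmap;
    /* flipped by the thread after it handles a request */
    alignas(THREAD_QUEUE_ALIGN) uint64_t request_done_bitmap;
};

/* The two bitmaps must end up at least THREAD_QUEUE_ALIGN bytes apart. */
_Static_assert(offsetof(struct ThreadBitmapsSketch, request_done_bitmap) -
               offsetof(struct ThreadBitmapsSketch, request_fill_bitmap) >=
               THREAD_QUEUE_ALIGN,
               "bitmaps share an alignment unit");
```

The name documents the intent (separating the two writers) without claiming anything about the architecture, which seems to be the point of the QHT_BUCKET_ALIGN precedent.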
On 11/26/18 6:28 PM, Paolo Bonzini wrote:
> On 26/11/18 09:18, Xiao Guangrong wrote:
>>>
>>>> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
>>>> +{
>>>> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
>>>> +
>>>> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
>>>> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
>>>> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
>>>> +               threads->thread_requests_nr);
>>>
>>> This is not wrong, but it's a bit ugly.  Instead, I would:
>>>
>>> - Introduce bitmap_xor_atomic in a previous patch
>>> - Use bitmap_xor_atomic here, getting rid of the rcu reads
>>
>> Hmm, however, we do not need atomic xor operation here... that should be
>> slower than just two READ_ONCE calls.
>
> Yeah, I'd just go with Guangrong's version.  Alternatively, add
> find_{first,next}_{same,different}_bit functions (whatever subset of the
> 4 you need).

That sounds good to me. I will try it. ;)
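A rough sketch of what the find_first_{same,different}_bit idea could look like for the single-word case used in this patch. The signatures below are assumptions for illustration only (they take two uint64_t snapshots rather than QEMU-style unsigned long bitmaps), and low_bits_mask and first_set_bit are hypothetical helpers. In the patch's scheme, a bit that is the same in both bitmaps marks a free request and a bit that differs marks a valid one.

```c
#include <stdint.h>

/* Mask covering the low nbits bits; nbits may be anything from 0 to 64. */
static inline uint64_t low_bits_mask(unsigned int nbits)
{
    return nbits >= 64 ? UINT64_MAX : (UINT64_C(1) << nbits) - 1;
}

/* Index of the lowest set bit, or -1 if the word is zero. */
static inline int first_set_bit(uint64_t word)
{
    int i;

    for (i = 0; i < 64; i++) {
        if (word & (UINT64_C(1) << i)) {
            return i;
        }
    }
    return -1;
}

/* First index below nbits where a and b differ (a valid request), or -1. */
static inline int find_first_different_bit(uint64_t a, uint64_t b,
                                           unsigned int nbits)
{
    return first_set_bit((a ^ b) & low_bits_mask(nbits));
}

/* First index below nbits where a and b agree (a free request), or -1. */
static inline int find_first_same_bit(uint64_t a, uint64_t b,
                                      unsigned int nbits)
{
    return first_set_bit(~(a ^ b) & low_bits_mask(nbits));
}
```

The callers would still take the two snapshots with whatever read primitive and barriers the patch settles on; the helpers only replace the bitmap_xor plus find_first_bit or find_first_zero_bit pair.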
(I did not finish the review, but decided to send what I already had). > On 22 Nov 2018, at 08:20, guangrong.xiao@gmail.com wrote: > > From: Xiao Guangrong <xiaoguangrong@tencent.com> > > This modules implements the lockless and efficient threaded workqueue. I’m not entirely convinced that it’s either “lockless” or “efficient” in the current iteration. I believe that it’s relatively easy to fix, though. > > Three abstracted objects are used in this module: > - Request. > It not only contains the data that the workqueue fetches out > to finish the request but also offers the space to save the result > after the workqueue handles the request. > > It's flowed between user and workqueue. The user fills the request > data into it when it is owned by user. After it is submitted to the > workqueue, the workqueue fetched data out and save the result into > it after the request is handled. fetched -> fetches save -> saves > > All the requests are pre-allocated and carefully partitioned between > threads so there is no contention on the request, that make threads > be parallel as much as possible. That sentence confused me (it’s also in a comment in the text). I think I’m mostly confused by “there is no contention”. Perhaps you meant “so as to avoid contention if possible”? If there is a reason why there would never be any contention even if requests arrive faster than completions, I did not figure it out. I personally see serious contention on the fields in the Threads structure, for example, but also possibly on the targets of the “modulo” operation in thread_find_free_request. Specifically, if three CPUs are entering thread_find_free_request at the same time, they will all run the same loop and all, presumably, “attack” the same memory locations. Sorry if I mis-read the code, but at the moment, it does not seem to avoid contention as intended. I don’t see how it could without having some way to discriminate between CPUs to start with, which I did not find. 
> > - User, i.e, the submitter > It's the one fills the request and submits it to the workqueue, the one -> the one who > the result will be collected after it is handled by the work queue. > > The user can consecutively submit requests without waiting the previous waiting -> waiting for > requests been handled. > It only supports one submitter, you should do serial submission by > yourself if you want more, e.g, use lock on you side. I’m also confused by this last statement. The proposal purports to be “lockless”, which I read as working correctly without a lock… Reading the code, I indeed see issues if different threads try to place requests at the same time. So I believe the word “lockless” is a bit misleading. > > - Workqueue, i.e, thread > Each workqueue is represented by a running thread that fetches > the request submitted by the user, do the specified work and save > the result to the request. > > Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com> > --- > include/qemu/threaded-workqueue.h | 106 +++++++++ > util/Makefile.objs | 1 + > util/threaded-workqueue.c | 463 ++++++++++++++++++++++++++++++++++++++ > 3 files changed, 570 insertions(+) > create mode 100644 include/qemu/threaded-workqueue.h > create mode 100644 util/threaded-workqueue.c > > diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h > new file mode 100644 > index 0000000000..e0ede496d0 > --- /dev/null > +++ b/include/qemu/threaded-workqueue.h > @@ -0,0 +1,106 @@ > +/* > + * Lockless and Efficient Threaded Workqueue Abstraction > + * > + * Author: > + * Xiao Guangrong <xiaoguangrong@tencent.com> > + * > + * Copyright(C) 2018 Tencent Corporation. > + * > + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. > + * See the COPYING.LIB file in the top-level directory. 
> + */ > + > +#ifndef QEMU_THREADED_WORKQUEUE_H > +#define QEMU_THREADED_WORKQUEUE_H > + > +#include "qemu/queue.h" > +#include "qemu/thread.h" > + > +/* > + * This modules implements the lockless and efficient threaded workqueue. > + * > + * Three abstracted objects are used in this module: > + * - Request. > + * It not only contains the data that the workqueue fetches out > + * to finish the request but also offers the space to save the result > + * after the workqueue handles the request. > + * > + * It's flowed between user and workqueue. The user fills the request > + * data into it when it is owned by user. After it is submitted to the > + * workqueue, the workqueue fetched data out and save the result into > + * it after the request is handled. > + * > + * All the requests are pre-allocated and carefully partitioned between > + * threads so there is no contention on the request, that make threads > + * be parallel as much as possible. > + * > + * - User, i.e, the submitter > + * It's the one fills the request and submits it to the workqueue, > + * the result will be collected after it is handled by the work queue. > + * > + * The user can consecutively submit requests without waiting the previous > + * requests been handled. > + * It only supports one submitter, you should do serial submission by > + * yourself if you want more, e.g, use lock on you side. > + * > + * - Workqueue, i.e, thread > + * Each workqueue is represented by a running thread that fetches > + * the request submitted by the user, do the specified work and save > + * the result to the request. 
> + */ > + > +typedef struct Threads Threads; > + > +struct ThreadedWorkqueueOps { > + /* constructor of the request */ > + int (*thread_request_init)(void *request); > + /* destructor of the request */ > + void (*thread_request_uninit)(void *request); > + > + /* the handler of the request that is called by the thread */ > + void (*thread_request_handler)(void *request); > + /* called by the user after the request has been handled */ > + void (*thread_request_done)(void *request); > + > + size_t request_size; > +}; > +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; > + > +/* the default number of requests that thread need handle */ > +#define DEFAULT_THREAD_REQUEST_NR 4 > +/* the max number of requests that thread need handle */ > +#define MAX_THREAD_REQUEST_NR (sizeof(uint64_t) * BITS_PER_BYTE) > + > +/* > + * create a threaded queue. Other APIs will work on the Threads it returned > + * > + * @name: the identity of the workqueue which is used to construct the name > + * of threads only > + * @threads_nr: the number of threads that the workqueue will create > + * @thread_requests_nr: the number of requests that each single thread will > + * handle > + * @ops: the handlers of the request > + * > + * Return NULL if it failed > + */ > +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, > + unsigned int thread_requests_nr, > + const ThreadedWorkqueueOps *ops); > +void threaded_workqueue_destroy(Threads *threads); > + > +/* > + * find a free request where the user can store the data that is needed to > + * finish the request > + * > + * If all requests are used up, return NULL > + */ > +void *threaded_workqueue_get_request(Threads *threads); Using void * to represent the payload makes it easy to get the wrong pointer in there without the compiler noticing. Consider adding a type for the payload? 
> +/* submit the request and notify the thread */ > +void threaded_workqueue_submit_request(Threads *threads, void *request); > + > +/* > + * wait all threads to complete the request to make sure there is no > + * previous request exists > + */ > +void threaded_workqueue_wait_for_requests(Threads *threads); > +#endif > diff --git a/util/Makefile.objs b/util/Makefile.objs > index 0820923c18..f26dfe5182 100644 > --- a/util/Makefile.objs > +++ b/util/Makefile.objs > @@ -50,5 +50,6 @@ util-obj-y += range.o > util-obj-y += stats64.o > util-obj-y += systemd.o > util-obj-y += iova-tree.o > +util-obj-y += threaded-workqueue.o > util-obj-$(CONFIG_LINUX) += vfio-helpers.o > util-obj-$(CONFIG_OPENGL) += drm.o > diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c > new file mode 100644 > index 0000000000..2ab37cee8d > --- /dev/null > +++ b/util/threaded-workqueue.c > @@ -0,0 +1,463 @@ > +/* > + * Lockless and Efficient Threaded Workqueue Abstraction > + * > + * Author: > + * Xiao Guangrong <xiaoguangrong@tencent.com> > + * > + * Copyright(C) 2018 Tencent Corporation. > + * > + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. > + * See the COPYING.LIB file in the top-level directory. > + */ > + > +#include "qemu/osdep.h" > +#include "qemu/bitmap.h" > +#include "qemu/threaded-workqueue.h" > + > +#define SMP_CACHE_BYTES 64 +1 on comments already made by others > + > +/* > + * the request representation which contains the internally used mete data, mete -> meta > + * it is the header of user-defined data. > + * > + * It should be aligned to the nature size of CPU. > + */ > +struct ThreadRequest { > + /* > + * the request has been handled by the thread and need the user > + * to fetch result out. > + */ > + uint8_t done; > + > + /* > + * the index to Thread::requests. > + * Save it to the padding space although it can be calculated at runtime. > + */ > + uint8_t request_index; So no more than 256? 
This is blocked by MAX_THREAD_REQUEST_NR test at the beginning of threaded_workqueue_create, but I would make it more explicit either with a compile-time assert that MAX_THREAD_REQUEST_NR is below UINT8_MAX, or by adding a second test for UINT8_MAX in threaded_workqueue_create. Also, an obvious extension would be to make bitmaps into arrays. Do you think someone would want to use the package to assign requests per CPU or per VCPU? If so, that could quickly go above 64. > + > + /* the index to Threads::per_thread_data */ > + unsigned int thread_index; Don’t you want to use a size_t for that? > +} QEMU_ALIGNED(sizeof(unsigned long)); Nit: the alignment type is inconsistent with that given to QEMU_BUILD_BUG_ON in threaded_workqueue_create. (long vs. unsigned long). Also, why is the alignment required? Aren’t you more interested in cache-line alignment? > +typedef struct ThreadRequest ThreadRequest; > + > +struct ThreadLocal { > + struct Threads *threads; > + > + /* the index of the thread */ > + int self; Why signed? > + > + /* thread is useless and needs to exit */ > + bool quit; > + > + QemuThread thread; > + > + void *requests; > + > + /* > + * the bit in these two bitmaps indicates the index of the @requests > + * respectively. If it's the same, the corresponding request is free > + * and owned by the user, i.e, where the user fills a request. Otherwise, > + * it is valid and owned by the thread, i.e, where the thread fetches > + * the request and write the result. > + */ > + > + /* after the user fills the request, the bit is flipped. */ > + uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); I believe you are trying to ensure that data accessed from multiple CPUs is on different cache lines. As others have pointed out, the real value for SMP_CACHE_BYTES can only be known at run-time. So this is not really helping. Also, the ThreadLocal structure itself is not necessarily aligned within struct Threads. 
Therefore, it’s possible that “requests” for example could be on the same cache line as request_fill_bitmap if planets align the wrong way. In order to mitigate these effects, I would group the data that the user writes and the data that the thread writes, i.e. reorder declarations, put request_fill_bitmap and request_valid_ev together, and try to put them in the same cache line so that only one cache line is invalidated from within mark_request_valid instead of two. Then you end up with a single alignment directive instead of 4, to separate requests from completions. That being said, I’m not sure why you use a bitmap here. What is the expected benefit relative to atomic lists (which would also make it really lock-free)? > + /* after handles the request, the thread flips the bit. */ > + uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); > + > + /* > + * the event used to wake up the thread whenever a valid request has > + * been submitted > + */ > + QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES); > + > + /* > + * the event is notified whenever a request has been completed > + * (i.e, become free), which is used to wake up the user > + */ > + QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES); > +}; > +typedef struct ThreadLocal ThreadLocal; > + > +/* > + * the main data struct represents multithreads which is shared by > + * all threads > + */ > +struct Threads { > + /* the request header, ThreadRequest, is contained */ > + unsigned int request_size; size_t? 
> + unsigned int thread_requests_nr; > + unsigned int threads_nr; > + > + /* the request is pushed to the thread with round-robin manner */ > + unsigned int current_thread_index; > + > + const ThreadedWorkqueueOps *ops; > + > + ThreadLocal per_thread_data[0]; > +}; > +typedef struct Threads Threads; > + > +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index) > +{ > + ThreadRequest *request; > + > + request = thread->requests + request_index * thread->threads->request_size; > + assert(request->request_index == request_index); > + assert(request->thread_index == thread->self); > + return request; > +} > + > +static int request_to_index(ThreadRequest *request) > +{ > + return request->request_index; > +} > + > +static int request_to_thread_index(ThreadRequest *request) > +{ > + return request->thread_index; > +} > + > +/* > + * free request: the request is not used by any thread, however, it might > + * contain the result need the user to call thread_request_done() might contain the result -> might still contain the result result need the user to call -> result. The user needs to call > + * > + * valid request: the request contains the request data and it's committed > + * to the thread, i,e. it's owned by thread. > + */ > +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread) > +{ > + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; > + > + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); > + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); > + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, > + threads->thread_requests_nr); > + > + /* > + * paired with smp_wmb() in mark_request_free() to make sure that we > + * read request_done_bitmap before fetching the result out. > + */ > + smp_rmb(); > + > + return result_bitmap; > +} It seems that this part would be much simpler to understand using atomic lists. 
> + > +static ThreadRequest > +*find_thread_free_request(Threads *threads, ThreadLocal *thread) > +{ > + uint64_t result_bitmap = get_free_request_bitmap(threads, thread); > + int index; > + > + index = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr); > + if (index >= threads->thread_requests_nr) { > + return NULL; > + } > + > + return index_to_request(thread, index); > +} > + > +static ThreadRequest *threads_find_free_request(Threads *threads) > +{ > + ThreadLocal *thread; > + ThreadRequest *request; > + int cur_thread, thread_index; > + > + cur_thread = threads->current_thread_index % threads->threads_nr; > + thread_index = cur_thread; > + do { > + thread = threads->per_thread_data + thread_index++; > + request = find_thread_free_request(threads, thread); > + if (request) { > + break; > + } > + thread_index %= threads->threads_nr; > + } while (thread_index != cur_thread); > + > + return request; > +} > + > +/* > + * the change bit operation combined with READ_ONCE and WRITE_ONCE which > + * only works on single uint64_t width > + */ > +static void change_bit_once(long nr, uint64_t *addr) > +{ > + uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr); > + > + atomic_rcu_set(addr, value); > +} > + > +static void mark_request_valid(Threads *threads, ThreadRequest *request) > +{ > + int thread_index = request_to_thread_index(request); > + int request_index = request_to_index(request); > + ThreadLocal *thread = threads->per_thread_data + thread_index; > + > + /* > + * paired with smp_rmb() in find_first_valid_request_index() to make > + * sure the request has been filled before the bit is flipped that > + * will make the request be visible to the thread > + */ > + smp_wmb(); > + > + change_bit_once(request_index, &thread->request_fill_bitmap); > + qemu_event_set(&thread->request_valid_ev); > +} > + > +static int thread_find_first_valid_request_index(ThreadLocal *thread) > +{ > + Threads *threads = thread->threads; > + uint64_t request_fill_bitmap, 
request_done_bitmap, result_bitmap; > + int index; > + > + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); > + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); > + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, > + threads->thread_requests_nr); > + /* > + * paired with smp_wmb() in mark_request_valid() to make sure that > + * we read request_fill_bitmap before fetch the request out. > + */ > + smp_rmb(); > + > + index = find_first_bit(&result_bitmap, threads->thread_requests_nr); > + return index >= threads->thread_requests_nr ? -1 : index; > +} > + > +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) > +{ > + int index = request_to_index(request); > + > + /* > + * smp_wmb() is implied in change_bit_atomic() that is paired with > + * smp_rmb() in get_free_request_bitmap() to make sure the result > + * has been saved before the bit is flipped. > + */ > + change_bit_atomic(index, &thread->request_done_bitmap); > + qemu_event_set(&thread->request_free_ev); > +} > + > +/* retry to see if there is available request before actually go to wait. 
*/ > +#define BUSY_WAIT_COUNT 1000 > + > +static ThreadRequest * > +thread_busy_wait_for_request(ThreadLocal *thread) > +{ > + int index, count = 0; > + > + for (count = 0; count < BUSY_WAIT_COUNT; count++) { > + index = thread_find_first_valid_request_index(thread); > + if (index >= 0) { > + return index_to_request(thread, index); > + } > + > + cpu_relax(); > + } > + > + return NULL; > +} > + > +static void *thread_run(void *opaque) > +{ > + ThreadLocal *self_data = (ThreadLocal *)opaque; > + Threads *threads = self_data->threads; > + void (*handler)(void *request) = threads->ops->thread_request_handler; > + ThreadRequest *request; > + > + for ( ; !atomic_read(&self_data->quit); ) { > + qemu_event_reset(&self_data->request_valid_ev); > + > + request = thread_busy_wait_for_request(self_data); > + if (!request) { > + qemu_event_wait(&self_data->request_valid_ev); > + continue; > + } > + > + assert(!request->done); > + > + handler(request + 1); > + request->done = true; > + mark_request_free(self_data, request); > + } > + > + return NULL; > +} > + > +static void uninit_thread_requests(ThreadLocal *thread, int free_nr) > +{ > + Threads *threads = thread->threads; > + ThreadRequest *request = thread->requests; > + int i; > + > + for (i = 0; i < free_nr; i++) { > + threads->ops->thread_request_uninit(request + 1); > + request = (void *)request + threads->request_size; Despite GCC’s tolerance for it and rather lengthy debates, pointer arithmetic on void * is illegal in C [1]. Consider using char * arithmetic, and using macros such as: #define request_to_payload(req) (((ThreadRequest *) req) + 1) #define payload_to_request(req) (((ThreadRequest *) req) - 1) #define request_to_next(req,threads) ((ThreadRequest *) ((char *) req) + threads->request_size)) where appropriate, that would clarify the intent. 
[1] https://stackoverflow.com/questions/3523145/pointer-arithmetic-for-void-pointer-in-c > + } > + g_free(thread->requests); > +} > + > +static int init_thread_requests(ThreadLocal *thread) > +{ > + Threads *threads = thread->threads; > + ThreadRequest *request; > + int ret, i, thread_reqs_size; > + > + thread_reqs_size = threads->thread_requests_nr * threads->request_size; > + thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES); > + thread->requests = g_malloc0(thread_reqs_size); > + > + request = thread->requests; > + for (i = 0; i < threads->thread_requests_nr; i++) { > + ret = threads->ops->thread_request_init(request + 1); > + if (ret < 0) { > + goto exit; > + } > + > + request->request_index = i; > + request->thread_index = thread->self; > + request = (void *)request + threads->request_size; Pointer arithmetic on void * is illegal in C, see above. > + } > + return 0; > + > +exit: > + uninit_thread_requests(thread, i); > + return -1; > +} > + > +static void uninit_thread_data(Threads *threads, int free_nr) > +{ > + ThreadLocal *thread_local = threads->per_thread_data; thread_local is a keyword in C++11. I would avoid it as a name, consider replacing with “per_thread_data” as in struct Threads? 
> + int i; > + > + for (i = 0; i < free_nr; i++) { > + thread_local[i].quit = true; > + qemu_event_set(&thread_local[i].request_valid_ev); > + qemu_thread_join(&thread_local[i].thread); > + qemu_event_destroy(&thread_local[i].request_valid_ev); > + qemu_event_destroy(&thread_local[i].request_free_ev); > + uninit_thread_requests(&thread_local[i], threads->thread_requests_nr); > + } > +} > + > +static int > +init_thread_data(Threads *threads, const char *thread_name, int thread_nr) > +{ > + ThreadLocal *thread_local = threads->per_thread_data; > + char *name; > + int i; > + > + for (i = 0; i < thread_nr; i++) { > + thread_local[i].threads = threads; > + thread_local[i].self = i; > + > + if (init_thread_requests(&thread_local[i]) < 0) { > + goto exit; > + } > + > + qemu_event_init(&thread_local[i].request_free_ev, false); > + qemu_event_init(&thread_local[i].request_valid_ev, false); > + > + name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self); > + qemu_thread_create(&thread_local[i].thread, name, > + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); > + g_free(name); > + } > + return 0; > + > +exit: > + uninit_thread_data(threads, i); > + return -1; > +} > + > +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, > + unsigned int thread_requests_nr, > + const ThreadedWorkqueueOps *ops) > +{ > + Threads *threads; > + > + if (threads_nr > MAX_THREAD_REQUEST_NR) { > + return NULL; > + } > + > + threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal)); > + threads->ops = ops; > + threads->threads_nr = threads_nr; > + threads->thread_requests_nr = thread_requests_nr; > + > + QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long))); > + threads->request_size = threads->ops->request_size; > + threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long)); > + threads->request_size += sizeof(ThreadRequest); > + > + if (init_thread_data(threads, name, threads_nr) < 0) { > + g_free(threads); 
> + return NULL; > + } > + > + return threads; > +} > + > +void threaded_workqueue_destroy(Threads *threads) > +{ > + uninit_thread_data(threads, threads->threads_nr); > + g_free(threads); > +} > + > +static void request_done(Threads *threads, ThreadRequest *request) > +{ > + if (!request->done) { > + return; > + } > + > + threads->ops->thread_request_done(request + 1); > + request->done = false; > +} > + > +void *threaded_workqueue_get_request(Threads *threads) > +{ > + ThreadRequest *request; > + > + request = threads_find_free_request(threads); > + if (!request) { > + return NULL; > + } > + > + request_done(threads, request); > + return request + 1; > +} > + > +void threaded_workqueue_submit_request(Threads *threads, void *request) > +{ > + ThreadRequest *req = request - sizeof(ThreadRequest); Pointer arithmetic on void *… Please consider rewriting as: ThreadRequest *req = (ThreadRequest *) request - 1; which achieves the same objective, is legal C, and is the symmetric counterpart of “return request + 1” above. > + int thread_index = request_to_thread_index(request); > + > + assert(!req->done); > + mark_request_valid(threads, req); > + threads->current_thread_index = thread_index + 1; > +} > + > +void threaded_workqueue_wait_for_requests(Threads *threads) > +{ > + ThreadLocal *thread; > + uint64_t result_bitmap; > + int thread_index, index = 0; > + > + for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) { > + thread = threads->per_thread_data + thread_index; > + index = 0; > +retry: > + qemu_event_reset(&thread->request_free_ev); > + result_bitmap = get_free_request_bitmap(threads, thread); > + > + for (; index < threads->thread_requests_nr; index++) { > + if (test_bit(index, &result_bitmap)) { > + qemu_event_wait(&thread->request_free_ev); > + goto retry; > + } > + > + request_done(threads, index_to_request(thread, index)); > + } > + } > +} > -- > 2.14.5
On 27/11/18 13:49, Christophe de Dinechin wrote:
> So this is not really
> helping. Also, the ThreadLocal structure itself is not necessarily aligned
> within struct Threads. Therefore, it’s possible that “requests” for example
> could be on the same cache line as request_fill_bitmap if planets align
> the wrong way.

I think this is a bit exaggerated.  Linux and QEMU's own qht work just
fine with compile-time directives.

> In order to mitigate these effects, I would group the data that the user
> writes and the data that the thread writes, i.e. reorder declarations,
> put request_fill_bitmap and request_valid_ev together, and try
> to put them in the same cache line so that only one cache line is invalidated
> from within mark_request_valid instead of two.
>
> Then you end up with a single alignment directive instead of 4, to
> separate requests from completions.

Yeah, I agree with this.

> That being said, I’m not sure why you use a bitmap here. What is the
> expected benefit relative to atomic lists (which would also make it really
> lock-free)?
>

I don't think lock-free lists are easier.  Bitmaps smaller than 64
elements are both faster and easier to manage.

Paolo
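The grouping agreed on above might look roughly like the following. This is only a sketch under stated assumptions: EventSketch is a stand-in for QemuEvent so the example compiles on its own (the real QemuEvent may be larger on some hosts), THREAD_QUEUE_ALIGN is the assumed 64-byte constant, C11 alignas replaces QEMU_ALIGNED, and the struct and group names are hypothetical.

```c
#include <stdalign.h>
#include <stddef.h>
#include <stdint.h>

#define THREAD_QUEUE_ALIGN 64   /* assumed value, not a measured line size */

/* Stand-in for QemuEvent, only here to keep the sketch self-contained. */
typedef struct {
    unsigned int value;
} EventSketch;

struct ThreadLocalSketch {
    /* set up once, then only read */
    void *requests;
    int self;

    /* written by the user (submitter), read by the worker thread */
    struct {
        alignas(THREAD_QUEUE_ALIGN) uint64_t request_fill_bitmap;
        EventSketch request_valid_ev;
    } submit;

    /* written by the worker thread, read by the user */
    struct {
        alignas(THREAD_QUEUE_ALIGN) uint64_t request_done_bitmap;
        EventSketch request_free_ev;
    } complete;
};
```

With this layout, marking a request valid touches only the submit group and marking it free touches only the complete group, so each operation dirties one aligned unit instead of two.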
On Tue, Nov 27, 2018 at 13:49:13 +0100, Christophe de Dinechin wrote:
> (I did not finish the review, but decided to send what I already had).
>
> > On 22 Nov 2018, at 08:20, guangrong.xiao@gmail.com wrote:
> >
> > From: Xiao Guangrong <xiaoguangrong@tencent.com>
> >
> > This modules implements the lockless and efficient threaded workqueue.
>
> I’m not entirely convinced that it’s either “lockless” or “efficient”
> in the current iteration. I believe that it’s relatively easy to fix, though.
(snip)
> >
> > All the requests are pre-allocated and carefully partitioned between
> > threads so there is no contention on the request, that make threads
> > be parallel as much as possible.
>
> That sentence confused me (it’s also in a comment in the text).
> I think I’m mostly confused by “there is no contention”. Perhaps you
> meant “so as to avoid contention if possible”? If there is a reason
> why there would never be any contention even if requests arrive faster than
> completions, I did not figure it out.
>
> I personally see serious contention on the fields in the Threads structure,
> for example, but also possibly on the targets of the “modulo” operation in
> thread_find_free_request. Specifically, if three CPUs are entering
> thread_find_free_request at the same time, they will all run the same
> loop and all, presumably, “attack” the same memory locations.
>
> Sorry if I mis-read the code, but at the moment, it does not seem to
> avoid contention as intended. I don’t see how it could without having
> some way to discriminate between CPUs to start with, which I did not find.

You might have missed that only one thread can request jobs. So contention
should only happen between that thread and the worker threads, but not
among worker threads (they should only share cache lines with the
requester thread).

> > - User, i.e, the submitter
> > It's the one fills the request and submits it to the workqueue,
> the one -> the one who
> > the result will be collected after it is handled by the work queue.
> >
> > The user can consecutively submit requests without waiting the previous
> waiting -> waiting for
> > requests been handled.
> > It only supports one submitter, you should do serial submission by
> > yourself if you want more, e.g, use lock on you side.
>
> I’m also confused by this last statement. The proposal purports
> to be “lockless”, which I read as working correctly without a lock…
> Reading the code, I indeed see issues if different threads
> try to place requests at the same time. So I believe the word
> “lockless” is a bit misleading.

ditto, it is lockless as presented here, i.e. one requester thread.

> > +static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
> > +{
> > +    Threads *threads = thread->threads;
> > +    ThreadRequest *request = thread->requests;
> > +    int i;
> > +
> > +    for (i = 0; i < free_nr; i++) {
> > +        threads->ops->thread_request_uninit(request + 1);
> > +        request = (void *)request + threads->request_size;
>
> Despite GCC’s tolerance for it and rather lengthy debates,
> pointer arithmetic on void * is illegal in C [1].
>
> Consider using char * arithmetic, and using macros such as:
>
> #define request_to_payload(req) (((ThreadRequest *) req) + 1)
> #define payload_to_request(req) (((ThreadRequest *) req) - 1)
> #define request_to_next(req,threads) ((ThreadRequest *) ((char *) req) + threads->request_size))
>
> where appropriate, that would clarify the intent.
>
> [1] https://stackoverflow.com/questions/3523145/pointer-arithmetic-for-void-pointer-in-c

FWIW, we use void pointer arithmetic in other places in QEMU, so I wouldn't
worry about it being illegal.

I like those little macros though; even better as inlines.

Thanks,

Emilio
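As a sketch of the "even better as inlines" idea, the three quoted macros could be written as static inline functions along the following lines. The ThreadRequest definition is a simplified copy of the header from the patch (the alignment attribute is left out), and passing request_size as a plain size_t argument instead of the Threads pointer is an assumption made to keep the example self-contained. Note that request_to_next as quoted has unbalanced parentheses; a function body avoids that class of mistake.

```c
#include <stddef.h>
#include <stdint.h>

/* Simplified copy of the request header from the patch (alignment omitted). */
typedef struct ThreadRequest {
    uint8_t done;
    uint8_t request_index;
    unsigned int thread_index;
} ThreadRequest;

/* The user-visible payload starts right after the header. */
static inline void *request_to_payload(ThreadRequest *req)
{
    return req + 1;
}

/* Inverse of request_to_payload(): step back over the header. */
static inline ThreadRequest *payload_to_request(void *payload)
{
    return (ThreadRequest *)payload - 1;
}

/*
 * Advance to the next request slot. request_size is the full slot size
 * (header plus padded payload) and is assumed to keep slots suitably
 * aligned for ThreadRequest. The arithmetic is done on char *, which is
 * valid standard C, unlike arithmetic on void *.
 */
static inline ThreadRequest *request_to_next(ThreadRequest *req,
                                             size_t request_size)
{
    return (ThreadRequest *)((char *)req + request_size);
}
```

Compared with the macros, the inline versions give the compiler real parameter types to check, which also goes some way towards the earlier concern about passing the wrong void * around.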
On 11/27/18 8:49 PM, Christophe de Dinechin wrote:
> (I did not finish the review, but decided to send what I already had).
>
>
>> On 22 Nov 2018, at 08:20, guangrong.xiao@gmail.com wrote:
>>
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> This modules implements the lockless and efficient threaded workqueue.
>
> I’m not entirely convinced that it’s either “lockless” or “efficient”
> in the current iteration. I believe that it’s relatively easy to fix, though.
>

I think Emilio has already replied to your concern about why it is
"lockless". :)

>>
>> Three abstracted objects are used in this module:
>> - Request.
>>      It not only contains the data that the workqueue fetches out
>>      to finish the request but also offers the space to save the result
>>      after the workqueue handles the request.
>>
>>      It's flowed between user and workqueue. The user fills the request
>>      data into it when it is owned by user. After it is submitted to the
>>      workqueue, the workqueue fetched data out and save the result into
>>      it after the request is handled.
>
> fetched -> fetches
> save -> saves

Will fix... My English is even worse than C. :(

>> +
>> +/*
>> + * find a free request where the user can store the data that is needed to
>> + * finish the request
>> + *
>> + * If all requests are used up, return NULL
>> + */
>> +void *threaded_workqueue_get_request(Threads *threads);
>
> Using void * to represent the payload makes it easy to get
> the wrong pointer in there without the compiler noticing.
> Consider adding a type for the payload?
>

Another option would be to export ThreadRequest to the user, who would
put it at the very beginning of the user-defined data struct. However,
that would expose internal design details to the user, and I am not sure
that is a good idea...

>> + *
>> + * Author:
>> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
>> + *
>> + * Copyright(C) 2018 Tencent Corporation.
>> + *
>> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
>> + * See the COPYING.LIB file in the top-level directory.
>> + */
>> +
>> +#include "qemu/osdep.h"
>> +#include "qemu/bitmap.h"
>> +#include "qemu/threaded-workqueue.h"
>> +
>> +#define SMP_CACHE_BYTES 64
>
> +1 on comments already made by others

Will improve it.

>
>> +
>> +/*
>> + * the request representation which contains the internally used mete data,
>
> mete -> meta

Will fix.

>
>> + * it is the header of user-defined data.
>> + *
>> + * It should be aligned to the nature size of CPU.
>> + */
>> +struct ThreadRequest {
>> +    /*
>> +     * the request has been handled by the thread and need the user
>> +     * to fetch result out.
>> +     */
>> +    uint8_t done;
>> +
>> +    /*
>> +     * the index to Thread::requests.
>> +     * Save it to the padding space although it can be calculated at runtime.
>> +     */
>> +    uint8_t request_index;
>
> So no more than 256?
>
> This is blocked by MAX_THREAD_REQUEST_NR test at the beginning
> of threaded_workqueue_create, but I would make it more explicit either
> with a compile-time assert that MAX_THREAD_REQUEST_NR is
> below UINT8_MAX, or by adding a second test for UINT8_MAX in
> threaded_workqueue_create.

Sounds good to me. I prefer the former, i.e. a "compile-time assert that
MAX_THREAD_REQUEST_NR is below UINT8_MAX".

>
> Also, an obvious extension would be to make bitmaps into arrays.
>
> Do you think someone would want to use the package to assign
> requests per CPU or per VCPU? If so, that could quickly go above 64.
>

Well... it specifies the queue depth of each single thread. A larger depth
has a negative effect, as it makes threaded_workqueue_wait_for_requests()
too slow: at that point, the user needs to wait for all the threads to
exhaust all their requests.
Another impact is that a u64 is more efficient than bitmaps, as we can see
from the performance data:
   https://ibb.co/hq7u5V

Based on those, I think 64 should be enough, at least for the present
user, the migration thread.

>
>> +
>> +    /* the index to Threads::per_thread_data */
>> +    unsigned int thread_index;
>
> Don’t you want to use a size_t for that?

size_t is 8 bytes... I'd like to keep the request header as small as
possible...

>
>> +} QEMU_ALIGNED(sizeof(unsigned long));
>
> Nit: the alignment type is inconsistent with that given
> to QEMU_BUILD_BUG_ON in threaded_workqueue_create.
> (long vs. unsigned long).
>

Yup, will make them consistent.

> Also, why is the alignment required? Aren’t you more interested
> in cache-line alignment?
>

ThreadRequest is actually the header put at the very beginning of the
request. If it is not aligned to "long", the user-defined data struct
that follows could be accessed without proper alignment.

>
>> +typedef struct ThreadRequest ThreadRequest;
>
>
>> +
>> +struct ThreadLocal {
>> +    struct Threads *threads;
>> +
>> +    /* the index of the thread */
>> +    int self;
>
> Why signed?

Mistake, will fix.

>
>> +
>> +    /* thread is useless and needs to exit */
>> +    bool quit;
>> +
>> +    QemuThread thread;
>> +
>> +    void *requests;
>> +
>> +    /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>
> I believe you are trying to ensure that data accessed from multiple CPUs
> is on different cache lines. As others have pointed out, the real value for
> SMP_CACHE_BYTES can only be known at run-time. So this is not really
> helping. Also, the ThreadLocal structure itself is not necessarily aligned
> within struct Threads. Therefore, it’s possible that “requests” for example
> could be on the same cache line as request_fill_bitmap if planets align
> the wrong way.
>
> In order to mitigate these effects, I would group the data that the user
> writes and the data that the thread writes, i.e. reorder declarations,
> put request_fill_bitmap and request_valid_ev together, and try
> to put them in the same cache line so that only one cache line is invalidated
> from within mark_request_valid instead of two.
>

However, QemuEvent is atomically updated on both sides, so it is not good
to mix it with other fields, is it?

> Then you end up with a single alignment directive instead of 4, to
> separate requests from completions.
>
> That being said, I’m not sure why you use a bitmap here. What is the
> expected benefit relative to atomic lists (which would also make it really
> lock-free)?
>

I agree with Paolo's comments in another mail. :)

>
>> +
>> +/*
>> + * the main data struct represents multithreads which is shared by
>> + * all threads
>> + */
>> +struct Threads {
>> +    /* the request header, ThreadRequest, is contained */
>> +    unsigned int request_size;
>
> size_t?

Please see the comments above about "unsigned int thread_index;" in
ThreadRequest.

>> +/*
>> + * free request: the request is not used by any thread, however, it might
>> + *   contain the result need the user to call thread_request_done()
>
> might contain the result -> might still contain the result
> result need the user to call -> result. The user needs to call
>

Will fix.
>> +static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
>> +{
>> +    Threads *threads = thread->threads;
>> +    ThreadRequest *request = thread->requests;
>> +    int i;
>> +
>> +    for (i = 0; i < free_nr; i++) {
>> +        threads->ops->thread_request_uninit(request + 1);
>> +        request = (void *)request + threads->request_size;
>
> Despite GCC’s tolerance for it and rather lengthy debates,
> pointer arithmetic on void * is illegal in C [1].
>
> Consider using char * arithmetic, and using macros such as:
>
> #define request_to_payload(req) (((ThreadRequest *) req) + 1)
> #define payload_to_request(req) (((ThreadRequest *) req) - 1)
> #define request_to_next(req,threads) ((ThreadRequest *) (((char *) req) + threads->request_size))
>

These definitions are really nice, will use them instead.

>> +static void uninit_thread_data(Threads *threads, int free_nr)
>> +{
>> +    ThreadLocal *thread_local = threads->per_thread_data;
>
> thread_local is a keyword in C++11. I would avoid it as a name,
> consider replacing with “per_thread_data” as in struct Threads?
>

Sure, that's fine with me.

>> +void threaded_workqueue_submit_request(Threads *threads, void *request)
>> +{
>> +    ThreadRequest *req = request - sizeof(ThreadRequest);
>
> Pointer arithmetic on void *…
>
> Please consider rewriting as:
>
> ThreadRequest *req = (ThreadRequest *) request - 1;
>
> which achieves the same objective, is legal C, and is the symmetric
> counterpart of “return request + 1” above.
>

It's nice, indeed.
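
As a rough illustration of two points from this reply (the compile-time check on MAX_THREAD_REQUEST_NR and the fill/done bitmap pair), here is a minimal stand-alone sketch. It is not the patch's code: it assumes C11 <stdatomic.h> instead of QEMU's atomic helpers, it hard-codes MAX_THREAD_REQUEST_NR to 64, and SlotBitmaps, valid_slots(), find_free_slot(), submit_slot() and complete_slot() are hypothetical names. The patch flips the fill bit with a plain read/modify/write because there is only one submitter; an atomic XOR is used here only to keep the sketch short.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define MAX_THREAD_REQUEST_NR 64   /* assumed: one bit per request in a uint64_t */

/* request_index is stored in a uint8_t, so the limit must stay below UINT8_MAX. */
_Static_assert(MAX_THREAD_REQUEST_NR <= UINT8_MAX,
               "request index must fit in uint8_t");

typedef struct {
    _Atomic uint64_t fill_bitmap;  /* bits flipped by the submitter */
    _Atomic uint64_t done_bitmap;  /* bits flipped by the worker thread */
} SlotBitmaps;

/* A bit that differs between the two bitmaps marks a slot owned by the worker. */
static uint64_t valid_slots(SlotBitmaps *b)
{
    return atomic_load_explicit(&b->fill_bitmap, memory_order_acquire) ^
           atomic_load_explicit(&b->done_bitmap, memory_order_acquire);
}

/* Submitter side: first free slot among the first nr slots, or -1 if none. */
static int find_free_slot(SlotBitmaps *b, unsigned nr)
{
    uint64_t busy = valid_slots(b);

    for (unsigned i = 0; i < nr; i++) {
        if (!(busy & (UINT64_C(1) << i))) {
            return (int)i;
        }
    }
    return -1;
}

/* Submitter side: hand slot i to the worker; release publishes the request data. */
static void submit_slot(SlotBitmaps *b, unsigned i)
{
    atomic_fetch_xor_explicit(&b->fill_bitmap, UINT64_C(1) << i,
                              memory_order_release);
}

/* Worker side: hand slot i back; release publishes the result. */
static void complete_slot(SlotBitmaps *b, unsigned i)
{
    atomic_fetch_xor_explicit(&b->done_bitmap, UINT64_C(1) << i,
                              memory_order_release);
}
```

Since each side only ever writes its own bitmap, ownership of a slot is decided by a single XOR of two 64-bit loads, which is where the limit of 64 requests per thread comes from.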
> On 27 Nov 2018, at 14:51, Paolo Bonzini <pbonzini@redhat.com> wrote:
>
> On 27/11/18 13:49, Christophe de Dinechin wrote:
>> So this is not really
>> helping. Also, the ThreadLocal structure itself is not necessarily aligned
>> within struct Threads. Therefore, it’s possible that “requests” for example
>> could be on the same cache line as request_fill_bitmap if planets align
>> the wrong way.
>
> I think this is a bit exaggerated.

Hence my “if planets align the wrong way” :-)

But I understand that my wording came out too strong. My apologies.

I think the fix is to align ThreadLocal as well.

> Linux and QEMU's own qht work just fine with compile-time directives.

Wouldn’t it work fine without any compile-time directive at all?
Alignment is just a performance optimization.

>
>> In order to mitigate these effects, I would group the data that the user
>> writes and the data that the thread writes, i.e. reorder declarations,
>> put request_fill_bitmap and request_valid_ev together, and try
>> to put them in the same cache line so that only one cache line is invalidated
>> from within mark_request_valid instead of two.
>>
>> Then you end up with a single alignment directive instead of 4, to
>> separate requests from completions.
>
> Yeah, I agree with this.
>
>> That being said, I’m not sure why you use a bitmap here. What is the
>> expected benefit relative to atomic lists (which would also make it really
>> lock-free)?
>>
>
> I don't think lock-free lists are easier. Bitmaps smaller than 64
> elements are both faster and easier to manage.

I believe that this is only true if you use a linked list for both freelist
management and for thread notification (i.e. to replace the bitmaps).
However, if you use an atomic list only for the free list, and keep
bitmaps for signaling, then performance is at least equal, often better.
Plus you get the added benefit of having a thread-safe API, i.e.
something that is truly lock-free.

I did a small experiment to test / prove this. Last commit on branch:
https://github.com/c3d/recorder/commits/181122-xiao_guangdong_introduce-threaded-workqueue
Take with a grain of salt, microbenchmarks are always suspect ;-)

The code in “thread_test.c” includes Xiao’s code with two variations,
plus some testing code lifted from the flight recorder library.
1. The FREE_LIST variation (sl_test) is what I would like to propose.
2. The BITMAP variation (bm_test) is the baseline
3. The DOUBLE_LIST variation (ll_test) is the slow double-list approach

To run it, you need to do “make opt-test”, then run “test_script”
which outputs a CSV file. The summary of my findings testing on
a ThinkPad, a Xeon machine and a MacBook is here:
https://imgur.com/a/4HmbB9K

Overall, the proposed approach:

- makes the API thread safe and lock free, addressing the one
  drawback that Xiao was mentioning.

- delivers up to 30% more requests on the Macbook, while being
  “within noise” (sometimes marginally better) for the other two.
  I suspect an optimization opportunity found by clang, because
  the Macbook delivers really high numbers.

- spends less time blocking when all threads are busy, which
  accounts for the higher number of client loops.

If you think that makes sense, then either Xiao can adapt the code
from the branch above, or I can send a follow-up patch.

Thanks
Christophe
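
To make the "atomic list only for the free list" idea a bit more concrete, here is a generic sketch of a lock-free free list of slot indices. It is not the code from the branch above: it is a textbook Treiber-style stack with a tag packed next to the head index to guard against ABA, written with C11 atomics, and FreeList, NR_SLOTS, NIL and the function names are all made up for the example. Whether the 64-bit compare-and-swap is actually lock-free depends on the platform.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define NR_SLOTS 4            /* hypothetical number of request slots */
#define NIL      UINT32_MAX   /* marks the end of the list */

typedef struct {
    /* low 32 bits: index of the first free slot (or NIL); high 32 bits: tag */
    _Atomic uint64_t head;
    /* next[i] is the slot that follows slot i in the free list */
    _Atomic uint32_t next[NR_SLOTS];
} FreeList;

static uint64_t pack(uint32_t tag, uint32_t index)
{
    return ((uint64_t)tag << 32) | index;
}

/* Chain all slots together: 0 -> 1 -> ... -> NR_SLOTS - 1 -> NIL. */
static void free_list_init(FreeList *fl)
{
    for (uint32_t i = 0; i < NR_SLOTS; i++) {
        atomic_init(&fl->next[i], i + 1 < NR_SLOTS ? i + 1 : NIL);
    }
    atomic_init(&fl->head, pack(0, 0));
}

/* Take a free slot; returns its index, or -1 if the list is empty. */
static int free_list_pop(FreeList *fl)
{
    uint64_t old = atomic_load_explicit(&fl->head, memory_order_acquire);

    for (;;) {
        uint32_t index = (uint32_t)old;
        if (index == NIL) {
            return -1;
        }
        uint32_t next = atomic_load_explicit(&fl->next[index],
                                             memory_order_relaxed);
        /* Bumping the tag makes a stale head value fail the CAS. */
        uint64_t desired = pack((uint32_t)(old >> 32) + 1, next);
        if (atomic_compare_exchange_weak_explicit(&fl->head, &old, desired,
                                                  memory_order_acquire,
                                                  memory_order_acquire)) {
            return (int)index;
        }
    }
}

/* Give a slot back to the free list. */
static void free_list_push(FreeList *fl, uint32_t index)
{
    uint64_t old = atomic_load_explicit(&fl->head, memory_order_relaxed);
    uint64_t desired;

    do {
        atomic_store_explicit(&fl->next[index], (uint32_t)old,
                              memory_order_relaxed);
        desired = pack((uint32_t)(old >> 32) + 1, index);
    } while (!atomic_compare_exchange_weak_explicit(&fl->head, &old, desired,
                                                    memory_order_release,
                                                    memory_order_relaxed));
}
```

With something like this, any number of submitters could call free_list_pop() concurrently, while the per-thread bitmaps would still be used for signaling.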
On 04/12/18 16:49, Christophe de Dinechin wrote:
>> Linux and QEMU's own qht work just fine with compile-time directives.
>
> Wouldn’t it work fine without any compile-time directive at all?

Yes, that's what I meant. Though there are certainly cases in which the
difference without proper cacheline alignment is an order of magnitude
less throughput or something like that; it would certainly be noticeable.

>> I don't think lock-free lists are easier. Bitmaps smaller than 64
>> elements are both faster and easier to manage.
>
> I believe that this is only true if you use a linked list for both freelist
> management and for thread notification (i.e. to replace the bitmaps).
> However, if you use an atomic list only for the free list, and keep
> bitmaps for signaling, then performance is at least equal, often better.
> Plus you get the added benefit of having a thread-safe API, i.e.
> something that is truly lock-free.
>
> I did a small experiment to test / prove this. Last commit on branch:
> https://github.com/c3d/recorder/commits/181122-xiao_guangdong_introduce-threaded-workqueue
> Take with a grain of salt, microbenchmarks are always suspect ;-)
>
> The code in “thread_test.c” includes Xiao’s code with two variations,
> plus some testing code lifted from the flight recorder library.
> 1. The FREE_LIST variation (sl_test) is what I would like to propose.
> 2. The BITMAP variation (bm_test) is the baseline
> 3. The DOUBLE_LIST variation (ll_test) is the slow double-list approach
>
> To run it, you need to do “make opt-test”, then run “test_script”
> which outputs a CSV file. The summary of my findings testing on
> a ThinkPad, a Xeon machine and a MacBook is here:
> https://imgur.com/a/4HmbB9K
>
> Overall, the proposed approach:
>
> - makes the API thread safe and lock free, addressing the one
> drawback that Xiao was mentioning.
>
> - delivers up to 30% more requests on the Macbook, while being
> “within noise” (sometimes marginally better) for the other two.
> I suspect an optimization opportunity found by clang, because
> the Macbook delivers really high numbers.
>
> - spends less time blocking when all threads are busy, which
> accounts for the higher number of client loops.
>
> If you think that makes sense, then either Xiao can adapt the code
> from the branch above, or I can send a follow-up patch.

Having a follow-up patch would be best I think. Thanks for
experimenting with this, it's always fun stuff. :)

Paolo
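
On the cache-line question discussed earlier in this message, the "group by writer, one alignment directive per group" layout could look roughly like the sketch below. It only illustrates the layout idea and is not the patch's struct: CACHE_LINE_BYTES is an assumed compile-time constant, the uint32_t fields are stand-ins for QemuEvent, and all type and field names are hypothetical.

```c
#include <assert.h>
#include <stdalign.h>
#include <stddef.h>
#include <stdint.h>

#define CACHE_LINE_BYTES 64   /* assumed; the real line size is platform-specific */

/* Fields written by the submitter when a request becomes valid. */
typedef struct {
    uint64_t fill_bitmap;
    uint32_t valid_event;     /* stand-in for a QemuEvent */
} SubmitterSide;

/* Fields written by the worker thread when a request completes. */
typedef struct {
    uint64_t done_bitmap;
    uint32_t free_event;      /* stand-in for a QemuEvent */
} WorkerSide;

/*
 * One alignment directive per writer keeps the two groups on separate
 * cache lines. The struct itself also becomes cache-line aligned, so
 * consecutive elements of an array of PerThreadState do not share a line
 * either, provided the array base is aligned (e.g. via aligned_alloc()).
 */
typedef struct {
    alignas(CACHE_LINE_BYTES) SubmitterSide submitter;
    alignas(CACHE_LINE_BYTES) WorkerSide worker;
} PerThreadState;
```

As noted in the thread, an event that is updated atomically from both sides does not fit cleanly into either group, so a layout like this is a starting point rather than a full answer.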
On 12/5/18 1:16 AM, Paolo Bonzini wrote:
> On 04/12/18 16:49, Christophe de Dinechin wrote:
>>> Linux and QEMU's own qht work just fine with compile-time directives.
>>
>> Wouldn’t it work fine without any compile-time directive at all?
>
> Yes, that's what I meant. Though there are certainly cases in which the
> difference without proper cacheline alignment is an order of magnitude
> less throughput or something like that; it would certainly be noticeable.
>
>>> I don't think lock-free lists are easier. Bitmaps smaller than 64
>>> elements are both faster and easier to manage.
>>
>> I believe that this is only true if you use a linked list for both freelist
>> management and for thread notification (i.e. to replace the bitmaps).
>> However, if you use an atomic list only for the free list, and keep
>> bitmaps for signaling, then performance is at least equal, often better.
>> Plus you get the added benefit of having a thread-safe API, i.e.
>> something that is truly lock-free.
>>
>> I did a small experiment to test / prove this. Last commit on branch:
>> https://github.com/c3d/recorder/commits/181122-xiao_guangdong_introduce-threaded-workqueue
>> Take with a grain of salt, microbenchmarks are always suspect ;-)
>>
>> The code in “thread_test.c” includes Xiao’s code with two variations,
>> plus some testing code lifted from the flight recorder library.
>> 1. The FREE_LIST variation (sl_test) is what I would like to propose.
>> 2. The BITMAP variation (bm_test) is the baseline
>> 3. The DOUBLE_LIST variation (ll_test) is the slow double-list approach
>>
>> To run it, you need to do “make opt-test”, then run “test_script”
>> which outputs a CSV file. The summary of my findings testing on
>> a ThinkPad, a Xeon machine and a MacBook is here:
>> https://imgur.com/a/4HmbB9K
>>
>> Overall, the proposed approach:
>>
>> - makes the API thread safe and lock free, addressing the one
>> drawback that Xiao was mentioning.
>>
>> - delivers up to 30% more requests on the Macbook, while being
>> “within noise” (sometimes marginally better) for the other two.
>> I suspect an optimization opportunity found by clang, because
>> the Macbook delivers really high numbers.
>>
>> - spends less time blocking when all threads are busy, which
>> accounts for the higher number of client loops.
>>
>> If you think that makes sense, then either Xiao can adapt the code
>> from the branch above, or I can send a follow-up patch.
>
> Having a follow-up patch would be best I think. Thanks for
> experimenting with this, it's always fun stuff. :)
>

Yup, Christophe, please post the follow-up patches and add yourself to the
author list if you like. I am looking forward to it. :)

Thanks!
diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h new file mode 100644 index 0000000000..e0ede496d0 --- /dev/null +++ b/include/qemu/threaded-workqueue.h @@ -0,0 +1,106 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong <xiaoguangrong@tencent.com> + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#ifndef QEMU_THREADED_WORKQUEUE_H +#define QEMU_THREADED_WORKQUEUE_H + +#include "qemu/queue.h" +#include "qemu/thread.h" + +/* + * This modules implements the lockless and efficient threaded workqueue. + * + * Three abstracted objects are used in this module: + * - Request. + * It not only contains the data that the workqueue fetches out + * to finish the request but also offers the space to save the result + * after the workqueue handles the request. + * + * It's flowed between user and workqueue. The user fills the request + * data into it when it is owned by user. After it is submitted to the + * workqueue, the workqueue fetched data out and save the result into + * it after the request is handled. + * + * All the requests are pre-allocated and carefully partitioned between + * threads so there is no contention on the request, that make threads + * be parallel as much as possible. + * + * - User, i.e, the submitter + * It's the one fills the request and submits it to the workqueue, + * the result will be collected after it is handled by the work queue. + * + * The user can consecutively submit requests without waiting the previous + * requests been handled. + * It only supports one submitter, you should do serial submission by + * yourself if you want more, e.g, use lock on you side. 
+ * + * - Workqueue, i.e, thread + * Each workqueue is represented by a running thread that fetches + * the request submitted by the user, do the specified work and save + * the result to the request. + */ + +typedef struct Threads Threads; + +struct ThreadedWorkqueueOps { + /* constructor of the request */ + int (*thread_request_init)(void *request); + /* destructor of the request */ + void (*thread_request_uninit)(void *request); + + /* the handler of the request that is called by the thread */ + void (*thread_request_handler)(void *request); + /* called by the user after the request has been handled */ + void (*thread_request_done)(void *request); + + size_t request_size; +}; +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; + +/* the default number of requests that thread need handle */ +#define DEFAULT_THREAD_REQUEST_NR 4 +/* the max number of requests that thread need handle */ +#define MAX_THREAD_REQUEST_NR (sizeof(uint64_t) * BITS_PER_BYTE) + +/* + * create a threaded queue. 
Other APIs will work on the Threads it returned + * + * @name: the identity of the workqueue which is used to construct the name + * of threads only + * @threads_nr: the number of threads that the workqueue will create + * @thread_requests_nr: the number of requests that each single thread will + * handle + * @ops: the handlers of the request + * + * Return NULL if it failed + */ +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops); +void threaded_workqueue_destroy(Threads *threads); + +/* + * find a free request where the user can store the data that is needed to + * finish the request + * + * If all requests are used up, return NULL + */ +void *threaded_workqueue_get_request(Threads *threads); +/* submit the request and notify the thread */ +void threaded_workqueue_submit_request(Threads *threads, void *request); + +/* + * wait all threads to complete the request to make sure there is no + * previous request exists + */ +void threaded_workqueue_wait_for_requests(Threads *threads); +#endif diff --git a/util/Makefile.objs b/util/Makefile.objs index 0820923c18..f26dfe5182 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -50,5 +50,6 @@ util-obj-y += range.o util-obj-y += stats64.o util-obj-y += systemd.o util-obj-y += iova-tree.o +util-obj-y += threaded-workqueue.o util-obj-$(CONFIG_LINUX) += vfio-helpers.o util-obj-$(CONFIG_OPENGL) += drm.o diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c new file mode 100644 index 0000000000..2ab37cee8d --- /dev/null +++ b/util/threaded-workqueue.c @@ -0,0 +1,463 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong <xiaoguangrong@tencent.com> + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. 
+ */ + +#include "qemu/osdep.h" +#include "qemu/bitmap.h" +#include "qemu/threaded-workqueue.h" + +#define SMP_CACHE_BYTES 64 + +/* + * the request representation which contains the internally used mete data, + * it is the header of user-defined data. + * + * It should be aligned to the nature size of CPU. + */ +struct ThreadRequest { + /* + * the request has been handled by the thread and need the user + * to fetch result out. + */ + uint8_t done; + + /* + * the index to Thread::requests. + * Save it to the padding space although it can be calculated at runtime. + */ + uint8_t request_index; + + /* the index to Threads::per_thread_data */ + unsigned int thread_index; +} QEMU_ALIGNED(sizeof(unsigned long)); +typedef struct ThreadRequest ThreadRequest; + +struct ThreadLocal { + struct Threads *threads; + + /* the index of the thread */ + int self; + + /* thread is useless and needs to exit */ + bool quit; + + QemuThread thread; + + void *requests; + + /* + * the bit in these two bitmaps indicates the index of the @requests + * respectively. If it's the same, the corresponding request is free + * and owned by the user, i.e, where the user fills a request. Otherwise, + * it is valid and owned by the thread, i.e, where the thread fetches + * the request and write the result. + */ + + /* after the user fills the request, the bit is flipped. */ + uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + /* after handles the request, the thread flips the bit. 
*/ + uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event used to wake up the thread whenever a valid request has + * been submitted + */ + QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event is notified whenever a request has been completed + * (i.e, become free), which is used to wake up the user + */ + QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES); +}; +typedef struct ThreadLocal ThreadLocal; + +/* + * the main data struct represents multithreads which is shared by + * all threads + */ +struct Threads { + /* the request header, ThreadRequest, is contained */ + unsigned int request_size; + unsigned int thread_requests_nr; + unsigned int threads_nr; + + /* the request is pushed to the thread with round-robin manner */ + unsigned int current_thread_index; + + const ThreadedWorkqueueOps *ops; + + ThreadLocal per_thread_data[0]; +}; +typedef struct Threads Threads; + +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index) +{ + ThreadRequest *request; + + request = thread->requests + request_index * thread->threads->request_size; + assert(request->request_index == request_index); + assert(request->thread_index == thread->self); + return request; +} + +static int request_to_index(ThreadRequest *request) +{ + return request->request_index; +} + +static int request_to_thread_index(ThreadRequest *request) +{ + return request->thread_index; +} + +/* + * free request: the request is not used by any thread, however, it might + * contain the result need the user to call thread_request_done() + * + * valid request: the request contains the request data and it's committed + * to the thread, i,e. it's owned by thread. 
+ */ +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread) +{ + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + + /* + * paired with smp_wmb() in mark_request_free() to make sure that we + * read request_done_bitmap before fetching the result out. + */ + smp_rmb(); + + return result_bitmap; +} + +static ThreadRequest +*find_thread_free_request(Threads *threads, ThreadLocal *thread) +{ + uint64_t result_bitmap = get_free_request_bitmap(threads, thread); + int index; + + index = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr); + if (index >= threads->thread_requests_nr) { + return NULL; + } + + return index_to_request(thread, index); +} + +static ThreadRequest *threads_find_free_request(Threads *threads) +{ + ThreadLocal *thread; + ThreadRequest *request; + int cur_thread, thread_index; + + cur_thread = threads->current_thread_index % threads->threads_nr; + thread_index = cur_thread; + do { + thread = threads->per_thread_data + thread_index++; + request = find_thread_free_request(threads, thread); + if (request) { + break; + } + thread_index %= threads->threads_nr; + } while (thread_index != cur_thread); + + return request; +} + +/* + * the change bit operation combined with READ_ONCE and WRITE_ONCE which + * only works on single uint64_t width + */ +static void change_bit_once(long nr, uint64_t *addr) +{ + uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr); + + atomic_rcu_set(addr, value); +} + +static void mark_request_valid(Threads *threads, ThreadRequest *request) +{ + int thread_index = request_to_thread_index(request); + int request_index = request_to_index(request); + ThreadLocal *thread = threads->per_thread_data + thread_index; + + /* + * paired 
with smp_rmb() in find_first_valid_request_index() to make + * sure the request has been filled before the bit is flipped that + * will make the request be visible to the thread + */ + smp_wmb(); + + change_bit_once(request_index, &thread->request_fill_bitmap); + qemu_event_set(&thread->request_valid_ev); +} + +static int thread_find_first_valid_request_index(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + int index; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + /* + * paired with smp_wmb() in mark_request_valid() to make sure that + * we read request_fill_bitmap before fetch the request out. + */ + smp_rmb(); + + index = find_first_bit(&result_bitmap, threads->thread_requests_nr); + return index >= threads->thread_requests_nr ? -1 : index; +} + +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) +{ + int index = request_to_index(request); + + /* + * smp_wmb() is implied in change_bit_atomic() that is paired with + * smp_rmb() in get_free_request_bitmap() to make sure the result + * has been saved before the bit is flipped. + */ + change_bit_atomic(index, &thread->request_done_bitmap); + qemu_event_set(&thread->request_free_ev); +} + +/* retry to see if there is available request before actually go to wait. 
*/ +#define BUSY_WAIT_COUNT 1000 + +static ThreadRequest * +thread_busy_wait_for_request(ThreadLocal *thread) +{ + int index, count = 0; + + for (count = 0; count < BUSY_WAIT_COUNT; count++) { + index = thread_find_first_valid_request_index(thread); + if (index >= 0) { + return index_to_request(thread, index); + } + + cpu_relax(); + } + + return NULL; +} + +static void *thread_run(void *opaque) +{ + ThreadLocal *self_data = (ThreadLocal *)opaque; + Threads *threads = self_data->threads; + void (*handler)(void *request) = threads->ops->thread_request_handler; + ThreadRequest *request; + + for ( ; !atomic_read(&self_data->quit); ) { + qemu_event_reset(&self_data->request_valid_ev); + + request = thread_busy_wait_for_request(self_data); + if (!request) { + qemu_event_wait(&self_data->request_valid_ev); + continue; + } + + assert(!request->done); + + handler(request + 1); + request->done = true; + mark_request_free(self_data, request); + } + + return NULL; +} + +static void uninit_thread_requests(ThreadLocal *thread, int free_nr) +{ + Threads *threads = thread->threads; + ThreadRequest *request = thread->requests; + int i; + + for (i = 0; i < free_nr; i++) { + threads->ops->thread_request_uninit(request + 1); + request = (void *)request + threads->request_size; + } + g_free(thread->requests); +} + +static int init_thread_requests(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + ThreadRequest *request; + int ret, i, thread_reqs_size; + + thread_reqs_size = threads->thread_requests_nr * threads->request_size; + thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES); + thread->requests = g_malloc0(thread_reqs_size); + + request = thread->requests; + for (i = 0; i < threads->thread_requests_nr; i++) { + ret = threads->ops->thread_request_init(request + 1); + if (ret < 0) { + goto exit; + } + + request->request_index = i; + request->thread_index = thread->self; + request = (void *)request + threads->request_size; + } + return 0; + +exit: + 
uninit_thread_requests(thread, i); + return -1; +} + +static void uninit_thread_data(Threads *threads, int free_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + int i; + + for (i = 0; i < free_nr; i++) { + thread_local[i].quit = true; + qemu_event_set(&thread_local[i].request_valid_ev); + qemu_thread_join(&thread_local[i].thread); + qemu_event_destroy(&thread_local[i].request_valid_ev); + qemu_event_destroy(&thread_local[i].request_free_ev); + uninit_thread_requests(&thread_local[i], threads->thread_requests_nr); + } +} + +static int +init_thread_data(Threads *threads, const char *thread_name, int thread_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + char *name; + int i; + + for (i = 0; i < thread_nr; i++) { + thread_local[i].threads = threads; + thread_local[i].self = i; + + if (init_thread_requests(&thread_local[i]) < 0) { + goto exit; + } + + qemu_event_init(&thread_local[i].request_free_ev, false); + qemu_event_init(&thread_local[i].request_valid_ev, false); + + name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self); + qemu_thread_create(&thread_local[i].thread, name, + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); + g_free(name); + } + return 0; + +exit: + uninit_thread_data(threads, i); + return -1; +} + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops) +{ + Threads *threads; + + if (threads_nr > MAX_THREAD_REQUEST_NR) { + return NULL; + } + + threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal)); + threads->ops = ops; + threads->threads_nr = threads_nr; + threads->thread_requests_nr = thread_requests_nr; + + QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long))); + threads->request_size = threads->ops->request_size; + threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long)); + threads->request_size += sizeof(ThreadRequest); + + if 
(init_thread_data(threads, name, threads_nr) < 0) { + g_free(threads); + return NULL; + } + + return threads; +} + +void threaded_workqueue_destroy(Threads *threads) +{ + uninit_thread_data(threads, threads->threads_nr); + g_free(threads); +} + +static void request_done(Threads *threads, ThreadRequest *request) +{ + if (!request->done) { + return; + } + + threads->ops->thread_request_done(request + 1); + request->done = false; +} + +void *threaded_workqueue_get_request(Threads *threads) +{ + ThreadRequest *request; + + request = threads_find_free_request(threads); + if (!request) { + return NULL; + } + + request_done(threads, request); + return request + 1; +} + +void threaded_workqueue_submit_request(Threads *threads, void *request) +{ + ThreadRequest *req = request - sizeof(ThreadRequest); + int thread_index = request_to_thread_index(request); + + assert(!req->done); + mark_request_valid(threads, req); + threads->current_thread_index = thread_index + 1; +} + +void threaded_workqueue_wait_for_requests(Threads *threads) +{ + ThreadLocal *thread; + uint64_t result_bitmap; + int thread_index, index = 0; + + for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) { + thread = threads->per_thread_data + thread_index; + index = 0; +retry: + qemu_event_reset(&thread->request_free_ev); + result_bitmap = get_free_request_bitmap(threads, thread); + + for (; index < threads->thread_requests_nr; index++) { + if (test_bit(index, &result_bitmap)) { + qemu_event_wait(&thread->request_free_ev); + goto retry; + } + + request_done(threads, index_to_request(thread, index)); + } + } +}