Message ID | 20181016111006.629-3-xiaoguangrong@tencent.com
---|---
State | New, archived
Series | migration: improve multithreads
On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>
> The current implementation of compression and decompression is very
> hard to enable in production. We noticed that too many wait-wakes go
> to kernel space and CPU usage is very low even if the system is
> really free.
>
> The reasons are:
> 1) too many locks are used for synchronization: there is a global
>    lock and each single thread has its own lock, so the migration
>    thread and the work threads need to go to sleep if these locks
>    are busy
>
> 2) the migration thread submits requests to the threads separately;
>    however, only one request can be pending, which means the thread
>    has to go to sleep after finishing the request
>
> To make it work better, we introduce a lockless multithread model:
> the user, currently the migration thread, submits requests to each
> thread, which has its own ring whose capacity is 4, and puts the
> results into a global ring where the user fetches them out and does
> the remaining operations for the request, e.g., posting the
> compressed data out for migration on the source QEMU
>
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  include/qemu/lockless-threads.h |  63 +++++++
>  util/Makefile.objs              |   1 +
>  util/lockless-threads.c         | 373 ++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 437 insertions(+)
>  create mode 100644 include/qemu/lockless-threads.h
>  create mode 100644 util/lockless-threads.c
>
> diff --git a/include/qemu/lockless-threads.h b/include/qemu/lockless-threads.h
> new file mode 100644
> index 0000000000..9340d3a748
> --- /dev/null
> +++ b/include/qemu/lockless-threads.h
> @@ -0,0 +1,63 @@
> +/*
> + * Lockless Multithreads Abstraction
> + *
> + * This is the abstraction layer for lockless multithreads management.
> + *
> + * Note: currently only one producer is allowed.
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#ifndef QEMU_LOCKLESS_THREAD_H
> +#define QEMU_LOCKLESS_THREAD_H
> +
> +#include "qemu/queue.h"
> +#include "qemu/thread.h"
> +#include "qemu/ptr_ring.h"
> +
> +/*
> + * the request representation, which contains the internally used meta data;
> + * it can be embedded into the user's self-defined data struct and the user
> + * can use container_of() to get the self-defined data
> + */
> +struct ThreadRequest {
> +    QSLIST_ENTRY(ThreadRequest) node;
> +    unsigned int thread_index;
> +};
> +typedef struct ThreadRequest ThreadRequest;
> +
> +typedef struct Threads Threads;

The module is really nice. I just have a few suggestions on the naming
and the data organization, but it's really small stuff. The only bigger
suggestion is about the communication of completed requests back to the
submitter.

First, can you rename this module to something like ThreadedWorkqueue?
(So threaded_workqueue_create, threaded_workqueue_destroy, ...) The
file can also be renamed to {qemu,util}/threaded-workqueue.[ch].

> +/* the default size of a thread's local request ring */
> +#define DEFAULT_THREAD_RING_SIZE 4
> +
> +Threads *threads_create(unsigned int threads_nr, const char *name,
> +                        int thread_ring_size,
> +                        ThreadRequest *(*thread_request_init)(void),
> +                        void (*thread_request_uninit)(ThreadRequest *request),
> +                        void (*thread_request_handler)(ThreadRequest *request),
> +                        void (*thread_request_done)(ThreadRequest *request));

Please put these four members into a ThreadedWorkqueueOps struct.

> +void threads_destroy(Threads *threads);
> +
> +/*
> + * find a free request and associate it with a free thread.
> + * If no request or no thread is free, return NULL
> + */
> +ThreadRequest *threads_submit_request_prepare(Threads *threads);

threaded_workqueue_get_request

> +/*
> + * push the request to its thread's local ring and notify the thread
> + */
> +void threads_submit_request_commit(Threads *threads, ThreadRequest *request);

threaded_workqueue_submit_request

> +/*
> + * wait for all threads to complete the requests in their local rings,
> + * to make sure no previous request is still pending.
> + */
> +void threads_wait_done(Threads *threads);

threaded_workqueue_wait_for_requests ?

> +struct ThreadLocal {
> +    QemuThread thread;
> +
> +    /* the event used to wake up the thread */
> +    QemuEvent ev;
> +
> +    struct Threads *threads;
> +
> +    /* local request ring which is filled by the user */
> +    Ptr_Ring request_ring;

Put the request ring and ev first, so that they certainly fit a
cacheline together.

> +    /* the index of the thread */
> +    int self;
> +
> +    /* thread is useless and needs to exit */
> +    bool quit;
> +};
> +typedef struct ThreadLocal ThreadLocal;
> +
> +/*
> + * the main data struct that represents the multithreads and is shared by
> + * all threads
> + */
> +struct Threads {
> +    const char *name;
> +    unsigned int threads_nr;
> +    /* requests are pushed to the threads in a round-robin manner */
> +    unsigned int current_thread_index;
> +
> +    int thread_ring_size;
> +    int total_requests;
> +
> +    /* the requests are pre-allocated and linked in the list */
> +    int free_requests_nr;
> +    QSLIST_HEAD(, ThreadRequest) free_requests;
> +
> +    /* the constructor of request */
> +    ThreadRequest *(*thread_request_init)(void);
> +    /* the destructor of request */
> +    void (*thread_request_uninit)(ThreadRequest *request);
> +    /* the handler of the request which is called in the thread */
> +    void (*thread_request_handler)(ThreadRequest *request);
> +    /*
> +     * the handler to process the result which is called in the
> +     * user's context
> +     */
> +    void (*thread_request_done)(ThreadRequest *request);

You can now store the ops struct pointer here instead of the four
separate fields.

> +    /* the threads push results to this ring, so it has multiple producers */
> +    QemuSpin done_ring_lock;
> +    Ptr_Ring request_done_ring;

Again, putting the request_done_ring first ensures that there's no
false sharing with ops. Though, see more below about the
request_done_ring. My suggestion below would change these three
fields to:

    char *requests;
    unsigned long *completed_requests;
    QemuEvent complete_ev;

> +    ThreadLocal per_thread_data[0];
> +};
> +typedef struct Threads Threads;
> +
> +static void put_done_request(Threads *threads, ThreadRequest *request)
> +{
> +    int ret;
> +
> +    qemu_spin_lock(&threads->done_ring_lock);
> +    ret = ptr_ring_produce(&threads->request_done_ring, request);
> +    /* there should be enough room to save all requests. */
> +    assert(!ret);
> +    qemu_spin_unlock(&threads->done_ring_lock);
> +}

An idea: the total number of requests is going to be very small, and a
PtrRing is not the nicest data structure for multiple producer/single
consumer. So you could instead:

- add the size of one request to the ops structure. Move the allocation
  in init_requests, so that you can have one single large array that
  stores all requests. thread_request_init gets the pointer to a single
  request.

- now that you have a single array (let's call it threads->requests),
  the request index can be found with "(req - threads->requests) /
  threads->ops->request_size". The thread index, furthermore, is just
  request_index / threads->thread_ring_size and you can remove it from
  ThreadRequest.

- now that you have request indices, you can replace the completion
  ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
  to report completion. On the writer side you use find_next_bit to find
  a completed request and clear_bit_atomic to clear its index. The index
  passed to find_next_bit is threads->current_thread_index *
  threads->thread_ring_size. And you also don't need find_free_thread,
  because you can update threads->current_thread_index to

    threads->current_thread_index =
        ((free_request_id / threads->thread_ring_size) + 1)
        % threads->threads_nr;

  after you prepare a request, and threads will then naturally receive
  requests in round-robin order. (If find_next_bit fails it means you
  have to move the current_thread_index to 0 and retry.)

- you don't need the free requests list and free_requests_nr either: you
  just initialize the completed request bitmap to all-ones, and
  find_next_bit + clear_bit_atomic will do the work of free_requests.
  ThreadRequest goes away completely now!

The advantage is that you get rid of the spinlock on the consumer side,
and many auxiliary data structures on the producer side: a bitmap is a
much nicer data structure for multiple producer/single consumer than the
PtrRing. In addition, with this design the entire Threads structure
becomes read-mostly, which is nice for the cache. The disadvantage is
that you add an atomic operation (clear_bit_atomic) to
threads_submit_request_prepare(*).

The PtrRing is still useful for the other direction, because there you
have single producer/single consumer.

(*) It's probably possible to have two separate bitmaps, e.g. where the
producer and consumers *flip* bits and the producer looks for mismatched
bits between the two bitmaps. I'm not asking you to flesh out and
implement that; it's just why I think you can ignore the extra cost of
clear_bit_atomic. In fact, if the maintainers think this is overkill you
can go ahead with just the naming fixes and I'll take a look at a
rewrite when I have some time for fun stuff. :)

> +void threads_wait_done(Threads *threads)
> +{
> +    ThreadRequest *requests[DEFAULT_THREAD_RING_SIZE * 2];
> +    int nr;
> +
> +retry:
> +    nr = ptr_ring_consume_batched(&threads->request_done_ring,
> +                                  (void **)requests, ARRAY_SIZE(requests));

This is not a fast path, so it should use an event in the
thread->submitter direction too. qemu_event_set is quite fast
(basically a smp_mb but no cacheline bouncing) if the event is already
set, so it should not be a performance problem to add it in
put_done_request after set_bit_atomic. (This is more or less unrelated
to the bitmap idea above.)

Emilio, can you review the above ideas?

Paolo

> +    while (nr--) {
> +        threads->thread_request_done(requests[nr]);
> +        add_free_request(threads, requests[nr]);
> +    }
> +
> +    if (threads->free_requests_nr != threads->total_requests) {
> +        cpu_relax();
> +        goto retry;
> +    }
> +}
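To make the ops-struct and bitmap suggestions above concrete, here is a
minimal sketch in C. It is only an illustration of the direction being
discussed: the ThreadedWorkqueue* names follow the proposed renaming,
while the request_size field, the completed_requests/requests/ops
members and the helper's exact signature are assumptions, not code from
the posted patch.

    typedef struct ThreadedWorkqueueOps {
        /* constructor/destructor, called once per pre-allocated slot */
        void (*thread_request_init)(void *request);
        void (*thread_request_uninit)(void *request);
        /* called in a worker thread to process one request */
        void (*thread_request_handler)(void *request);
        /* called in the submitter's context once a request completed */
        void (*thread_request_done)(void *request);
        /* size of one request, so one flat array can hold all of them */
        size_t request_size;
    } ThreadedWorkqueueOps;

    /* sketch: find a free slot via the completion bitmap */
    static void *threaded_workqueue_get_request(Threads *threads)
    {
        unsigned long total = threads->thread_ring_size * threads->threads_nr;
        unsigned long start = threads->current_thread_index *
                              threads->thread_ring_size;
        unsigned long index;

        index = find_next_bit(threads->completed_requests, total, start);
        if (index >= total) {
            /* wrap around and retry from thread 0 */
            index = find_next_bit(threads->completed_requests, total, 0);
            if (index >= total) {
                return NULL;    /* every slot is in flight */
            }
        }
        clear_bit_atomic(index, threads->completed_requests);
        /*
         * if the slot held a completed-but-unreaped request, the caller
         * would run ops->thread_request_done() on it here before reuse
         */

        /* advance the round-robin index past this slot's owner thread */
        threads->current_thread_index =
            ((index / threads->thread_ring_size) + 1) % threads->threads_nr;
        return threads->requests + index * threads->ops->request_size;
    }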
On 10/17/2018 06:10 PM, Paolo Bonzini wrote:
>
> An idea: the total number of requests is going to be very small, and a
> PtrRing is not the nicest data structure for multiple producer/single
> consumer. So you could instead:
>
> - add the size of one request to the ops structure. Move the allocation
>   in init_requests, so that you can have one single large array that
>   stores all requests. thread_request_init gets the pointer to a single
>   request.
>
> - now that you have a single array (let's call it threads->requests),
>   the request index can be found with "(req - threads->requests) /
>   threads->ops->request_size". The thread index, furthermore, is just
>   request_index / threads->thread_ring_size and you can remove it from
>   ThreadRequest.
>
> - now that you have request indices, you can replace the completion
>   ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
>   to report completion. On the writer side you use find_next_bit to find
>   a completed request and clear_bit_atomic to clear its index. The index
>   passed to find_next_bit is threads->current_thread_index *
>   threads->thread_ring_size. And you also don't need find_free_thread,
>   because you can update threads->current_thread_index to
>
>     threads->current_thread_index =
>         ((free_request_id / threads->thread_ring_size) + 1)
>         % threads->threads_nr;
>
>   after you prepare a request, and threads will then naturally receive
>   requests in round-robin order. (If find_next_bit fails it means you
>   have to move the current_thread_index to 0 and retry.)
>
> - you don't need the free requests list and free_requests_nr either: you
>   just initialize the completed request bitmap to all-ones, and
>   find_next_bit + clear_bit_atomic will do the work of free_requests.
>   ThreadRequest goes away completely now!
>
> The advantage is that you get rid of the spinlock on the consumer side,
> and many auxiliary data structures on the producer side: a bitmap is a
> much nicer data structure for multiple producer/single consumer than the
> PtrRing. In addition, with this design the entire Threads structure
> becomes read-mostly, which is nice for the cache. The disadvantage is
> that you add an atomic operation (clear_bit_atomic) to
> threads_submit_request_prepare(*).

All your comments are good to me and you are a GENIUS; the idea of
making the requests a single array and partitioning it this way
simplifies things a lot.

> The PtrRing is still useful for the other direction, because there you
> have single producer/single consumer.
>
> (*) It's probably possible to have two separate bitmaps, e.g.
>     where the producer and consumers *flip* bits and the
>     producer looks for mismatched bits between the two bitmaps.
>     I'm not asking you to flesh out and implement that; it's
>     just why I think you can ignore the extra cost of
>     clear_bit_atomic. In fact, if the maintainers think this
>     is overkill you can go ahead with just the naming fixes and
>     I'll take a look at a rewrite when I have some time for fun
>     stuff. :)

LOL! Great minds think alike; the idea of "flipping bitmaps" was in my
mind too. :)

Besides that... I think we get the chance to remove ptr_ring gracefully,
as the bitmap can indicate the ownership of the request as well. If the
bit is 1 (supposing all bits are 1 by default), only the user can
operate on it; the bit is cleared after the user fills the info into
the request. After that, the thread sees the bit is cleared, takes
ownership, finishes the request, and then sets the bit in the bitmap.
The ownership is returned to the user again.

One possible disadvantage is that it can't differentiate whether a
request is empty or contains a result that the user needs to handle in
threads_wait_done(); that will slow threads_wait_done() a little, as it
needs to check all requests, but it is not a big deal because:
1) at the point where a flush is needed, it is highly likely that
   almost all requests have been used.
2) the total number of requests is going to be very small.

It is illustrated by the following code, combining the "flip" bitmaps:

struct Threads {
    ......

    /*
     * the bits in these two bitmaps indicate the indexes of the requests
     * respectively. If they are the same, the request is owned by the
     * user, i.e., only the user can use the request. Otherwise, it is
     * owned by the thread.
     */

    /* after the user fills the request, the bit is flipped. */
    unsigned long *request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

    /* after handling the request, the thread flips the bit. */
    unsigned long *request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
}

threads_submit_request_prepare()
{
    request_done_bitmap = READ_ONCE(threads->request_done_bitmap);
    result_bitmap = bitmap_xor(&request_done_bitmap,
                               threads->request_fill_bitmap);

    index = find_first_zero_bit(current-thread-to-request-index,
                                &result_bitmap);

    /* make sure we get the data the thread wrote. */
    smp_rmb();

    thread_request_done(requests[index]);
    ...
}

threads_submit_request_commit()
{
    /*
     * make sure the user has filled the request before we make it
     * visible to the threads.
     */
    smp_wmb();

    /* after that, the thread can handle the request. */
    bitmap_change_bit(request-to-index, threads->request_fill_bitmap);
    ......
}

In the thread, it does:

thread_run()
{
    index_start = threads->requests + itself->index *
                  threads->thread_ring_size;
    index_end = index_start + threads->thread_ring_size;

loop:
    request_fill_bitmap = READ_ONCE(threads->request_fill_bitmap);
    request_done_bitmap = READ_ONCE(threads->request_done_bitmap);
    result_bitmap = bitmap_xor(&request_fill_bitmap, &request_done_bitmap);
    index = find_first_bit_set(&result_bitmap, .start = index_start,
                               .end = index_end);

    /*
     * paired with smp_wmb() in threads_submit_request_commit to make
     * sure the thread can get the data filled by the user.
     */
    smp_rmb();

    request = threads->requests[index];
    thread_request_handler(request);

    /*
     * make the update of the request visible before flipping the bitmap,
     * paired with smp_rmb() in threads_submit_request_prepare().
     */
    smp_wmb();

    bitmap_change_bit_atomic(&threads->request_done_bitmap, index);
    ......
}
On 18/10/2018 11:30, Xiao Guangrong wrote:
> Besides that... I think we get the chance to remove ptr_ring gracefully,
> as the bitmap can indicate the ownership of the request as well. If the
> bit is 1 (supposing all bits are 1 by default), only the user can
> operate on it; the bit is cleared after the user fills the info into
> the request. After that, the thread sees the bit is cleared, takes
> ownership, finishes the request, and then sets the bit in the bitmap.
> The ownership is returned to the user again.

Yes, even better. :)

> One possible disadvantage is that it can't differentiate whether a
> request is empty or contains a result that the user needs to handle in
> threads_wait_done(); that will slow threads_wait_done() a little, as it
> needs to check all requests, but it is not a big deal because:
> 1) at the point where a flush is needed, it is highly likely that
>    almost all requests have been used.
> 2) the total number of requests is going to be very small.

threads_wait_done only needs to check bitmap_equal for the two bitmaps,
no? (I'm not sure if, with the code below, it would be bitmap_equal or
"all bits are different", i.e. xor is all ones. But it's a trivial
change.)

> It is illustrated by the following code, combining the "flip" bitmaps:
>
> struct Threads {
>     ......
>
>     /*
>      * the bits in these two bitmaps indicate the indexes of the requests
>      * respectively. If they are the same, the request is owned by the
>      * user, i.e., only the user can use the request. Otherwise, it is
>      * owned by the thread.
>      */
>
>     /* after the user fills the request, the bit is flipped. */
>     unsigned long *request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>
>     /* after handling the request, the thread flips the bit. */
>     unsigned long *request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> }

Note that the pointers need not be aligned, because they are only read.
It's the data that should be aligned instead (qemu_memalign to allocate
it).

> threads_submit_request_prepare()
> {
>     request_done_bitmap = READ_ONCE(threads->request_done_bitmap);
>     result_bitmap = bitmap_xor(&request_done_bitmap,
>                                threads->request_fill_bitmap);
>
>     index = find_first_zero_bit(current-thread-to-request-index,
>                                 &result_bitmap);

find_next_zero_bit.

>     /* make sure we get the data the thread wrote. */
>     smp_rmb();
>
>     thread_request_done(requests[index]);
>     ...
> }
>
> threads_submit_request_commit()
> {
>     /*
>      * make sure the user has filled the request before we make it
>      * visible to the threads.
>      */
>     smp_wmb();
>
>     /* after that, the thread can handle the request. */
>     bitmap_change_bit(request-to-index, threads->request_fill_bitmap);
>     ......
> }
>
> In the thread, it does:
>
> thread_run()
> {
>     index_start = threads->requests + itself->index *
>                   threads->thread_ring_size;
>     index_end = index_start + threads->thread_ring_size;
>
> loop:
>     request_fill_bitmap = READ_ONCE(threads->request_fill_bitmap);
>     request_done_bitmap = READ_ONCE(threads->request_done_bitmap);

No need for READ_ONCE (atomic_read in QEMU), as the pointers are never
written. Technically READ_ONCE _would_ be needed in bitmap_xor. Either
just ignore the issue or write a find_{equal,different}_bit yourself in
util/threads.c, so that it can use atomic_read.

>     result_bitmap = bitmap_xor(&request_fill_bitmap, &request_done_bitmap);
>     index = find_first_bit_set(&result_bitmap, .start = index_start,
>                                .end = index_end);
>
>     /*
>      * paired with smp_wmb() in threads_submit_request_commit to make
>      * sure the thread can get the data filled by the user.
>      */
>     smp_rmb();
>
>     request = threads->requests[index];
>     thread_request_handler(request);
>
>     /*
>      * make the update of the request visible before flipping the bitmap,
>      * paired with smp_rmb() in threads_submit_request_prepare().
>      */
>     smp_wmb();

No need for smp_wmb before atomic_xor.

>     bitmap_change_bit_atomic(&threads->request_done_bitmap, index);
>     ......
> }
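Taking the bitmap_equal observation above, the flush path in this
scheme could reduce to a sketch like the following (assuming both
bitmaps cover threads->total_requests bits; running
thread_request_done() on the newly completed slots is elided):

    void threads_wait_done(Threads *threads)
    {
        /* a request is back in the user's hands when its two bits match */
        while (!bitmap_equal(threads->request_fill_bitmap,
                             threads->request_done_bitmap,
                             threads->total_requests)) {
            cpu_relax();
        }
        /* paired with the workers' smp_wmb() before their final flip */
        smp_rmb();
    }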
On Wed, Oct 17, 2018 at 12:10:15 +0200, Paolo Bonzini wrote:
> On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:
> An idea: the total number of requests is going to be very small, and a
> PtrRing is not the nicest data structure for multiple producer/single
> consumer. So you could instead:
(snip)
> - now that you have request indices, you can replace the completion
>   ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
>   to report completion. On the writer side you use find_next_bit to find
(snip)
> Emilio, can you review the above ideas?

Sorry it took me a while to go through this.

I like your suggestions. Just one nit; I'm not sure I understood the
use case very well, but I think using a bitmap to signal completion
might be suboptimal, since we'd have several threads spinning on the
same cacheline yet caring about different bits.

Xiao: a couple of suggestions

- Since you'll be adding a generic module, make its commit message and
  description self-contained. That is, mentioning in the log that this
  will be used for migration is fine, but please describe the module
  (and the assumptions it makes about its users) in general, so that
  someone who doesn't know anything about migration can still
  understand this module (and hopefully adopt it for other use cases).

- I'd like to see a simple test program (or rather, benchmark) that
  shows how this works. This benchmark would be completely unrelated
  to migration; it should just be a simple test of the
  performance/scalability of this module. Having this benchmark would
  help (1) discuss and quantitatively evaluate modifications to the
  module, and (2) help others to quickly understand what the module
  does. See tests/qht-bench.c for an example; a rough sketch follows
  this message.

Thanks,

Emilio
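Such a benchmark could be quite small. Here is a hypothetical sketch
along the lines of tests/qht-bench.c: the threaded_workqueue_* names
and the create() signature are assumed from the renaming discussed
earlier in the thread, the thread/ring-size knobs are hardcoded for
brevity, and the init/uninit callbacks are omitted.

    #include "qemu/osdep.h"
    #include "qemu/threaded-workqueue.h"

    typedef struct BenchRequest {
        uint64_t counter;    /* trivial payload */
    } BenchRequest;

    /* no real work: this measures pure dispatch/completion overhead */
    static void bench_handler(void *request)
    {
        ((BenchRequest *)request)->counter++;
    }

    static void bench_done(void *request)
    {
    }

    int main(int argc, char **argv)
    {
        const ThreadedWorkqueueOps ops = {
            .thread_request_handler = bench_handler,
            .thread_request_done    = bench_done,
            .request_size           = sizeof(BenchRequest),
        };
        /* 4 worker threads, 4 ring slots each; both knobs worth sweeping */
        Threads *queue = threaded_workqueue_create("bench", 4, 4, &ops);
        int64_t start = g_get_monotonic_time();
        long submitted = 0;

        while (submitted < 10 * 1000 * 1000) {
            BenchRequest *req = threaded_workqueue_get_request(queue);
            if (!req) {
                continue;    /* all slots busy, retry */
            }
            threaded_workqueue_submit_request(queue, req);
            submitted++;
        }
        threaded_workqueue_wait_for_requests(queue);

        /* elapsed time is in microseconds, so this prints Mreqs/s */
        printf("%.2f Mreqs/s\n",
               (double)submitted / (g_get_monotonic_time() - start));
        threaded_workqueue_destroy(queue);
        return 0;
    }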
On 27/10/2018 01:33, Emilio G. Cota wrote:
> On Wed, Oct 17, 2018 at 12:10:15 +0200, Paolo Bonzini wrote:
>> On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:
>
>> An idea: the total number of requests is going to be very small, and a
>> PtrRing is not the nicest data structure for multiple producer/single
>> consumer. So you could instead:
> (snip)
>> - now that you have request indices, you can replace the completion
>>   ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
>>   to report completion. On the writer side you use find_next_bit to find
> (snip)
>> Emilio, can you review the above ideas?
>
> Sorry it took me a while to go through this.
>
> I like your suggestions. Just one nit; I'm not sure I understood the
> use case very well, but I think using a bitmap to signal completion
> might be suboptimal, since we'd have several threads spinning on the
> same cacheline yet caring about different bits.

Requests are asynchronous, so the bitmap is only used to find a free
submission slot. You're right that the bitmap can bounce across
processors, but I'm not sure how else you would do that, because you
don't know in advance how many submitting threads you have. It wouldn't
be any worse if there were a spinlock.

However, in the migration case there is only one submitting thread, so
it's okay. :)

Paolo

> Xiao: a couple of suggestions
>
> - Since you'll be adding a generic module, make its commit message and
>   description self-contained. That is, mentioning in the log that this
>   will be used for migration is fine, but please describe the module
>   (and the assumptions it makes about its users) in general, so that
>   someone who doesn't know anything about migration can still
>   understand this module (and hopefully adopt it for other use cases).
>
> - I'd like to see a simple test program (or rather, benchmark) that
>   shows how this works. This benchmark would be completely unrelated
>   to migration; it should just be a simple test of the
>   performance/scalability of this module. Having this benchmark would
>   help (1) discuss and quantitatively evaluate modifications to the
>   module, and (2) help others to quickly understand what the module
>   does. See tests/qht-bench.c for an example.
>
> Thanks,
>
> Emilio
On 10/28/2018 03:50 PM, Paolo Bonzini wrote:
> On 27/10/2018 01:33, Emilio G. Cota wrote:
>> On Wed, Oct 17, 2018 at 12:10:15 +0200, Paolo Bonzini wrote:
>>> On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:
>>
>>> An idea: the total number of requests is going to be very small, and a
>>> PtrRing is not the nicest data structure for multiple producer/single
>>> consumer. So you could instead:
>> (snip)
>>> - now that you have request indices, you can replace the completion
>>>   ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
>>>   to report completion. On the writer side you use find_next_bit to find
>> (snip)
>>> Emilio, can you review the above ideas?
>>
>> Sorry it took me a while to go through this.
>>
>> I like your suggestions. Just one nit; I'm not sure I understood the
>> use case very well, but I think using a bitmap to signal completion
>> might be suboptimal, since we'd have several threads spinning on the
>> same cacheline yet caring about different bits.
>
> Requests are asynchronous, so the bitmap is only used to find a free
> submission slot. You're right that the bitmap can bounce across
> processors, but I'm not sure how else you would do that, because you
> don't know in advance how many submitting threads you have. It wouldn't
> be any worse if there were a spinlock.
>
> However, in the migration case there is only one submitting thread, so
> it's okay. :)

Yup. The cache contention only exists among the work threads; the
submitter thread, which is the main migration thread, is totally free.
Making the main thread faster is good.

> Paolo
>
>> Xiao: a couple of suggestions
>>
>> - Since you'll be adding a generic module, make its commit message and
>>   description self-contained. That is, mentioning in the log that this
>>   will be used for migration is fine, but please describe the module
>>   (and the assumptions it makes about its users) in general, so that
>>   someone who doesn't know anything about migration can still
>>   understand this module (and hopefully adopt it for other use cases).

Good to me, I will add a more detailed description for this module in
the next version.

>>
>> - I'd like to see a simple test program (or rather, benchmark) that
>>   shows how this works. This benchmark would be completely unrelated
>>   to migration; it should just be a simple test of the
>>   performance/scalability of this module. Having this benchmark would
>>   help (1) discuss and quantitatively evaluate modifications to the
>>   module, and (2) help others to quickly understand what the module
>>   does. See tests/qht-bench.c for an example.

Couldn't agree with you more; will do. :)

Thank you, Emilio and Paolo!
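As background for the patch below: the header comment says a
ThreadRequest is meant to be embedded in a user-defined struct and
recovered with container_of(). A minimal sketch of that pattern follows
(CompressData and its fields are invented for illustration; they are
not part of the patch):

    typedef struct CompressData {
        ThreadRequest request;   /* embedded, so container_of() works */
        uint8_t *in;
        uint8_t *out;
        size_t in_size;
        size_t out_size;
    } CompressData;

    static void compress_handler(ThreadRequest *request)
    {
        CompressData *cd = container_of(request, CompressData, request);

        /* compress cd->in[0..cd->in_size) into cd->out here */
    }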
diff --git a/include/qemu/lockless-threads.h b/include/qemu/lockless-threads.h
new file mode 100644
index 0000000000..9340d3a748
--- /dev/null
+++ b/include/qemu/lockless-threads.h
@@ -0,0 +1,63 @@
+/*
+ * Lockless Multithreads Abstraction
+ *
+ * This is the abstraction layer for lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_LOCKLESS_THREAD_H
+#define QEMU_LOCKLESS_THREAD_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+#include "qemu/ptr_ring.h"
+
+/*
+ * the request representation, which contains the internally used meta data;
+ * it can be embedded into the user's self-defined data struct and the user
+ * can use container_of() to get the self-defined data
+ */
+struct ThreadRequest {
+    QSLIST_ENTRY(ThreadRequest) node;
+    unsigned int thread_index;
+};
+typedef struct ThreadRequest ThreadRequest;
+
+typedef struct Threads Threads;
+
+/* the default size of a thread's local request ring */
+#define DEFAULT_THREAD_RING_SIZE 4
+
+Threads *threads_create(unsigned int threads_nr, const char *name,
+                        int thread_ring_size,
+                        ThreadRequest *(*thread_request_init)(void),
+                        void (*thread_request_uninit)(ThreadRequest *request),
+                        void (*thread_request_handler)(ThreadRequest *request),
+                        void (*thread_request_done)(ThreadRequest *request));
+void threads_destroy(Threads *threads);
+
+/*
+ * find a free request and associate it with a free thread.
+ * If no request or no thread is free, return NULL
+ */
+ThreadRequest *threads_submit_request_prepare(Threads *threads);
+/*
+ * push the request to its thread's local ring and notify the thread
+ */
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request);
+
+/*
+ * wait for all threads to complete the requests in their local rings,
+ * to make sure no previous request is still pending.
+ */
+void threads_wait_done(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..deb5c972d5 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@ util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += lockless-threads.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/lockless-threads.c b/util/lockless-threads.c
new file mode 100644
index 0000000000..50cf143c03
--- /dev/null
+++ b/util/lockless-threads.c
@@ -0,0 +1,373 @@
+/*
+ * Lockless Multithreads Implementation
+ *
+ * Implement lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/host-utils.h"
+#include "qemu/lockless-threads.h"
+
+struct ThreadLocal {
+    QemuThread thread;
+
+    /* the event used to wake up the thread */
+    QemuEvent ev;
+
+    struct Threads *threads;
+
+    /* local request ring which is filled by the user */
+    Ptr_Ring request_ring;
+
+    /* the index of the thread */
+    int self;
+
+    /* thread is useless and needs to exit */
+    bool quit;
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data struct that represents the multithreads and is shared by
+ * all threads
+ */
+struct Threads {
+    const char *name;
+    unsigned int threads_nr;
+    /* requests are pushed to the threads in a round-robin manner */
+    unsigned int current_thread_index;
+
+    int thread_ring_size;
+    int total_requests;
+
+    /* the requests are pre-allocated and linked in the list */
+    int free_requests_nr;
+    QSLIST_HEAD(, ThreadRequest) free_requests;
+
+    /* the constructor of request */
+    ThreadRequest *(*thread_request_init)(void);
+    /* the destructor of request */
+    void (*thread_request_uninit)(ThreadRequest *request);
+    /* the handler of the request which is called in the thread */
+    void (*thread_request_handler)(ThreadRequest *request);
+    /*
+     * the handler to process the result which is called in the
+     * user's context
+     */
+    void (*thread_request_done)(ThreadRequest *request);
+
+    /* the threads push results to this ring, so it has multiple producers */
+    QemuSpin done_ring_lock;
+    Ptr_Ring request_done_ring;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static void put_done_request(Threads *threads, ThreadRequest *request)
+{
+    int ret;
+
+    qemu_spin_lock(&threads->done_ring_lock);
+    ret = ptr_ring_produce(&threads->request_done_ring, request);
+    /* there should be enough room to save all requests. */
+    assert(!ret);
+    qemu_spin_unlock(&threads->done_ring_lock);
+}
+
+/* retry to see if there is an available request before actually going to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    ThreadRequest *request;
+    int count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        request = ptr_ring_consume(&thread->request_ring);
+        if (request) {
+            return request;
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(ThreadRequest *data) = threads->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
+        qemu_event_reset(&self_data->ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->ev);
+            continue;
+        }
+        handler(request);
+        put_done_request(threads, request);
+    }
+
+    return NULL;
+}
+
+static void add_free_request(Threads *threads, ThreadRequest *request)
+{
+    QSLIST_INSERT_HEAD(&threads->free_requests, request, node);
+    threads->free_requests_nr++;
+}
+
+static ThreadRequest *get_and_remove_first_free_request(Threads *threads)
+{
+    ThreadRequest *request;
+
+    if (QSLIST_EMPTY(&threads->free_requests)) {
+        return NULL;
+    }
+
+    request = QSLIST_FIRST(&threads->free_requests);
+    QSLIST_REMOVE_HEAD(&threads->free_requests, node);
+    threads->free_requests_nr--;
+    return request;
+}
+
+static void uninit_requests(Threads *threads, int free_nr)
+{
+    ThreadRequest *request;
+
+    /*
+     * all requests should be released to the list if the threads are being
+     * destroyed, i.e. threads_wait_done() should have been called first.
+     */
+    assert(threads->free_requests_nr == free_nr);
+
+    while ((request = get_and_remove_first_free_request(threads))) {
+        threads->thread_request_uninit(request);
+    }
+
+    assert(ptr_ring_empty(&threads->request_done_ring));
+    ptr_ring_cleanup(&threads->request_done_ring, NULL);
+}
+
+static int init_requests(Threads *threads, int total_requests)
+{
+    ThreadRequest *request;
+    int i, free_nr = 0;
+
+    if (ptr_ring_init(&threads->request_done_ring, total_requests) < 0) {
+        return -1;
+    }
+    ptr_ring_disable_batch(&threads->request_done_ring);
+
+    QSLIST_INIT(&threads->free_requests);
+    for (i = 0; i < total_requests; i++) {
+        request = threads->thread_request_init();
+        if (!request) {
+            goto cleanup;
+        }
+
+        free_nr++;
+        add_free_request(threads, request);
+    }
+    return 0;
+
+cleanup:
+    uninit_requests(threads, free_nr);
+    return -1;
+}
+
+static void uninit_thread_data(Threads *threads, int num)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < num; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].ev);
+        assert(ptr_ring_empty(&thread_local[i].request_ring));
+
+        /* nothing is left in the ring. */
+        ptr_ring_cleanup(&thread_local[i].request_ring, NULL);
+    }
+}
+
+static int init_thread_data(Threads *threads, int threads_nr)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    char *name;
+    int i;
+
+    for (i = 0; i < threads_nr; i++) {
+        if (ptr_ring_init(&thread_local[i].request_ring,
+                          threads->thread_ring_size) < 0) {
+            goto exit;
+        }
+        ptr_ring_disable_batch(&thread_local[i].request_ring);
+
+        qemu_event_init(&thread_local[i].ev, false);
+        thread_local[i].threads = threads;
+        thread_local[i].self = i;
+        name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self);
+        qemu_thread_create(&thread_local[i].thread, name,
+                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
+        g_free(name);
+    }
+    return 0;
+
+ exit:
+    uninit_thread_data(threads, i);
+    return -1;
+}
+
+Threads *threads_create(unsigned int threads_nr, const char *name,
+                        int thread_ring_size,
+                        ThreadRequest *(*thread_request_init)(void),
+                        void (*thread_request_uninit)(ThreadRequest *request),
+                        void (*thread_request_handler)(ThreadRequest *request),
+                        void (*thread_request_done)(ThreadRequest *request))
+{
+    Threads *threads;
+    int total_requests;
+
+    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
+    threads->name = name;
+    threads->thread_request_init = thread_request_init;
+    threads->thread_request_uninit = thread_request_uninit;
+    threads->thread_request_handler = thread_request_handler;
+    threads->thread_request_done = thread_request_done;
+    qemu_spin_init(&threads->done_ring_lock);
+
+    threads->thread_ring_size = thread_ring_size;
+    total_requests = threads->thread_ring_size * threads_nr;
+    if (init_requests(threads, total_requests) < 0) {
+        goto exit;
+    }
+    threads->total_requests = total_requests;
+
+    if (init_thread_data(threads, threads_nr) < 0) {
+        goto exit;
+    }
+    threads->threads_nr = threads_nr;
+    return threads;
+
+exit:
+    threads_destroy(threads);
+    return NULL;
+}
+
+void threads_destroy(Threads *threads)
+{
+    uninit_thread_data(threads, threads->threads_nr);
+    uninit_requests(threads, threads->total_requests);
+    g_free(threads);
+}
+
+static int find_free_thread(Threads *threads, int range)
+{
+    int current_index, index, try = 0;
+
+    current_index = threads->current_thread_index % threads->threads_nr;
+    index = current_index;
+
+    do {
+        index = index % threads->threads_nr;
+        if (!ptr_ring_full(&threads->per_thread_data[index].request_ring)) {
+            threads->current_thread_index = index;
+            return index;
+        }
+
+        if (++try > range) {
+            return -1;
+        }
+    } while (++index != current_index);
+
+    return -1;
+}
+
+ThreadRequest *threads_submit_request_prepare(Threads *threads)
+{
+    ThreadRequest *request;
+    int index;
+
+    /* seek a free one among all threads. */
+    index = find_free_thread(threads, threads->threads_nr);
+    if (index < 0) {
+        return NULL;
+    }
+
+    /* try to get the request from the list */
+    request = get_and_remove_first_free_request(threads);
+    if (request) {
+        goto got_request;
+    }
+
+    /* get a request that has already been handled by the threads */
+    request = ptr_ring_consume(&threads->request_done_ring);
+    if (request) {
+        threads->thread_request_done(request);
+        goto got_request;
+    }
+
+    return NULL;
+
+got_request:
+    request->thread_index = index;
+    return request;
+}
+
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request)
+{
+    int ret, index = request->thread_index;
+    ThreadLocal *thread_local = &threads->per_thread_data[index];
+
+    ret = ptr_ring_produce(&thread_local->request_ring, request);
+
+    /*
+     * we have detected that the thread's ring is not full in
+     * threads_submit_request_prepare(), so there should be free
+     * room in the ring
+     */
+    assert(!ret);
+    /* a new request has arrived, notify the thread */
+    qemu_event_set(&thread_local->ev);
+
+    /* we have used this entry, search from the next one. */
+    threads->current_thread_index = ++index;
+}
+
+void threads_wait_done(Threads *threads)
+{
+    ThreadRequest *requests[DEFAULT_THREAD_RING_SIZE * 2];
+    int nr;
+
+retry:
+    nr = ptr_ring_consume_batched(&threads->request_done_ring,
+                                  (void **)requests, ARRAY_SIZE(requests));
+    while (nr--) {
+        threads->thread_request_done(requests[nr]);
+        add_free_request(threads, requests[nr]);
+    }
+
+    if (threads->free_requests_nr != threads->total_requests) {
+        cpu_relax();
+        goto retry;
+    }
+}
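Finally, a sketch of how a submitter would drive the API exactly as
posted above (the compress_* callbacks are the hypothetical ones from
the container_of() example earlier; error handling is elided):

    Threads *threads = threads_create(2, "compress", DEFAULT_THREAD_RING_SIZE,
                                      compress_request_init,
                                      compress_request_uninit,
                                      compress_handler,
                                      compress_done);
    ThreadRequest *request;

    /* NULL means every ring is full; spin until a worker frees a slot */
    while (!(request = threads_submit_request_prepare(threads))) {
        cpu_relax();
    }
    /* fill the user data via container_of(request, CompressData, request) */
    threads_submit_request_commit(threads, request);

    threads_wait_done(threads);   /* flush, e.g. at the end of an iteration */
    threads_destroy(threads);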