
[v1,5/5] thread-pool: atomic fixes from tsan

Message ID 1453976119-24372-6-git-send-email-alex.bennee@linaro.org (mailing list archive)
State New, archived

Commit Message

Alex Bennée Jan. 28, 2016, 10:15 a.m. UTC
Mark changes to the thread pool state as explicitly atomic. Also make
the accesses to data.n in the test-thread-pool code explicitly atomic.

Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
---
 tests/test-thread-pool.c |  8 ++++----
 thread-pool.c            | 12 ++++++------
 2 files changed, 10 insertions(+), 10 deletions(-)
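
For context: QEMU's atomic_read()/atomic_set() helpers from
include/qemu/atomic.h behave, roughly speaking, like relaxed atomic loads
and stores (implemented with volatile accesses or __atomic builtins,
depending on the QEMU version), which is enough to tell ThreadSanitizer
that the cross-thread accesses touched here are intentional. The program
below is a minimal standalone sketch of that pattern using C11 atomics; it
is an illustration only, not QEMU code, and the "done" flag merely stands
in for data.n / req->state.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static _Atomic int done;   /* stands in for data.n / req->state */

static void *worker(void *arg)
{
    /* ... do the actual work ... */
    atomic_store_explicit(&done, 1, memory_order_relaxed);  /* ~atomic_set() */
    return NULL;
}

int main(void)
{
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);

    /* ~ while (atomic_read(&data.n) == 0) { aio_poll(ctx, true); } */
    while (atomic_load_explicit(&done, memory_order_relaxed) == 0) {
        /* the real test polls the AioContext here */
    }

    pthread_join(tid, NULL);
    printf("done = %d\n", atomic_load_explicit(&done, memory_order_relaxed));
    return 0;
}

Ordering between req->ret and req->state is a separate concern and is
handled by the existing smp_wmb(); see the sketch after the patch below.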

Comments

Paolo Bonzini Jan. 28, 2016, 10:44 a.m. UTC | #1
On 28/01/2016 11:15, Alex Bennée wrote:
> Mark changes to the thread pool state as explicitly atomic. Also make
> the accesses to data.n in the test-thread-pool code explicitly atomic.
> 
> Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
> ---
>  tests/test-thread-pool.c |  8 ++++----
>  thread-pool.c            | 12 ++++++------
>  2 files changed, 10 insertions(+), 10 deletions(-)

Acked-by: Paolo Bonzini <pbonzini@redhat.com>


Patch

diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index ccdee39..f51e284 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -46,10 +46,10 @@ static void test_submit(void)
 {
     WorkerTestData data = { .n = 0 };
     thread_pool_submit(pool, worker_cb, &data);
-    while (data.n == 0) {
+    while (atomic_read(&data.n) == 0) {
         aio_poll(ctx, true);
     }
-    g_assert_cmpint(data.n, ==, 1);
+    g_assert_cmpint(atomic_read(&data.n), ==, 1);
 }
 
 static void test_submit_aio(void)
@@ -128,7 +128,7 @@ static void test_submit_many(void)
         aio_poll(ctx, true);
     }
     for (i = 0; i < 100; i++) {
-        g_assert_cmpint(data[i].n, ==, 1);
+        g_assert_cmpint(atomic_read(&data[i].n), ==, 1);
         g_assert_cmpint(data[i].ret, ==, 0);
     }
 }
@@ -183,7 +183,7 @@ static void do_test_cancel(bool sync)
     g_assert_cmpint(num_canceled, <, 100);
 
     for (i = 0; i < 100; i++) {
-        if (data[i].aiocb && data[i].n != 3) {
+        if (data[i].aiocb && (atomic_read(&data[i].n) != 3)) {
             if (sync) {
                 /* Canceling the others will be a blocking operation.  */
                 bdrv_aio_cancel(data[i].aiocb);
diff --git a/thread-pool.c b/thread-pool.c
index 402c778..431a6fb 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -99,15 +99,15 @@ static void *worker_thread(void *opaque)
 
         req = QTAILQ_FIRST(&pool->request_list);
         QTAILQ_REMOVE(&pool->request_list, req, reqs);
-        req->state = THREAD_ACTIVE;
+        atomic_set(&req->state, THREAD_ACTIVE);
         qemu_mutex_unlock(&pool->lock);
 
         ret = req->func(req->arg);
 
-        req->ret = ret;
+        atomic_set(&req->ret, ret);
         /* Write ret before state.  */
         smp_wmb();
-        req->state = THREAD_DONE;
+        atomic_set(&req->state, THREAD_DONE);
 
         qemu_mutex_lock(&pool->lock);
 
@@ -167,7 +167,7 @@ static void thread_pool_completion_bh(void *opaque)
 
 restart:
     QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
-        if (elem->state != THREAD_DONE) {
+        if (atomic_read(&elem->state) != THREAD_DONE) {
             continue;
         }
 
@@ -184,7 +184,7 @@ restart:
              */
             qemu_bh_schedule(pool->completion_bh);
 
-            elem->common.cb(elem->common.opaque, elem->ret);
+            elem->common.cb(elem->common.opaque, atomic_read(&elem->ret));
             qemu_aio_unref(elem);
             goto restart;
         } else {
@@ -201,7 +201,7 @@ static void thread_pool_cancel(BlockAIOCB *acb)
     trace_thread_pool_cancel(elem, elem->common.opaque);
 
     qemu_mutex_lock(&pool->lock);
-    if (elem->state == THREAD_QUEUED &&
+    if (atomic_read(&elem->state) == THREAD_QUEUED &&
         /* No thread has yet started working on elem. we can try to "steal"
          * the item from the worker if we can get a signal from the
          * semaphore.  Because this is non-blocking, we can do it with
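
The one ordering-sensitive spot is in worker_thread(): req->ret must be
visible to other threads before req->state becomes THREAD_DONE, which is
what the existing smp_wmb() (kept by this patch) provides; the completion
side pairs it with a matching read barrier before consuming ret. The
standalone sketch below illustrates that publish/consume ordering with C11
fences in place of QEMU's smp_wmb()/smp_rmb(); it is an approximation for
illustration, not the thread-pool code itself.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

enum { THREAD_QUEUED, THREAD_ACTIVE, THREAD_DONE };

static int ret_value;                      /* plays the role of req->ret   */
static _Atomic int state = THREAD_QUEUED;  /* plays the role of req->state */

static void *worker(void *arg)
{
    atomic_store_explicit(&state, THREAD_ACTIVE, memory_order_relaxed);

    ret_value = 42;                               /* req->ret = ret;       */
    atomic_thread_fence(memory_order_release);    /* smp_wmb(): ret first  */
    atomic_store_explicit(&state, THREAD_DONE,    /* ...then publish state */
                          memory_order_relaxed);
    return NULL;
}

int main(void)
{
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);

    /* Consumer: wait for THREAD_DONE, then it is safe to read ret_value. */
    while (atomic_load_explicit(&state, memory_order_relaxed) != THREAD_DONE) {
        /* the real code is driven by the completion bottom half instead */
    }
    atomic_thread_fence(memory_order_acquire);    /* smp_rmb() counterpart */
    printf("ret = %d\n", ret_value);

    pthread_join(tid, NULL);
    return 0;
}

Note that the stores and loads of state stay relaxed: the fences carry the
ordering, which mirrors how the patch leaves smp_wmb() in place and only
marks the individual accesses as atomic.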