@@ -107,7 +107,7 @@ void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
s->cmd = cmd;
object_ref(OBJECT(s));
- thread_pool_submit_aio(tpm_backend_worker_thread, s,
+ thread_pool_submit_aio(tpm_backend_worker_thread, s, NULL,
tpm_backend_request_completed, s);
}
@@ -167,7 +167,7 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
acb->aio_offset = offset;
trace_file_paio_submit(acb, opaque, offset, count, type);
- return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+ return thread_pool_submit_aio(aio_worker, acb, NULL, cb, opaque);
}
int qemu_ftruncate64(int fd, int64_t length)
@@ -41,5 +41,6 @@ static int coroutine_enter_func(void *arg)
void co_run_in_worker_bh(void *opaque)
{
Coroutine *co = opaque;
- thread_pool_submit_aio(coroutine_enter_func, co, coroutine_enter_cb, co);
+ thread_pool_submit_aio(coroutine_enter_func, co, NULL,
+ coroutine_enter_cb, co);
}
@@ -517,7 +517,7 @@ static int spapr_nvdimm_flush_post_load(void *opaque, int version_id)
}
QLIST_FOREACH(state, &s_nvdimm->pending_nvdimm_flush_states, node) {
- thread_pool_submit_aio(flush_worker_cb, state,
+ thread_pool_submit_aio(flush_worker_cb, state, NULL,
spapr_nvdimm_flush_completion_cb, state);
}
@@ -698,7 +698,7 @@ static target_ulong h_scm_flush(PowerPCCPU *cpu, SpaprMachineState *spapr,
state->drcidx = drc_index;
- thread_pool_submit_aio(flush_worker_cb, state,
+ thread_pool_submit_aio(flush_worker_cb, state, NULL,
spapr_nvdimm_flush_completion_cb, state);
continue_token = state->continue_token;
@@ -87,7 +87,7 @@ static void virtio_pmem_flush(VirtIODevice *vdev, VirtQueue *vq)
req_data->fd = memory_region_get_fd(&backend->mr);
req_data->pmem = pmem;
req_data->vdev = vdev;
- thread_pool_submit_aio(worker_cb, req_data, done_cb, req_data);
+ thread_pool_submit_aio(worker_cb, req_data, NULL, done_cb, req_data);
}
static void virtio_pmem_get_config(VirtIODevice *vdev, uint8_t *config)
@@ -33,10 +33,12 @@ void thread_pool_free(ThreadPool *pool);
* thread_pool_submit* API: submit I/O requests in the thread's
* current AioContext.
*/
-BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
+ void *arg, GDestroyNotify arg_destroy,
BlockCompletionFunc *cb, void *opaque);
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
-void thread_pool_submit(ThreadPoolFunc *func, void *arg);
+void thread_pool_submit(ThreadPoolFunc *func,
+ void *arg, GDestroyNotify arg_destroy);
void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
@@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
static void test_submit(void)
{
WorkerTestData data = { .n = 0 };
- thread_pool_submit(worker_cb, &data);
+ thread_pool_submit(worker_cb, &data, NULL);
while (data.n == 0) {
aio_poll(ctx, true);
}
@@ -56,7 +56,7 @@ static void test_submit(void)
static void test_submit_aio(void)
{
WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
- data.aiocb = thread_pool_submit_aio(worker_cb, &data,
+ data.aiocb = thread_pool_submit_aio(worker_cb, &data, NULL,
done_cb, &data);
/* The callbacks are not called until after the first wait. */
@@ -121,7 +121,7 @@ static void test_submit_many(void)
for (i = 0; i < 100; i++) {
data[i].n = 0;
data[i].ret = -EINPROGRESS;
- thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
+ thread_pool_submit_aio(worker_cb, &data[i], NULL, done_cb, &data[i]);
}
active = 100;
@@ -149,7 +149,7 @@ static void do_test_cancel(bool sync)
for (i = 0; i < 100; i++) {
data[i].n = 0;
data[i].ret = -EINPROGRESS;
- data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
+ data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], NULL,
done_cb, &data[i]);
}
@@ -38,6 +38,7 @@ struct ThreadPoolElement {
ThreadPool *pool;
ThreadPoolFunc *func;
void *arg;
+ GDestroyNotify arg_destroy;
/* Moving state out of THREAD_QUEUED is protected by lock. After
* that, only the worker thread can write to it. Reads and writes
@@ -188,6 +189,10 @@ restart:
elem->ret);
QLIST_REMOVE(elem, all);
+ if (elem->arg_destroy) {
+ elem->arg_destroy(elem->arg);
+ }
+
if (elem->common.cb) {
/* Read state before ret. */
smp_rmb();
@@ -238,7 +243,8 @@ static const AIOCBInfo thread_pool_aiocb_info = {
.cancel_async = thread_pool_cancel,
};
-BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
+ void *arg, GDestroyNotify arg_destroy,
BlockCompletionFunc *cb, void *opaque)
{
ThreadPoolElement *req;
@@ -251,6 +257,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
req->func = func;
req->arg = arg;
+ req->arg_destroy = arg_destroy;
req->state = THREAD_QUEUED;
req->pool = pool;
@@ -285,14 +292,15 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
{
ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
assert(qemu_in_coroutine());
- thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
+ thread_pool_submit_aio(func, arg, NULL, thread_pool_co_cb, &tpc);
qemu_coroutine_yield();
return tpc.ret;
}
-void thread_pool_submit(ThreadPoolFunc *func, void *arg)
+void thread_pool_submit(ThreadPoolFunc *func,
+ void *arg, GDestroyNotify arg_destroy)
{
- thread_pool_submit_aio(func, arg, NULL, NULL);
+ thread_pool_submit_aio(func, arg, arg_destroy, NULL, NULL);
}
void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)