
PCI/DOE: Remove asynchronous task support

Message ID 20221119222527.1799836-1-ira.weiny@intel.com (mailing list archive)
State Superseded
Series PCI/DOE: Remove asynchronous task support

Commit Message

Ira Weiny Nov. 19, 2022, 10:25 p.m. UTC
From: Ira Weiny <ira.weiny@intel.com>

Gregory Price and Jonathan Cameron reported a bug within
pci_doe_submit_task().[1]  The issue was that work item initialization
needs to be done with either INIT_WORK_ONSTACK() or INIT_WORK()
depending on how the work item is allocated.

Initially, it was anticipated that DOE tasks were going to need to be
submitted asynchronously and the code was designed thusly.  Many
alternatives were discussed to fix the work initialization issue.[2]

However, all current users submit tasks synchronously and this has
therefore become an unneeded maintenance burden.  Remove the extra
maintenance burden by replacing asynchronous task submission with
a synchronous wait function.[3]

[1] https://lore.kernel.org/linux-cxl/20221014151045.24781-1-Jonathan.Cameron@huawei.com/T/#m88a7f50dcce52f30c8bf5c3dcc06fa9843b54a2d
[2] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m0f057773d9c75432fcfcc54a2604483fe82abe92
[3] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m32d3f9b208ef7486bc148d94a326b26b2d3e69ff

Reported-by: Gregory Price <gregory.price@memverge.com>
Reported-by: Jonathan Cameron <Jonathan.Cameron@huawei.com>
Suggested-by: Dan Williams <dan.j.williams@intel.com>
Suggested-by: "Li, Ming" <ming4.li@intel.com>
Signed-off-by: Ira Weiny <ira.weiny@intel.com>

---
Thanks to Dan for the bulk of the patch.
Thanks to Ming for pointing out the need for a lock to prevent more
than 1 task from being processed at a time.
---
 drivers/cxl/core/pci.c  | 16 ++------
 drivers/pci/doe.c       | 83 ++++++++++++++---------------------------
 include/linux/pci-doe.h | 10 +----
 3 files changed, 32 insertions(+), 77 deletions(-)


base-commit: b6e7fdfd6f6a8bf88fcdb4a45da52c42ba238c25

Comments

Ira Weiny Nov. 20, 2022, 1:57 p.m. UTC | #1
On Sun, Nov 20, 2022 at 10:27:35AM +0800, Hillf Danton wrote:
> On Sat, 19 Nov 2022 14:25:27 -0800 Ira Weiny <ira.weiny@intel.com>
> > @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> >  		return -EIO;
> >  
> >  	task->doe_mb = doe_mb;
> > -	INIT_WORK(&task->work, doe_statemachine_work);
> > -	queue_work(doe_mb->work_queue, &task->work);
> > +
> > +again:
> > +	if (!mutex_trylock(&doe_mb->exec_lock)) {
> > +		if (wait_event_timeout(task->doe_mb->wq,
> > +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
> > +				PCI_DOE_POLL_INTERVAL))
> > +			return -EIO;
> 
> Is EIO worth a line of pr_warn()?

Maybe, but I'm not sure it is worth it.  This parallels the original code,
which called pci_doe_flush_mb() to shut down the mailbox, so this is likely to
never happen.  The callers could print something if needed.

> 
> > +		goto again;
> > +	}
> > +	exec_task(task);
> > +	mutex_unlock(&doe_mb->exec_lock);
> > +
> 
> If it is likely to take two minutes to acquire the exec_lock after
> rounds of trying again, trylock + wait timeout barely make sense given EIO.

I'm not sure where the 2 minutes comes from:

#define PCI_DOE_TIMEOUT HZ
#define PCI_DOE_POLL_INTERVAL   (PCI_DOE_TIMEOUT / 128)

It is also not anticipated that more than one task will be given to the
mailbox at a time, but the protection needs to be there because exec_task()
will get confused if more than one thread submits at the same time.

All this said I've now convinced myself that there is a race in the use of
PCI_DOE_FLAG_CANCEL even with the existing code.

I believe that if the PCI device goes away, the doe_mb structure may get freed
before other threads have a chance to check doe_mb->flags.  Worse yet, the
wait queue itself (doe_mb->wq) may become invalid...

I don't believe this can currently happen because anyone using the doe_mb
structure has a reference to the pci device.

With this patch I think all the doe_mb->flags and the wait queue can go away.
pci_doe_wait() can be replaced with a simple msleep_interruptible().

Let me work through that a bit.

Ira

> 
> Hillf
> 
> /**
>  * wait_event_timeout - sleep until a condition gets true or a timeout elapses
>  * @wq_head: the waitqueue to wait on
>  * @condition: a C expression for the event to wait for
>  * @timeout: timeout, in jiffies
>  *
>  * The process is put to sleep (TASK_UNINTERRUPTIBLE) until the
>  * @condition evaluates to true. The @condition is checked each time
>  * the waitqueue @wq_head is woken up.
>  *
>  * wake_up() has to be called after changing any variable that could
>  * change the result of the wait condition.
>  *
>  * Returns:
>  * 0 if the @condition evaluated to %false after the @timeout elapsed,
>  * 1 if the @condition evaluated to %true after the @timeout elapsed,
>  * or the remaining jiffies (at least 1) if the @condition evaluated
>  * to %true before the @timeout elapsed.
>  */
Li, Ming4 Nov. 21, 2022, 1:39 a.m. UTC | #2
On 11/20/2022 6:25 AM, ira.weiny@intel.com wrote:
> From: Ira Weiny <ira.weiny@intel.com>
> 
> Gregory Price and Jonathan Cameron reported a bug within
> pci_doe_submit_task().[1]  The issue was that work item initialization
> needs to be done with either INIT_WORK_ONSTACK() or INIT_WORK()
> depending on how the work item is allocated.
> 
> Initially, it was anticipated that DOE tasks were going to need to be
> submitted asynchronously and the code was designed thusly.  Many
> alternatives were discussed to fix the work initialization issue.[2]
> 
> However, all current users submit tasks synchronously and this has
> therefore become an unneeded maintenance burden.  Remove the extra
> maintenance burden by replacing asynchronous task submission with
> a synchronous wait function.[3]
> 
> [1] https://lore.kernel.org/linux-cxl/20221014151045.24781-1-Jonathan.Cameron@huawei.com/T/#m88a7f50dcce52f30c8bf5c3dcc06fa9843b54a2d
> [2] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m0f057773d9c75432fcfcc54a2604483fe82abe92
> [3] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m32d3f9b208ef7486bc148d94a326b26b2d3e69ff
> 
> Reported-by: Gregory Price <gregory.price@memverge.com>
> Reported-by: Jonathan Cameron <Jonathan.Cameron@huawei.com>
> Suggested-by: Dan Williams <dan.j.williams@intel.com>
> Suggested-by: "Li, Ming" <ming4.li@intel.com>
> Signed-off-by: Ira Weiny <ira.weiny@intel.com>
> 
> ---
> Thanks to Dan for the bulk of the patch.
> Thanks to Ming for pointing out the need for a lock to prevent more
> than 1 task from being processed at a time.
> ---
>  drivers/cxl/core/pci.c  | 16 ++------
>  drivers/pci/doe.c       | 83 ++++++++++++++---------------------------
>  include/linux/pci-doe.h | 10 +----
>  3 files changed, 32 insertions(+), 77 deletions(-)
> 
> diff --git a/drivers/cxl/core/pci.c b/drivers/cxl/core/pci.c
> index 9240df53ed87..58977e0712b6 100644
> --- a/drivers/cxl/core/pci.c
> +++ b/drivers/cxl/core/pci.c
> @@ -490,21 +490,14 @@ static struct pci_doe_mb *find_cdat_doe(struct device *uport)
>  		    CXL_DOE_TABLE_ACCESS_TABLE_TYPE_CDATA) |		\
>  	 FIELD_PREP(CXL_DOE_TABLE_ACCESS_ENTRY_HANDLE, (entry_handle)))
>  
> -static void cxl_doe_task_complete(struct pci_doe_task *task)
> -{
> -	complete(task->private);
> -}
> -
>  struct cdat_doe_task {
>  	u32 request_pl;
>  	u32 response_pl[32];
> -	struct completion c;
>  	struct pci_doe_task task;
>  };
>  
>  #define DECLARE_CDAT_DOE_TASK(req, cdt)                       \
>  struct cdat_doe_task cdt = {                                  \
> -	.c = COMPLETION_INITIALIZER_ONSTACK(cdt.c),           \
>  	.request_pl = req,				      \
>  	.task = {                                             \
>  		.prot.vid = PCI_DVSEC_VENDOR_ID_CXL,        \
> @@ -513,8 +506,6 @@ struct cdat_doe_task cdt = {                                  \
>  		.request_pl_sz = sizeof(cdt.request_pl),      \
>  		.response_pl = cdt.response_pl,               \
>  		.response_pl_sz = sizeof(cdt.response_pl),    \
> -		.complete = cxl_doe_task_complete,            \
> -		.private = &cdt.c,                            \
>  	}                                                     \
>  }
>  
> @@ -525,12 +516,12 @@ static int cxl_cdat_get_length(struct device *dev,
>  	DECLARE_CDAT_DOE_TASK(CDAT_DOE_REQ(0), t);
>  	int rc;
>  
> -	rc = pci_doe_submit_task(cdat_doe, &t.task);
> +	rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
>  	if (rc < 0) {
>  		dev_err(dev, "DOE submit failed: %d", rc);
>  		return rc;
>  	}
> -	wait_for_completion(&t.c);
> +
>  	if (t.task.rv < sizeof(u32))
>  		return -EIO;
>  
> @@ -554,12 +545,11 @@ static int cxl_cdat_read_table(struct device *dev,
>  		u32 *entry;
>  		int rc;
>  
> -		rc = pci_doe_submit_task(cdat_doe, &t.task);
> +		rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
>  		if (rc < 0) {
>  			dev_err(dev, "DOE submit failed: %d", rc);
>  			return rc;
>  		}
> -		wait_for_completion(&t.c);
>  		/* 1 DW header + 1 DW data min */
>  		if (t.task.rv < (2 * sizeof(u32)))
>  			return -EIO;
> diff --git a/drivers/pci/doe.c b/drivers/pci/doe.c
> index e402f05068a5..41a75112b39b 100644
> --- a/drivers/pci/doe.c
> +++ b/drivers/pci/doe.c
> @@ -18,7 +18,6 @@
>  #include <linux/mutex.h>
>  #include <linux/pci.h>
>  #include <linux/pci-doe.h>
> -#include <linux/workqueue.h>
>  
>  #define PCI_DOE_PROTOCOL_DISCOVERY 0
>  
> @@ -40,7 +39,7 @@
>   * @cap_offset: Capability offset
>   * @prots: Array of protocols supported (encoded as long values)
>   * @wq: Wait queue for work item
> - * @work_queue: Queue of pci_doe_work items
> + * @exec_lock: Lock to ensure 1 task is processed at a time
>   * @flags: Bit array of PCI_DOE_FLAG_* flags
>   */
>  struct pci_doe_mb {
> @@ -49,7 +48,7 @@ struct pci_doe_mb {
>  	struct xarray prots;
>  
>  	wait_queue_head_t wq;
> -	struct workqueue_struct *work_queue;
> +	struct mutex exec_lock;
>  	unsigned long flags;
>  };
>  
> @@ -211,7 +210,6 @@ static int pci_doe_recv_resp(struct pci_doe_mb *doe_mb, struct pci_doe_task *tas
>  static void signal_task_complete(struct pci_doe_task *task, int rv)
>  {
>  	task->rv = rv;
> -	task->complete(task);
>  }
>  
>  static void signal_task_abort(struct pci_doe_task *task, int rv)
> @@ -231,10 +229,8 @@ static void signal_task_abort(struct pci_doe_task *task, int rv)
>  	signal_task_complete(task, rv);
>  }
>  
> -static void doe_statemachine_work(struct work_struct *work)
> +static void exec_task(struct pci_doe_task *task)
>  {
> -	struct pci_doe_task *task = container_of(work, struct pci_doe_task,
> -						 work);
>  	struct pci_doe_mb *doe_mb = task->doe_mb;
>  	struct pci_dev *pdev = doe_mb->pdev;
>  	int offset = doe_mb->cap_offset;
> @@ -295,18 +291,12 @@ static void doe_statemachine_work(struct work_struct *work)
>  	signal_task_complete(task, rc);
>  }
>  
> -static void pci_doe_task_complete(struct pci_doe_task *task)
> -{
> -	complete(task->private);
> -}
> -
>  static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
>  			     u8 *protocol)
>  {
>  	u32 request_pl = FIELD_PREP(PCI_DOE_DATA_OBJECT_DISC_REQ_3_INDEX,
>  				    *index);
>  	u32 response_pl;
> -	DECLARE_COMPLETION_ONSTACK(c);
>  	struct pci_doe_task task = {
>  		.prot.vid = PCI_VENDOR_ID_PCI_SIG,
>  		.prot.type = PCI_DOE_PROTOCOL_DISCOVERY,
> @@ -314,17 +304,13 @@ static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
>  		.request_pl_sz = sizeof(request_pl),
>  		.response_pl = &response_pl,
>  		.response_pl_sz = sizeof(response_pl),
> -		.complete = pci_doe_task_complete,
> -		.private = &c,
>  	};
>  	int rc;
>  
> -	rc = pci_doe_submit_task(doe_mb, &task);
> +	rc = pci_doe_submit_task_wait(doe_mb, &task);
>  	if (rc < 0)
>  		return rc;
>  
> -	wait_for_completion(&c);
> -
>  	if (task.rv != sizeof(response_pl))
>  		return -EIO;
>  
> @@ -376,13 +362,6 @@ static void pci_doe_xa_destroy(void *mb)
>  	xa_destroy(&doe_mb->prots);
>  }
>  
> -static void pci_doe_destroy_workqueue(void *mb)
> -{
> -	struct pci_doe_mb *doe_mb = mb;
> -
> -	destroy_workqueue(doe_mb->work_queue);
> -}
> -
>  static void pci_doe_flush_mb(void *mb)
>  {
>  	struct pci_doe_mb *doe_mb = mb;
> @@ -390,12 +369,9 @@ static void pci_doe_flush_mb(void *mb)
>  	/* Stop all pending work items from starting */
>  	set_bit(PCI_DOE_FLAG_DEAD, &doe_mb->flags);
>  
> -	/* Cancel an in progress work item, if necessary */
> +	/* Cancel the in progress task and waiting tasks, if necessary */
>  	set_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags);
>  	wake_up(&doe_mb->wq);
Should we use wake_up_all() to wake up all waiting tasks here?

> -
> -	/* Flush all work items */
> -	flush_workqueue(doe_mb->work_queue);
>  }
>  
>  /**
> @@ -423,25 +399,13 @@ struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset)
>  	doe_mb->pdev = pdev;
>  	doe_mb->cap_offset = cap_offset;
>  	init_waitqueue_head(&doe_mb->wq);
> +	mutex_init(&doe_mb->exec_lock);
>  
>  	xa_init(&doe_mb->prots);
>  	rc = devm_add_action(dev, pci_doe_xa_destroy, doe_mb);
>  	if (rc)
>  		return ERR_PTR(rc);
>  
> -	doe_mb->work_queue = alloc_ordered_workqueue("%s %s DOE [%x]", 0,
> -						dev_driver_string(&pdev->dev),
> -						pci_name(pdev),
> -						doe_mb->cap_offset);
> -	if (!doe_mb->work_queue) {
> -		pci_err(pdev, "[%x] failed to allocate work queue\n",
> -			doe_mb->cap_offset);
> -		return ERR_PTR(-ENOMEM);
> -	}
> -	rc = devm_add_action_or_reset(dev, pci_doe_destroy_workqueue, doe_mb);
> -	if (rc)
> -		return ERR_PTR(rc);
> -
>  	/* Reset the mailbox by issuing an abort */
>  	rc = pci_doe_abort(doe_mb);
>  	if (rc) {
> @@ -496,23 +460,22 @@ bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type)
>  EXPORT_SYMBOL_GPL(pci_doe_supports_prot);
>  
>  /**
> - * pci_doe_submit_task() - Submit a task to be processed by the state machine
> + * pci_doe_submit_task_wait() - Submit and execute a task
>   *
>   * @doe_mb: DOE mailbox capability to submit to
> - * @task: task to be queued
> - *
> - * Submit a DOE task (request/response) to the DOE mailbox to be processed.
> - * Returns upon queueing the task object.  If the queue is full this function
> - * will sleep until there is room in the queue.
> + * @task: task to be run
>   *
> - * task->complete will be called when the state machine is done processing this
> - * task.
> + * Submit and run DOE task (request/response) to the DOE mailbox to be
> + * processed.
>   *
>   * Excess data will be discarded.
>   *
> - * RETURNS: 0 when task has been successfully queued, -ERRNO on error
> + * Context: non-interrupt
> + *
> + * RETURNS: 0 when task was executed, the @task->rv holds the status
> + * result of the executed operation, -ERRNO on failure to submit.
>   */
> -int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> +int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
>  {
>  	if (!pci_doe_supports_prot(doe_mb, task->prot.vid, task->prot.type))
>  		return -EINVAL;
> @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
>  		return -EIO;
>  
>  	task->doe_mb = doe_mb;
> -	INIT_WORK(&task->work, doe_statemachine_work);
> -	queue_work(doe_mb->work_queue, &task->work);
> +
> +again:
> +	if (!mutex_trylock(&doe_mb->exec_lock)) {
> +		if (wait_event_timeout(task->doe_mb->wq,
> +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
> +				PCI_DOE_POLL_INTERVAL))
> +			return -EIO;

We already implemented pci_doe_wait(); I think we can use it instead of this wait_event_timeout().

Thanks
Ming

> +		goto again;
> +	}
> +	exec_task(task);
> +	mutex_unlock(&doe_mb->exec_lock);
> +
>  	return 0;
>  }
> -EXPORT_SYMBOL_GPL(pci_doe_submit_task);
> +EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
> diff --git a/include/linux/pci-doe.h b/include/linux/pci-doe.h
> index ed9b4df792b8..c94122a66221 100644
> --- a/include/linux/pci-doe.h
> +++ b/include/linux/pci-doe.h
> @@ -30,8 +30,6 @@ struct pci_doe_mb;
>   * @response_pl_sz: Size of the response payload (bytes)
>   * @rv: Return value.  Length of received response or error (bytes)
>   * @complete: Called when task is complete
> - * @private: Private data for the consumer
> - * @work: Used internally by the mailbox
>   * @doe_mb: Used internally by the mailbox
>   *
>   * The payload sizes and rv are specified in bytes with the following
> @@ -50,11 +48,6 @@ struct pci_doe_task {
>  	u32 *response_pl;
>  	size_t response_pl_sz;
>  	int rv;
> -	void (*complete)(struct pci_doe_task *task);
> -	void *private;
> -
> -	/* No need for the user to initialize these fields */
> -	struct work_struct work;
>  	struct pci_doe_mb *doe_mb;
>  };
>  
> @@ -72,6 +65,5 @@ struct pci_doe_task {
>  
>  struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset);
>  bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type);
> -int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
> -
> +int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
>  #endif
> 
> base-commit: b6e7fdfd6f6a8bf88fcdb4a45da52c42ba238c25
Zhuo, Qiuxu Nov. 21, 2022, 2:01 a.m. UTC | #3
> From: Ira Weiny <ira.weiny@intel.com>
> ...
> 
> @@ -423,25 +399,13 @@ struct pci_doe_mb
> *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset)
>  	doe_mb->pdev = pdev;
>  	doe_mb->cap_offset = cap_offset;
>  	init_waitqueue_head(&doe_mb->wq);
> +	mutex_init(&doe_mb->exec_lock);

In the real world, I'm not sure whether there is a case where pcim_doe_create_mb() is invoked by
multiple drivers to create multiple DOE mailbox instances. If there is such a case, we may
need to ensure there is only one DOE mailbox instance for a physical DOE of pdev @cap_offset.

-Qiuxu
Li, Ming4 Nov. 21, 2022, 2:07 a.m. UTC | #4
On 11/21/2022 9:39 AM, Li, Ming wrote:
> On 11/20/2022 6:25 AM, ira.weiny@intel.com wrote:
>> From: Ira Weiny <ira.weiny@intel.com>
>>
>> [...]
>> @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
>>  		return -EIO;
>>  
>>  	task->doe_mb = doe_mb;
>> -	INIT_WORK(&task->work, doe_statemachine_work);
>> -	queue_work(doe_mb->work_queue, &task->work);
>> +
>> +again:
>> +	if (!mutex_trylock(&doe_mb->exec_lock)) {
>> +		if (wait_event_timeout(task->doe_mb->wq,
>> +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
>> +				PCI_DOE_POLL_INTERVAL))
>> +			return -EIO;
> 
> We already implemented pci_doe_wait(); I think we can use it instead of this wait_event_timeout().
> 
> Thanks
> Ming
> 

This wait_event_timeout() only checks PCI_DOE_FLAG_CANCEL, which means it only detects that the doe_mb is being destroyed.
If the current DOE task completes correctly, I think we should wake up the next task; the current implementation just waits until the timeout elapses and tries again.
Besides, if two threads are waiting on the same doe_mb, with thread #1 waiting first and thread #2 second, there is a chance that thread #2 is processed before thread #1.

Thanks
Ming

>> +		goto again;
>> +	}
>> +	exec_task(task);
>> +	mutex_unlock(&doe_mb->exec_lock);
>> +
>>  	return 0;
>>  }
>> -EXPORT_SYMBOL_GPL(pci_doe_submit_task);
>> +EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
>> [...]
Jonathan Cameron Nov. 21, 2022, 11:07 a.m. UTC | #5
On Mon, 21 Nov 2022 02:01:32 +0000
"Zhuo, Qiuxu" <qiuxu.zhuo@intel.com> wrote:

> > From: Ira Weiny <ira.weiny@intel.com>  
> > ...
> > 
> > @@ -423,25 +399,13 @@ struct pci_doe_mb
> > *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset)
> >  	doe_mb->pdev = pdev;
> >  	doe_mb->cap_offset = cap_offset;
> >  	init_waitqueue_head(&doe_mb->wq);
> > +	mutex_init(&doe_mb->exec_lock);  
> 
> In real world, not sure whether there is a case that pcim_doe_create_mb() is invoked by 
> multiple drivers to create multiple DOE mailbox instances? If there is such a case, we may 
> need to ensure there is only one DOE mailbox instance for a physical DOE of pdev @cap_offset.

I think if that happened we'd have a lot of mess.  The main PCI driver for a given
EP, switch port etc needs to handle this part.

Sub-drivers can then do something similar to
https://elixir.bootlin.com/linux/latest/source/drivers/cxl/core/pci.c#L465
to find a DOE instance that supports what they need and use it.
The DOE code 'should' work fine when doing this - the request/response pairs
will be serialized.

We have discussed moving that 'find' logic and the xarray into the PCI core
and that will need to happen to support CMA etc. For the first submission it
was easier to just do it in the CXL drivers.

Jonathan

> 
> -Qiuxu
Jonathan Cameron Nov. 21, 2022, 11:19 a.m. UTC | #6
On Sat, 19 Nov 2022 14:25:27 -0800
ira.weiny@intel.com wrote:

> From: Ira Weiny <ira.weiny@intel.com>
> 
> Gregory Price and Jonathan Cameron reported a bug within
> pci_doe_submit_task().[1]  The issue was that work item initialization
> needs to be done with either INIT_WORK_ONSTACK() or INIT_WORK()
> depending on how the work item is allocated.
> 
> Initially, it was anticipated that DOE tasks were going to need to be
> submitted asynchronously and the code was designed thusly.  Many
> alternatives were discussed to fix the work initialization issue.[2]
> 
> However, all current users submit tasks synchronously and this has
> therefore become an unneeded maintenance burden.  Remove the extra
> maintenance burden by replacing asynchronous task submission with
> a synchronous wait function.[3]
> 
> [1] https://lore.kernel.org/linux-cxl/20221014151045.24781-1-Jonathan.Cameron@huawei.com/T/#m88a7f50dcce52f30c8bf5c3dcc06fa9843b54a2d
> [2] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m0f057773d9c75432fcfcc54a2604483fe82abe92
> [3] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m32d3f9b208ef7486bc148d94a326b26b2d3e69ff
> 
> Reported-by: Gregory Price <gregory.price@memverge.com>
> Reported-by: Jonathan Cameron <Jonathan.Cameron@huawei.com>
> Suggested-by: Dan Williams <dan.j.williams@intel.com>
> Suggested-by: "Li, Ming" <ming4.li@intel.com>
> Signed-off-by: Ira Weiny <ira.weiny@intel.com>

This is very similar to the very first approach we had a long time back.
It ends up queueing multiple users on a mutex.  That was the bit that people didn't
like about those early proposals.  Ah well, I always thought it was simpler and
cleaner even though there are potentially fairness problems if there are enough
users.

So on that note,

Reviewed-by: Jonathan Cameron <Jonathan.Cameron@huawei.com>

@Lukas, I assume we don't care about the async support for SPDM going forwards?

Thanks,

Jonathan


> 
> ---
> Thanks to Dan for the bulk of the patch.
> Thanks to Ming for pointing out the need for a lock to prevent more
> than 1 task from being processed at a time.
> ---
>  drivers/cxl/core/pci.c  | 16 ++------
>  drivers/pci/doe.c       | 83 ++++++++++++++---------------------------
>  include/linux/pci-doe.h | 10 +----
>  3 files changed, 32 insertions(+), 77 deletions(-)
> 
> diff --git a/drivers/cxl/core/pci.c b/drivers/cxl/core/pci.c
> index 9240df53ed87..58977e0712b6 100644
> --- a/drivers/cxl/core/pci.c
> +++ b/drivers/cxl/core/pci.c
> @@ -490,21 +490,14 @@ static struct pci_doe_mb *find_cdat_doe(struct device *uport)
>  		    CXL_DOE_TABLE_ACCESS_TABLE_TYPE_CDATA) |		\
>  	 FIELD_PREP(CXL_DOE_TABLE_ACCESS_ENTRY_HANDLE, (entry_handle)))
>  
> -static void cxl_doe_task_complete(struct pci_doe_task *task)
> -{
> -	complete(task->private);
> -}
> -
>  struct cdat_doe_task {
>  	u32 request_pl;
>  	u32 response_pl[32];
> -	struct completion c;
>  	struct pci_doe_task task;
>  };
>  
>  #define DECLARE_CDAT_DOE_TASK(req, cdt)                       \
>  struct cdat_doe_task cdt = {                                  \
> -	.c = COMPLETION_INITIALIZER_ONSTACK(cdt.c),           \
>  	.request_pl = req,				      \
>  	.task = {                                             \
>  		.prot.vid = PCI_DVSEC_VENDOR_ID_CXL,        \
> @@ -513,8 +506,6 @@ struct cdat_doe_task cdt = {                                  \
>  		.request_pl_sz = sizeof(cdt.request_pl),      \
>  		.response_pl = cdt.response_pl,               \
>  		.response_pl_sz = sizeof(cdt.response_pl),    \
> -		.complete = cxl_doe_task_complete,            \
> -		.private = &cdt.c,                            \
>  	}                                                     \
>  }
>  
> @@ -525,12 +516,12 @@ static int cxl_cdat_get_length(struct device *dev,
>  	DECLARE_CDAT_DOE_TASK(CDAT_DOE_REQ(0), t);
>  	int rc;
>  
> -	rc = pci_doe_submit_task(cdat_doe, &t.task);
> +	rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
>  	if (rc < 0) {
>  		dev_err(dev, "DOE submit failed: %d", rc);
>  		return rc;
>  	}
> -	wait_for_completion(&t.c);
> +
>  	if (t.task.rv < sizeof(u32))
>  		return -EIO;
>  
> @@ -554,12 +545,11 @@ static int cxl_cdat_read_table(struct device *dev,
>  		u32 *entry;
>  		int rc;
>  
> -		rc = pci_doe_submit_task(cdat_doe, &t.task);
> +		rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
>  		if (rc < 0) {
>  			dev_err(dev, "DOE submit failed: %d", rc);
>  			return rc;
>  		}
> -		wait_for_completion(&t.c);
>  		/* 1 DW header + 1 DW data min */
>  		if (t.task.rv < (2 * sizeof(u32)))
>  			return -EIO;
> diff --git a/drivers/pci/doe.c b/drivers/pci/doe.c
> index e402f05068a5..41a75112b39b 100644
> --- a/drivers/pci/doe.c
> +++ b/drivers/pci/doe.c
> @@ -18,7 +18,6 @@
>  #include <linux/mutex.h>
>  #include <linux/pci.h>
>  #include <linux/pci-doe.h>
> -#include <linux/workqueue.h>
>  
>  #define PCI_DOE_PROTOCOL_DISCOVERY 0
>  
> @@ -40,7 +39,7 @@
>   * @cap_offset: Capability offset
>   * @prots: Array of protocols supported (encoded as long values)
>   * @wq: Wait queue for work item
> - * @work_queue: Queue of pci_doe_work items
> + * @exec_lock: Lock to ensure 1 task is processed at a time
>   * @flags: Bit array of PCI_DOE_FLAG_* flags
>   */
>  struct pci_doe_mb {
> @@ -49,7 +48,7 @@ struct pci_doe_mb {
>  	struct xarray prots;
>  
>  	wait_queue_head_t wq;
> -	struct workqueue_struct *work_queue;
> +	struct mutex exec_lock;
>  	unsigned long flags;
>  };
>  
> @@ -211,7 +210,6 @@ static int pci_doe_recv_resp(struct pci_doe_mb *doe_mb, struct pci_doe_task *tas
>  static void signal_task_complete(struct pci_doe_task *task, int rv)
>  {
>  	task->rv = rv;
> -	task->complete(task);
>  }
>  
>  static void signal_task_abort(struct pci_doe_task *task, int rv)
> @@ -231,10 +229,8 @@ static void signal_task_abort(struct pci_doe_task *task, int rv)
>  	signal_task_complete(task, rv);
>  }
>  
> -static void doe_statemachine_work(struct work_struct *work)
> +static void exec_task(struct pci_doe_task *task)
>  {
> -	struct pci_doe_task *task = container_of(work, struct pci_doe_task,
> -						 work);
>  	struct pci_doe_mb *doe_mb = task->doe_mb;
>  	struct pci_dev *pdev = doe_mb->pdev;
>  	int offset = doe_mb->cap_offset;
> @@ -295,18 +291,12 @@ static void doe_statemachine_work(struct work_struct *work)
>  	signal_task_complete(task, rc);
>  }
>  
> -static void pci_doe_task_complete(struct pci_doe_task *task)
> -{
> -	complete(task->private);
> -}
> -
>  static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
>  			     u8 *protocol)
>  {
>  	u32 request_pl = FIELD_PREP(PCI_DOE_DATA_OBJECT_DISC_REQ_3_INDEX,
>  				    *index);
>  	u32 response_pl;
> -	DECLARE_COMPLETION_ONSTACK(c);
>  	struct pci_doe_task task = {
>  		.prot.vid = PCI_VENDOR_ID_PCI_SIG,
>  		.prot.type = PCI_DOE_PROTOCOL_DISCOVERY,
> @@ -314,17 +304,13 @@ static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
>  		.request_pl_sz = sizeof(request_pl),
>  		.response_pl = &response_pl,
>  		.response_pl_sz = sizeof(response_pl),
> -		.complete = pci_doe_task_complete,
> -		.private = &c,
>  	};
>  	int rc;
>  
> -	rc = pci_doe_submit_task(doe_mb, &task);
> +	rc = pci_doe_submit_task_wait(doe_mb, &task);
>  	if (rc < 0)
>  		return rc;
>  
> -	wait_for_completion(&c);
> -
>  	if (task.rv != sizeof(response_pl))
>  		return -EIO;
>  
> @@ -376,13 +362,6 @@ static void pci_doe_xa_destroy(void *mb)
>  	xa_destroy(&doe_mb->prots);
>  }
>  
> -static void pci_doe_destroy_workqueue(void *mb)
> -{
> -	struct pci_doe_mb *doe_mb = mb;
> -
> -	destroy_workqueue(doe_mb->work_queue);
> -}
> -
>  static void pci_doe_flush_mb(void *mb)
>  {
>  	struct pci_doe_mb *doe_mb = mb;
> @@ -390,12 +369,9 @@ static void pci_doe_flush_mb(void *mb)
>  	/* Stop all pending work items from starting */
>  	set_bit(PCI_DOE_FLAG_DEAD, &doe_mb->flags);
>  
> -	/* Cancel an in progress work item, if necessary */
> +	/* Cancel the in progress task and waiting tasks, if necessary */
>  	set_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags);
>  	wake_up(&doe_mb->wq);
> -
> -	/* Flush all work items */
> -	flush_workqueue(doe_mb->work_queue);
>  }
>  
>  /**
> @@ -423,25 +399,13 @@ struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset)
>  	doe_mb->pdev = pdev;
>  	doe_mb->cap_offset = cap_offset;
>  	init_waitqueue_head(&doe_mb->wq);
> +	mutex_init(&doe_mb->exec_lock);
>  
>  	xa_init(&doe_mb->prots);
>  	rc = devm_add_action(dev, pci_doe_xa_destroy, doe_mb);
>  	if (rc)
>  		return ERR_PTR(rc);
>  
> -	doe_mb->work_queue = alloc_ordered_workqueue("%s %s DOE [%x]", 0,
> -						dev_driver_string(&pdev->dev),
> -						pci_name(pdev),
> -						doe_mb->cap_offset);
> -	if (!doe_mb->work_queue) {
> -		pci_err(pdev, "[%x] failed to allocate work queue\n",
> -			doe_mb->cap_offset);
> -		return ERR_PTR(-ENOMEM);
> -	}
> -	rc = devm_add_action_or_reset(dev, pci_doe_destroy_workqueue, doe_mb);
> -	if (rc)
> -		return ERR_PTR(rc);
> -
>  	/* Reset the mailbox by issuing an abort */
>  	rc = pci_doe_abort(doe_mb);
>  	if (rc) {
> @@ -496,23 +460,22 @@ bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type)
>  EXPORT_SYMBOL_GPL(pci_doe_supports_prot);
>  
>  /**
> - * pci_doe_submit_task() - Submit a task to be processed by the state machine
> + * pci_doe_submit_task_wait() - Submit and execute a task
>   *
>   * @doe_mb: DOE mailbox capability to submit to
> - * @task: task to be queued
> - *
> - * Submit a DOE task (request/response) to the DOE mailbox to be processed.
> - * Returns upon queueing the task object.  If the queue is full this function
> - * will sleep until there is room in the queue.
> + * @task: task to be run
>   *
> - * task->complete will be called when the state machine is done processing this
> - * task.
> + * Submit and run DOE task (request/response) to the DOE mailbox to be
> + * processed.
>   *
>   * Excess data will be discarded.
>   *
> - * RETURNS: 0 when task has been successfully queued, -ERRNO on error
> + * Context: non-interrupt
> + *
> + * RETURNS: 0 when task was executed, the @task->rv holds the status
> + * result of the executed operation, -ERRNO on failure to submit.
>   */
> -int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> +int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
>  {
>  	if (!pci_doe_supports_prot(doe_mb, task->prot.vid, task->prot.type))
>  		return -EINVAL;
> @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
>  		return -EIO;
>  
>  	task->doe_mb = doe_mb;
> -	INIT_WORK(&task->work, doe_statemachine_work);
> -	queue_work(doe_mb->work_queue, &task->work);
> +
> +again:
> +	if (!mutex_trylock(&doe_mb->exec_lock)) {
> +		if (wait_event_timeout(task->doe_mb->wq,
> +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
> +				PCI_DOE_POLL_INTERVAL))
> +			return -EIO;
> +		goto again;
> +	}
> +	exec_task(task);
> +	mutex_unlock(&doe_mb->exec_lock);
> +
>  	return 0;
>  }
> -EXPORT_SYMBOL_GPL(pci_doe_submit_task);
> +EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
Zhuo, Qiuxu Nov. 21, 2022, 2:17 p.m. UTC | #7
> From: Jonathan Cameron <Jonathan.Cameron@Huawei.com>
> ...
> On Mon, 21 Nov 2022 02:01:32 +0000
> "Zhuo, Qiuxu" <qiuxu.zhuo@intel.com> wrote:
> 
> > > From: Ira Weiny <ira.weiny@intel.com>
> > > ...
> > >
> > > @@ -423,25 +399,13 @@ struct pci_doe_mb
> *pcim_doe_create_mb(struct
> > > pci_dev *pdev, u16 cap_offset)
> > >  	doe_mb->pdev = pdev;
> > >  	doe_mb->cap_offset = cap_offset;
> > >  	init_waitqueue_head(&doe_mb->wq);
> > > +	mutex_init(&doe_mb->exec_lock);
> >
> > In real world, not sure whether there is a case that
> > pcim_doe_create_mb() is invoked by multiple drivers to create multiple
> > DOE mailbox instances? If there is such a case, we may need to ensure
> there is only one DOE mailbox instance for a physical DOE of pdev
> @cap_offset.
> 
> I think if that happened we'd have a lot of mess.  The main PCI driver for a
> given EP, switch port etc needs to handle this part.
> 
> Sub drivers can then do similar to
> https://elixir.bootlin.com/linux/latest/source/drivers/cxl/core/pci.c#L465
> to find a DOE instance that supports what they need and use it.
> The DOE code 'should' work fine when doing this - the request/response
> pairs will be serialized.
> 
> We have discussed moving that 'find' logic and the xarray into the PCI core
> and that will need to happen to support CMA etc. For the first submission it
> was easier to just do it in the CXL drivers..

For the 1st submission, yes, it's easier the current way.

> 
> Jonathan

It's good that this potential issue has been noticed. I think moving the 'find' logic and the xarray 
from CXL to the PCI core should save a lot of such duplicated work for other drivers using DOE.

One more thought:
For a driver, I think it's only interested in getting a DOE mailbox from a PCI device with a specified VID+protocol and using it.
The driver doesn't care how the DOE mailbox instance is created, and it also doesn't want to maintain it.
After using the DOE mailbox instance, the driver puts it back. 
A pair of get/put APIs implemented in the PCI core like below might make drivers' lives easier 
Dan Williams Nov. 21, 2022, 3:24 p.m. UTC | #8
ira.weiny@ wrote:
> From: Ira Weiny <ira.weiny@intel.com>
> 
> Gregory Price and Jonathan Cameron reported a bug within
> pci_doe_submit_task().[1]  The issue was that work item initialization
> needs to be done with either INIT_WORK_ONSTACK() or INIT_WORK()
> depending on how the work item is allocated.
> 
> Initially, it was anticipated that DOE tasks were going to need to be
> submitted asynchronously and the code was designed thusly.  Many
> alternatives were discussed to fix the work initialization issue.[2]
> 
> However, all current users submit tasks synchronously and this has
> therefore become an unneeded maintenance burden.  Remove the extra
> maintenance burden by replacing asynchronous task submission with
> a synchronous wait function.[3]
> 
> [1] https://lore.kernel.org/linux-cxl/20221014151045.24781-1-Jonathan.Cameron@huawei.com/T/#m88a7f50dcce52f30c8bf5c3dcc06fa9843b54a2d
> [2] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m0f057773d9c75432fcfcc54a2604483fe82abe92
> [3] https://lore.kernel.org/linux-cxl/Y3kSDQDur+IUDs2O@iweiny-mobl/T/#m32d3f9b208ef7486bc148d94a326b26b2d3e69ff
> 
> Reported-by: Gregory Price <gregory.price@memverge.com>
> Reported-by: Jonathan Cameron <Jonathan.Cameron@huawei.com>
> Suggested-by: Dan Williams <dan.j.williams@intel.com>
> Suggested-by: "Li, Ming" <ming4.li@intel.com>
> Signed-off-by: Ira Weiny <ira.weiny@intel.com>
> 
> ---
> Thanks to Dan for the bulk of the patch.
> Thanks to Ming for pointing out the need for a lock to prevent more
> than 1 task from being processed at a time.
> ---
>  drivers/cxl/core/pci.c  | 16 ++------
>  drivers/pci/doe.c       | 83 ++++++++++++++---------------------------
>  include/linux/pci-doe.h | 10 +----
>  3 files changed, 32 insertions(+), 77 deletions(-)
> 
> diff --git a/drivers/cxl/core/pci.c b/drivers/cxl/core/pci.c
> index 9240df53ed87..58977e0712b6 100644
> --- a/drivers/cxl/core/pci.c
> +++ b/drivers/cxl/core/pci.c
> @@ -490,21 +490,14 @@ static struct pci_doe_mb *find_cdat_doe(struct device *uport)
>  		    CXL_DOE_TABLE_ACCESS_TABLE_TYPE_CDATA) |		\
>  	 FIELD_PREP(CXL_DOE_TABLE_ACCESS_ENTRY_HANDLE, (entry_handle)))
>  
> -static void cxl_doe_task_complete(struct pci_doe_task *task)
> -{
> -	complete(task->private);
> -}
> -
>  struct cdat_doe_task {
>  	u32 request_pl;
>  	u32 response_pl[32];
> -	struct completion c;
>  	struct pci_doe_task task;
>  };
>  
>  #define DECLARE_CDAT_DOE_TASK(req, cdt)                       \
>  struct cdat_doe_task cdt = {                                  \
> -	.c = COMPLETION_INITIALIZER_ONSTACK(cdt.c),           \
>  	.request_pl = req,				      \
>  	.task = {                                             \
>  		.prot.vid = PCI_DVSEC_VENDOR_ID_CXL,        \
> @@ -513,8 +506,6 @@ struct cdat_doe_task cdt = {                                  \
>  		.request_pl_sz = sizeof(cdt.request_pl),      \
>  		.response_pl = cdt.response_pl,               \
>  		.response_pl_sz = sizeof(cdt.response_pl),    \
> -		.complete = cxl_doe_task_complete,            \
> -		.private = &cdt.c,                            \
>  	}                                                     \
>  }
>  
> @@ -525,12 +516,12 @@ static int cxl_cdat_get_length(struct device *dev,
>  	DECLARE_CDAT_DOE_TASK(CDAT_DOE_REQ(0), t);
>  	int rc;
>  
> -	rc = pci_doe_submit_task(cdat_doe, &t.task);
> +	rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
>  	if (rc < 0) {
>  		dev_err(dev, "DOE submit failed: %d", rc);
>  		return rc;
>  	}
> -	wait_for_completion(&t.c);
> +
>  	if (t.task.rv < sizeof(u32))
>  		return -EIO;
>  
> @@ -554,12 +545,11 @@ static int cxl_cdat_read_table(struct device *dev,
>  		u32 *entry;
>  		int rc;
>  
> -		rc = pci_doe_submit_task(cdat_doe, &t.task);
> +		rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
>  		if (rc < 0) {
>  			dev_err(dev, "DOE submit failed: %d", rc);
>  			return rc;
>  		}
> -		wait_for_completion(&t.c);
>  		/* 1 DW header + 1 DW data min */
>  		if (t.task.rv < (2 * sizeof(u32)))
>  			return -EIO;
> diff --git a/drivers/pci/doe.c b/drivers/pci/doe.c
> index e402f05068a5..41a75112b39b 100644
> --- a/drivers/pci/doe.c
> +++ b/drivers/pci/doe.c
> @@ -18,7 +18,6 @@
>  #include <linux/mutex.h>
>  #include <linux/pci.h>
>  #include <linux/pci-doe.h>
> -#include <linux/workqueue.h>
>  
>  #define PCI_DOE_PROTOCOL_DISCOVERY 0
>  
> @@ -40,7 +39,7 @@
>   * @cap_offset: Capability offset
>   * @prots: Array of protocols supported (encoded as long values)
>   * @wq: Wait queue for work item
> - * @work_queue: Queue of pci_doe_work items
> + * @exec_lock: Lock to ensure 1 task is processed at a time
>   * @flags: Bit array of PCI_DOE_FLAG_* flags
>   */
>  struct pci_doe_mb {
> @@ -49,7 +48,7 @@ struct pci_doe_mb {
>  	struct xarray prots;
>  
>  	wait_queue_head_t wq;
> -	struct workqueue_struct *work_queue;
> +	struct mutex exec_lock;
>  	unsigned long flags;
>  };
>  
> @@ -211,7 +210,6 @@ static int pci_doe_recv_resp(struct pci_doe_mb *doe_mb, struct pci_doe_task *tas
>  static void signal_task_complete(struct pci_doe_task *task, int rv)
>  {
>  	task->rv = rv;
> -	task->complete(task);
>  }
>  
>  static void signal_task_abort(struct pci_doe_task *task, int rv)
> @@ -231,10 +229,8 @@ static void signal_task_abort(struct pci_doe_task *task, int rv)
>  	signal_task_complete(task, rv);
>  }
>  
> -static void doe_statemachine_work(struct work_struct *work)
> +static void exec_task(struct pci_doe_task *task)
>  {
> -	struct pci_doe_task *task = container_of(work, struct pci_doe_task,
> -						 work);
>  	struct pci_doe_mb *doe_mb = task->doe_mb;
>  	struct pci_dev *pdev = doe_mb->pdev;
>  	int offset = doe_mb->cap_offset;
> @@ -295,18 +291,12 @@ static void doe_statemachine_work(struct work_struct *work)
>  	signal_task_complete(task, rc);
>  }
>  
> -static void pci_doe_task_complete(struct pci_doe_task *task)
> -{
> -	complete(task->private);
> -}
> -
>  static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
>  			     u8 *protocol)
>  {
>  	u32 request_pl = FIELD_PREP(PCI_DOE_DATA_OBJECT_DISC_REQ_3_INDEX,
>  				    *index);
>  	u32 response_pl;
> -	DECLARE_COMPLETION_ONSTACK(c);
>  	struct pci_doe_task task = {
>  		.prot.vid = PCI_VENDOR_ID_PCI_SIG,
>  		.prot.type = PCI_DOE_PROTOCOL_DISCOVERY,
> @@ -314,17 +304,13 @@ static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
>  		.request_pl_sz = sizeof(request_pl),
>  		.response_pl = &response_pl,
>  		.response_pl_sz = sizeof(response_pl),
> -		.complete = pci_doe_task_complete,
> -		.private = &c,
>  	};
>  	int rc;
>  
> -	rc = pci_doe_submit_task(doe_mb, &task);
> +	rc = pci_doe_submit_task_wait(doe_mb, &task);
>  	if (rc < 0)
>  		return rc;
>  
> -	wait_for_completion(&c);
> -
>  	if (task.rv != sizeof(response_pl))
>  		return -EIO;
>  
> @@ -376,13 +362,6 @@ static void pci_doe_xa_destroy(void *mb)
>  	xa_destroy(&doe_mb->prots);
>  }
>  
> -static void pci_doe_destroy_workqueue(void *mb)
> -{
> -	struct pci_doe_mb *doe_mb = mb;
> -
> -	destroy_workqueue(doe_mb->work_queue);
> -}
> -
>  static void pci_doe_flush_mb(void *mb)
>  {
>  	struct pci_doe_mb *doe_mb = mb;
> @@ -390,12 +369,9 @@ static void pci_doe_flush_mb(void *mb)
>  	/* Stop all pending work items from starting */
>  	set_bit(PCI_DOE_FLAG_DEAD, &doe_mb->flags);
>  
> -	/* Cancel an in progress work item, if necessary */
> +	/* Cancel the in progress task and waiting tasks, if necessary */
>  	set_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags);
>  	wake_up(&doe_mb->wq);
> -
> -	/* Flush all work items */
> -	flush_workqueue(doe_mb->work_queue);
>  }
>  
>  /**
> @@ -423,25 +399,13 @@ struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset)
>  	doe_mb->pdev = pdev;
>  	doe_mb->cap_offset = cap_offset;
>  	init_waitqueue_head(&doe_mb->wq);
> +	mutex_init(&doe_mb->exec_lock);
>  
>  	xa_init(&doe_mb->prots);
>  	rc = devm_add_action(dev, pci_doe_xa_destroy, doe_mb);
>  	if (rc)
>  		return ERR_PTR(rc);
>  
> -	doe_mb->work_queue = alloc_ordered_workqueue("%s %s DOE [%x]", 0,
> -						dev_driver_string(&pdev->dev),
> -						pci_name(pdev),
> -						doe_mb->cap_offset);
> -	if (!doe_mb->work_queue) {
> -		pci_err(pdev, "[%x] failed to allocate work queue\n",
> -			doe_mb->cap_offset);
> -		return ERR_PTR(-ENOMEM);
> -	}
> -	rc = devm_add_action_or_reset(dev, pci_doe_destroy_workqueue, doe_mb);
> -	if (rc)
> -		return ERR_PTR(rc);
> -
>  	/* Reset the mailbox by issuing an abort */
>  	rc = pci_doe_abort(doe_mb);
>  	if (rc) {
> @@ -496,23 +460,22 @@ bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type)
>  EXPORT_SYMBOL_GPL(pci_doe_supports_prot);
>  
>  /**
> - * pci_doe_submit_task() - Submit a task to be processed by the state machine
> + * pci_doe_submit_task_wait() - Submit and execute a task
>   *
>   * @doe_mb: DOE mailbox capability to submit to
> - * @task: task to be queued
> - *
> - * Submit a DOE task (request/response) to the DOE mailbox to be processed.
> - * Returns upon queueing the task object.  If the queue is full this function
> - * will sleep until there is room in the queue.
> + * @task: task to be run
>   *
> - * task->complete will be called when the state machine is done processing this
> - * task.
> + * Submit and run DOE task (request/response) to the DOE mailbox to be
> + * processed.
>   *
>   * Excess data will be discarded.
>   *
> - * RETURNS: 0 when task has been successfully queued, -ERRNO on error
> + * Context: non-interrupt
> + *
> + * RETURNS: 0 when task was executed, the @task->rv holds the status
> + * result of the executed operation, -ERRNO on failure to submit.
>   */
> -int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> +int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
>  {
>  	if (!pci_doe_supports_prot(doe_mb, task->prot.vid, task->prot.type))
>  		return -EINVAL;
> @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
>  		return -EIO;
>  
>  	task->doe_mb = doe_mb;
> -	INIT_WORK(&task->work, doe_statemachine_work);
> -	queue_work(doe_mb->work_queue, &task->work);
> +
> +again:
> +	if (!mutex_trylock(&doe_mb->exec_lock)) {

Nit, lock data, not code. This is not a lock of exec_task(); it is a lock of
the DOE register state. So I would just call it doe_mb->lock.

> +		if (wait_event_timeout(task->doe_mb->wq,

At the risk of going all the way back to the original proposal (again,
apologies for my premature async advocacy), I do not understand why this
chooses to have a trylock+wait_event instead of just queueing in
mutex_lock_interruptible()?

The mutex will attempt to maintain fairness in its own waitqueue. If the
current task in exec_task() sees PCI_DOE_FLAG_CANCEL, it will drop out
and release the lock and then all waiters can check PCI_DOE_FLAG_CANCEL
before exec_task().

At a minimum this needs a comment about why the built-in mutex waitqueue
is not sufficient for this case. Otherwise, this looks like open-coded
locking to me.
Davidlohr Bueso Nov. 21, 2022, 5:19 p.m. UTC | #9
On Mon, 21 Nov 2022, Dan Williams wrote:

>The mutex will attempt to maintain fairness in its own waitqueue. If the
>current task in exec_task() sees PCI_DOE_FLAG_CANCEL, it will drop out
>and release the lock and then all waiters can check PCI_DOE_FLAG_CANCEL
>before exec_task().

Yes, and try-locking is hacky by nature. In addition, relying on the mutex's
queuing will often be more optimal as it tries to avoid blocking altogether
via MCS (which is also cacheline-friendly).

Thanks,
Davidlohr
Jonathan Cameron Nov. 21, 2022, 5:41 p.m. UTC | #10
On Mon, 21 Nov 2022 14:17:37 +0000
"Zhuo, Qiuxu" <qiuxu.zhuo@intel.com> wrote:

> > From: Jonathan Cameron <Jonathan.Cameron@Huawei.com>
> > ...
> > On Mon, 21 Nov 2022 02:01:32 +0000
> > "Zhuo, Qiuxu" <qiuxu.zhuo@intel.com> wrote:
> >   
> > > > From: Ira Weiny <ira.weiny@intel.com>  
> > > > ...  
> > > >
> > > > @@ -423,25 +399,13 @@ struct pci_doe_mb  
> > *pcim_doe_create_mb(struct  
> > > > pci_dev *pdev, u16 cap_offset)
> > > >  	doe_mb->pdev = pdev;
> > > >  	doe_mb->cap_offset = cap_offset;
> > > >  	init_waitqueue_head(&doe_mb->wq);
> > > > +	mutex_init(&doe_mb->exec_lock);  
> > >
> > > In real world, not sure whether there is a case that
> > > pcim_doe_create_mb() is invoked by multiple drivers to create multiple
> > > DOE mailbox instances? If there is such a case, we may need to ensure  
> > there is only one DOE mailbox instance for a physical DOE of pdev
> > @cap_offset.
> > 
> > I think if that happened we'd have a lot of mess.  The main PCI driver for a
> > given EP, switch port etc needs to handle this part.
> > 
> > Sub drivers can then do similar to
> > https://elixir.bootlin.com/linux/latest/source/drivers/cxl/core/pci.c#L465
> > to find a DOE instance that supports what they need and use it.
> > The DOE code 'should' work fine when doing this - the request/response
> > pairs will be serialized.
> > 
> > We have discussed moving that 'find' logic and the xarray into the PCI core
> > and that will need to happen to support CMA etc. For the first submission it
> > was easier to just do it in the CXL drivers..  
> 
> For the 1st submission, yes, it's easier in current way.
> 
> > 
> > Jonathan  
> 
> It's good that this potential issue has been noticed. I think moving the 'find' logic and the xarray 
> from CXL to the PCI core should save a lot of such duplicated work for other drivers using DOE.
> 
> One more thought:
> For a driver, I think it's only interested in getting a DOE mailbox from a PCI device with a specified VID+protocol and using it.
> The driver doesn't care how the DOE mailbox instance is created, and it also doesn't want to maintain it.
> After using the DOE mailbox instance, the driver puts it back. 
> A pair of get/put APIs implemented in the PCI core like below might make drivers' lives easier 
Jonathan Cameron Nov. 21, 2022, 5:52 p.m. UTC | #11
On Sun, 20 Nov 2022 05:57:22 -0800
Ira Weiny <ira.weiny@intel.com> wrote:

> On Sun, Nov 20, 2022 at 10:27:35AM +0800, Hillf Danton wrote:
> > On Sat, 19 Nov 2022 14:25:27 -0800 Ira Weiny <ira.weiny@intel.com>  
> > > @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> > >  		return -EIO;
> > >  
> > >  	task->doe_mb = doe_mb;
> > > -	INIT_WORK(&task->work, doe_statemachine_work);
> > > -	queue_work(doe_mb->work_queue, &task->work);
> > > +
> > > +again:
> > > +	if (!mutex_trylock(&doe_mb->exec_lock)) {
> > > +		if (wait_event_timeout(task->doe_mb->wq,
> > > +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
> > > +				PCI_DOE_POLL_INTERVAL))
> > > +			return -EIO;  
> > 
> > Is EIO worth a line of pr_warn()?  
> 
> Maybe but I'm not sure it is worth it.  This was paralleling the original code
> which called pci_doe_flush_mb() to shut down the mailbox.  So this is likely to
> never happen.  The callers could print something if needed.
> 
> >   
> > > +		goto again;
> > > +	}
> > > +	exec_task(task);
> > > +	mutex_unlock(&doe_mb->exec_lock);
> > > +  
> > 
> > If it is likely to take two minutes to acquire the exec_lock after
> > rounds of trying again, trylock + wait timeout barely make sense given EIO.  
> 
> I'm not sure where the 2 minutes comes from.
> 
> #define PCI_DOE_TIMEOUT HZ
> #define PCI_DOE_POLL_INTERVAL   (PCI_DOE_TIMEOUT / 128)
> 
> It is also not anticipated that more than 1 task is being given to the mailbox
> but the protection needs to be there because exec_task() will get confused if
> more than 1 thread submits at the same time.

Given multiple protocols can be on the same DOE and they may be handled by
either subdrivers or indeed driven by a userspace interface, there is a high
chance that more than one task will be queued up (once we have a few more
supported protocols).

> 
> All this said I've now convinced myself that there is a race in the use of
> PCI_DOE_FLAG_CANCEL even with the existing code.
> 
> I believe that if the pci device goes away the doe_mb structure may get free'ed
> prior to other threads having a chance to check doe_mb->flags.  Worse yet the
> work queue itself (doe_mb->wq) may become invalid...
> 
> I don't believe this can currently happen because anyone using the doe_mb
> structure has a reference to the pci device.
> 
> With this patch I think all the doe_mb->flags and the wait queue can go away.
> pci_doe_wait() can be replaced with a simple msleep_interruptible().
> 
> Let me work through that a bit.
> 
> Ira
> 
> > 
> > Hillf
> > 
> > /**
> >  * wait_event_timeout - sleep until a condition gets true or a timeout elapses
> >  * @wq_head: the waitqueue to wait on
> >  * @condition: a C expression for the event to wait for
> >  * @timeout: timeout, in jiffies
> >  *
> >  * The process is put to sleep (TASK_UNINTERRUPTIBLE) until the
> >  * @condition evaluates to true. The @condition is checked each time
> >  * the waitqueue @wq_head is woken up.
> >  *
> >  * wake_up() has to be called after changing any variable that could
> >  * change the result of the wait condition.
> >  *
> >  * Returns:
> >  * 0 if the @condition evaluated to %false after the @timeout elapsed,
> >  * 1 if the @condition evaluated to %true after the @timeout elapsed,
> >  * or the remaining jiffies (at least 1) if the @condition evaluated
> >  * to %true before the @timeout elapsed.
> >  */
Ira Weiny Nov. 21, 2022, 10:59 p.m. UTC | #12
On Mon, Nov 21, 2022 at 10:07:56AM +0800, Li, Ming wrote:
> On 11/21/2022 9:39 AM, Li, Ming wrote:

[snip]

> >> @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> >>  		return -EIO;
> >>  
> >>  	task->doe_mb = doe_mb;
> >> -	INIT_WORK(&task->work, doe_statemachine_work);
> >> -	queue_work(doe_mb->work_queue, &task->work);
> >> +
> >> +again:
> >> +	if (!mutex_trylock(&doe_mb->exec_lock)) {
> >> +		if (wait_event_timeout(task->doe_mb->wq,
> >> +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
> >> +				PCI_DOE_POLL_INTERVAL))
> >> +			return -EIO;
> > 
> > We already implemented a pci_doe_wait(), I think we can use it to instead of this wait_event_timeout.
> > 
> > Thanks
> > Ming
> > 
> 
> This wait_event_timeout() only checks PCI_DOE_FLAG_CANCEL, which means it only detects the signal that the doe_mb is being destroyed.
> If the current DOE task completes correctly, I think we should wake up the next task. The current implementation just waits until the timeout happens and tries again.
> Besides, if two threads are waiting on the same doe_mb, thread #1 waiting first and thread #2 second, there is a chance that thread #2 is processed before thread #1.
> 

Agreed.

However, the real problem is that the doe_mb has probably been freed at this
point and all of this is going to crash and burn anyway.  The implementation of
PCI_DOE_FLAG_CANCEL was fundamentally flawed even for the current work queue
implementation.

This patch incorrectly tried to use that mechanism but upon looking closer I
see it does not work.

I saw in another thread Jonathan discussing some sort of get/put on the doe_mb.
That is not currently necessary as the creators of doe_mb objects currently
hold references to the PCI device any time they call submit.

:-(

For now all PCI_DOE_FLAG_CANCEL stuff needs to go away,
Ira

> Thanks
> Ming
> 
> >> +		goto again;
> >> +	}
> >> +	exec_task(task);
> >> +	mutex_unlock(&doe_mb->exec_lock);
> >> +
> >>  	return 0;
> >>  }
> >> -EXPORT_SYMBOL_GPL(pci_doe_submit_task);
> >> +EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
> >> diff --git a/include/linux/pci-doe.h b/include/linux/pci-doe.h
> >> index ed9b4df792b8..c94122a66221 100644
> >> --- a/include/linux/pci-doe.h
> >> +++ b/include/linux/pci-doe.h
> >> @@ -30,8 +30,6 @@ struct pci_doe_mb;
> >>   * @response_pl_sz: Size of the response payload (bytes)
> >>   * @rv: Return value.  Length of received response or error (bytes)
> >>   * @complete: Called when task is complete
> >> - * @private: Private data for the consumer
> >> - * @work: Used internally by the mailbox
> >>   * @doe_mb: Used internally by the mailbox
> >>   *
> >>   * The payload sizes and rv are specified in bytes with the following
> >> @@ -50,11 +48,6 @@ struct pci_doe_task {
> >>  	u32 *response_pl;
> >>  	size_t response_pl_sz;
> >>  	int rv;
> >> -	void (*complete)(struct pci_doe_task *task);
> >> -	void *private;
> >> -
> >> -	/* No need for the user to initialize these fields */
> >> -	struct work_struct work;
> >>  	struct pci_doe_mb *doe_mb;
> >>  };
> >>  
> >> @@ -72,6 +65,5 @@ struct pci_doe_task {
> >>  
> >>  struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset);
> >>  bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type);
> >> -int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
> >> -
> >> +int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
> >>  #endif
> >>
> >> base-commit: b6e7fdfd6f6a8bf88fcdb4a45da52c42ba238c25
Jonathan Cameron Nov. 22, 2022, 9:46 a.m. UTC | #13
On Mon, 21 Nov 2022 14:59:46 -0800
Ira Weiny <ira.weiny@intel.com> wrote:

> On Mon, Nov 21, 2022 at 10:07:56AM +0800, Li, Ming wrote:
> > On 11/21/2022 9:39 AM, Li, Ming wrote:  
> 
> [snip]
> 
> > >> @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> > >>  		return -EIO;
> > >>  
> > >>  	task->doe_mb = doe_mb;
> > >> -	INIT_WORK(&task->work, doe_statemachine_work);
> > >> -	queue_work(doe_mb->work_queue, &task->work);
> > >> +
> > >> +again:
> > >> +	if (!mutex_trylock(&doe_mb->exec_lock)) {
> > >> +		if (wait_event_timeout(task->doe_mb->wq,
> > >> +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
> > >> +				PCI_DOE_POLL_INTERVAL))
> > >> +			return -EIO;  
> > > 
> > > We already implemented a pci_doe_wait(), I think we can use it to instead of this wait_event_timeout.
> > > 
> > > Thanks
> > > Ming
> > >   
> > 
> > This wait_event_timeout() only checks PCI_DOE_FLAG_CANCEL, which means it only detects the signal that the doe_mb is being destroyed.
> > If the current doe task completes correctly, I think we should wake up the next task. The current implementation just waits until the timeout expires and then tries again.
> > Besides, if two threads are waiting on the same doe_mb, thread #1 first and thread #2 second, there is a chance that thread #2 is processed before thread #1.
> >   
> 
> Agreed.
> 
> However, the real problem is that the doe_mb has probably been freed at this point
> and all of this is going to crash and burn anyway.  The implementation of
> PCI_DOE_FLAG_CANCEL was fundamentally flawed even for the current work queue
> implementation.
> 
> This patch incorrectly tried to use that mechanism but upon looking closer I
> see it does not work.
> 
> I saw in another thread Jonathan discussing some sort of get/put on the doe_mb.
> That is not currently necessary as the creators of doe_mb objects currently
> hold references to the PCI device any time they call submit.

The get / put would only matter if we wanted to manage the DOE resources separately
from those of the PCI device.  It may well never make sense to do so as they
aren't substantial anyway.
> 
> :-(
> 
> For now all PCI_DOE_FLAG_CANCEL stuff needs to go away,
> Ira
> 
> > Thanks
> > Ming
> >   
> > >> +		goto again;
> > >> +	}
> > >> +	exec_task(task);
> > >> +	mutex_unlock(&doe_mb->exec_lock);
> > >> +
> > >>  	return 0;
> > >>  }
> > >> -EXPORT_SYMBOL_GPL(pci_doe_submit_task);
> > >> +EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
> > >> diff --git a/include/linux/pci-doe.h b/include/linux/pci-doe.h
> > >> index ed9b4df792b8..c94122a66221 100644
> > >> --- a/include/linux/pci-doe.h
> > >> +++ b/include/linux/pci-doe.h
> > >> @@ -30,8 +30,6 @@ struct pci_doe_mb;
> > >>   * @response_pl_sz: Size of the response payload (bytes)
> > >>   * @rv: Return value.  Length of received response or error (bytes)
> > >>   * @complete: Called when task is complete
> > >> - * @private: Private data for the consumer
> > >> - * @work: Used internally by the mailbox
> > >>   * @doe_mb: Used internally by the mailbox
> > >>   *
> > >>   * The payload sizes and rv are specified in bytes with the following
> > >> @@ -50,11 +48,6 @@ struct pci_doe_task {
> > >>  	u32 *response_pl;
> > >>  	size_t response_pl_sz;
> > >>  	int rv;
> > >> -	void (*complete)(struct pci_doe_task *task);
> > >> -	void *private;
> > >> -
> > >> -	/* No need for the user to initialize these fields */
> > >> -	struct work_struct work;
> > >>  	struct pci_doe_mb *doe_mb;
> > >>  };
> > >>  
> > >> @@ -72,6 +65,5 @@ struct pci_doe_task {
> > >>  
> > >>  struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset);
> > >>  bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type);
> > >> -int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
> > >> -
> > >> +int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
> > >>  #endif
> > >>
> > >> base-commit: b6e7fdfd6f6a8bf88fcdb4a45da52c42ba238c25
Ira Weiny Nov. 22, 2022, 3:55 p.m. UTC | #14
On Tue, Nov 22, 2022 at 09:46:27AM +0000, Jonathan Cameron wrote:
> On Mon, 21 Nov 2022 14:59:46 -0800
> Ira Weiny <ira.weiny@intel.com> wrote:
> 
> > On Mon, Nov 21, 2022 at 10:07:56AM +0800, Li, Ming wrote:
> > > On 11/21/2022 9:39 AM, Li, Ming wrote:  
> > 
> > [snip]
> > 
> > > >> @@ -529,8 +492,18 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
> > > >>  		return -EIO;
> > > >>  
> > > >>  	task->doe_mb = doe_mb;
> > > >> -	INIT_WORK(&task->work, doe_statemachine_work);
> > > >> -	queue_work(doe_mb->work_queue, &task->work);
> > > >> +
> > > >> +again:
> > > >> +	if (!mutex_trylock(&doe_mb->exec_lock)) {
> > > >> +		if (wait_event_timeout(task->doe_mb->wq,
> > > >> +				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
> > > >> +				PCI_DOE_POLL_INTERVAL))
> > > >> +			return -EIO;  
> > > > 
> > > > We already implemented a pci_doe_wait(), I think we can use it to instead of this wait_event_timeout.
> > > > 
> > > > Thanks
> > > > Ming
> > > >   
> > > 
> > > This wait_event_timeout() only checks PCI_DOE_FLAG_CANCEL, which means it only detects the signal that the doe_mb is being destroyed.
> > > If the current doe task completes correctly, I think we should wake up the next task. The current implementation just waits until the timeout expires and then tries again.
> > > Besides, if two threads are waiting on the same doe_mb, thread #1 first and thread #2 second, there is a chance that thread #2 is processed before thread #1.
> > >   
> > 
> > Agreed.
> > 
> > However, the real problem is that the doe_mb has probably been freed at this point
> > and all of this is going to crash and burn anyway.  The implementation of
> > PCI_DOE_FLAG_CANCEL was fundamentally flawed even for the current work queue
> > implementation.
> > 
> > This patch incorrectly tried to use that mechanism but upon looking closer I
> > see it does not work.
> > 
> > I saw in another thread Jonathan discussing some sort of get/put on the doe_mb.
> > That is not currently necessary as the creators of doe_mb objects currently
> > hold references to the PCI device any time they call submit.
> 
> The get / put would only matter if we wanted to manage the DOE resources separately
> from those of the PCI device.  It may well never make sense to do so as they
> aren't substantial anyway.

Agreed.  See the new series:

https://lore.kernel.org/all/20221122155324.1878416-1-ira.weiny@intel.com/

Thanks,
Ira

> > 
> > :-(
> > 
> > For now all PCI_DOE_FLAG_CANCEL stuff needs to go away,
> > Ira
> > 
> > > Thanks
> > > Ming
> > >   
> > > >> +		goto again;
> > > >> +	}
> > > >> +	exec_task(task);
> > > >> +	mutex_unlock(&doe_mb->exec_lock);
> > > >> +
> > > >>  	return 0;
> > > >>  }
> > > >> -EXPORT_SYMBOL_GPL(pci_doe_submit_task);
> > > >> +EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
> > > >> diff --git a/include/linux/pci-doe.h b/include/linux/pci-doe.h
> > > >> index ed9b4df792b8..c94122a66221 100644
> > > >> --- a/include/linux/pci-doe.h
> > > >> +++ b/include/linux/pci-doe.h
> > > >> @@ -30,8 +30,6 @@ struct pci_doe_mb;
> > > >>   * @response_pl_sz: Size of the response payload (bytes)
> > > >>   * @rv: Return value.  Length of received response or error (bytes)
> > > >>   * @complete: Called when task is complete
> > > >> - * @private: Private data for the consumer
> > > >> - * @work: Used internally by the mailbox
> > > >>   * @doe_mb: Used internally by the mailbox
> > > >>   *
> > > >>   * The payload sizes and rv are specified in bytes with the following
> > > >> @@ -50,11 +48,6 @@ struct pci_doe_task {
> > > >>  	u32 *response_pl;
> > > >>  	size_t response_pl_sz;
> > > >>  	int rv;
> > > >> -	void (*complete)(struct pci_doe_task *task);
> > > >> -	void *private;
> > > >> -
> > > >> -	/* No need for the user to initialize these fields */
> > > >> -	struct work_struct work;
> > > >>  	struct pci_doe_mb *doe_mb;
> > > >>  };
> > > >>  
> > > >> @@ -72,6 +65,5 @@ struct pci_doe_task {
> > > >>  
> > > >>  struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset);
> > > >>  bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type);
> > > >> -int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
> > > >> -
> > > >> +int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
> > > >>  #endif
> > > >>
> > > >> base-commit: b6e7fdfd6f6a8bf88fcdb4a45da52c42ba238c25  
>
Lukas Wunner Nov. 22, 2022, 7:28 p.m. UTC | #15
On Mon, Nov 21, 2022 at 11:19:25AM +0000, Jonathan Cameron wrote:
> On Sat, 19 Nov 2022 14:25:27 -0800 ira.weiny@intel.com wrote:
> > Initially, it was anticipated that DOE tasks were going to need to be
> > submitted asynchronously and the code was designed thusly.  Many
> > alternatives were discussed to fix the work initialization issue.[2]
> > 
> > However, all current users submit tasks synchronously and this has
> > therefore become an unneeded maintenance burden.  Remove the extra
> > maintenance burden by replacing asynchronous task submission with
> > a synchronous wait function.[3]
[...]
> @Lukas, I assume we don't care about the async support for SPDM going forwards?

We don't.  However:

While I wouldn't have put in the asynchronous support in the first place,
now that it exists, I wouldn't delete it either.

I would just keep it internal to doe.c and only expose a synchronous
API call, which does the pci_doe_task allocation internally on the
stack, uses the appropriate INIT_WORK variant and waits for completion.

Actually I was going to do just that... I'm working on the DOE code
but the ongoing patch submissions make things difficult for me
because I have to shoot at a moving target.

The simplest solution would probably just be the object_is_on_stack()
check and the second simplest would be the synchronous API call outlined
above.

Thanks,

Lukas
Lukas Wunner Nov. 22, 2022, 7:48 p.m. UTC | #16
On Mon, Nov 21, 2022 at 05:41:48PM +0000, Jonathan Cameron wrote:
> On Mon, 21 Nov 2022 14:17:37 +0000 "Zhuo, Qiuxu" <qiuxu.zhuo@intel.com> wrote:
> > It's good that this potential issue has been noticed. I think moving
> > the 'find' logic and the xarray from CXL to the PCI core should save
> > a lot of such duplicated work for other drivers using DOE.
> > 
> > One more though:
> > For a driver, I think it's only interested in getting a DOE mailbox
> > from a PCI device with a specified VID+protocol and using it.
> > The driver doesn't care how the DOE mailbox instance is created, and
> > the driver also doesn't want to maintain it.

Totally agree on all of your above points Qiuxu.


> > After using the DOE mailbox instance then the driver puts it back.

That won't be necessary I think.  The PCI core allocates all existing
DOE mailboxes and enumerates the supported protocols.  Drivers just
ask the PCI core for a mailbox supporting a specific protocol and
are free to use that as long as the PCI device exists.


> There is also a dance around interrupts (once those are supported
> for DOEs in general).  Until the PCI driver has requested interrupts
> we can't use them for DOE, but we may want to poll before that
> stage and then switch over.

Thomas Gleixner has returned to his patch sets for dynamic MSI-X
allocation.  We'll be able to leverage that to request an interrupt
in the PCI core for DOE before a driver is bound.  And a driver
can then get additional MSI-X vectors if needed.  Will only work
for MSI-X though, not MSI.

Thanks,

Lukas
Dan Williams Nov. 22, 2022, 8:12 p.m. UTC | #17
Lukas Wunner wrote:
> On Mon, Nov 21, 2022 at 11:19:25AM +0000, Jonathan Cameron wrote:
> > On Sat, 19 Nov 2022 14:25:27 -0800 ira.weiny@intel.com wrote:
> > > Initially, it was anticipated that DOE tasks were going to need to be
> > > submitted asynchronously and the code was designed thusly.  Many
> > > alternatives were discussed to fix the work initialization issue.[2]
> > > 
> > > However, all current users submit tasks synchronously and this has
> > > therefore become an unneeded maintenance burden.  Remove the extra
> > > maintenance burden by replacing asynchronous task submission with
> > > a synchronous wait function.[3]
> [...]
> > @Lukas, I assume we don't care about the async support for SPDM going forwards?
> 
> We don't.  However:
> 
> While I wouldn't have put in the asynchronous support in the first place,
> now that it exists, it wouldn't delete it either.
> 
> I would just keep it internal to doe.c and only expose a synchronous
> API call, which does the pci_doe_task allocation internally on the
> stack, uses the appropriate INIT_WORK variant and waits for completion.

This was my first instinct as well, but after typing it up for a bit I
came to the conclusion that the design would need a bigger rework.

The problem is having the work items in the task vs having a separate
command-queue where tasks are submitted, like block / usb and other
drivers that take command submissions.

> Actually I was going to do just that... I'm working on the DOE code
> but the ongoing patch submissions make things difficult for me
> because I have to shoot at a moving target.
> 
> The simplest solution would probably just be the object_is_on_stack()
> check and the second simplest would be the synchronous API call outlined
> above.

The explicit separation of INIT_WORK() and INIT_WORK_ONSTACK() serves a
purpose. It makes it clear that the work context is scoped to the submission
function. Hiding the difference hides bugs where submitters get the other
async setup details of the submission wrong.

Patch

diff --git a/drivers/cxl/core/pci.c b/drivers/cxl/core/pci.c
index 9240df53ed87..58977e0712b6 100644
--- a/drivers/cxl/core/pci.c
+++ b/drivers/cxl/core/pci.c
@@ -490,21 +490,14 @@  static struct pci_doe_mb *find_cdat_doe(struct device *uport)
 		    CXL_DOE_TABLE_ACCESS_TABLE_TYPE_CDATA) |		\
 	 FIELD_PREP(CXL_DOE_TABLE_ACCESS_ENTRY_HANDLE, (entry_handle)))
 
-static void cxl_doe_task_complete(struct pci_doe_task *task)
-{
-	complete(task->private);
-}
-
 struct cdat_doe_task {
 	u32 request_pl;
 	u32 response_pl[32];
-	struct completion c;
 	struct pci_doe_task task;
 };
 
 #define DECLARE_CDAT_DOE_TASK(req, cdt)                       \
 struct cdat_doe_task cdt = {                                  \
-	.c = COMPLETION_INITIALIZER_ONSTACK(cdt.c),           \
 	.request_pl = req,				      \
 	.task = {                                             \
 		.prot.vid = PCI_DVSEC_VENDOR_ID_CXL,        \
@@ -513,8 +506,6 @@  struct cdat_doe_task cdt = {                                  \
 		.request_pl_sz = sizeof(cdt.request_pl),      \
 		.response_pl = cdt.response_pl,               \
 		.response_pl_sz = sizeof(cdt.response_pl),    \
-		.complete = cxl_doe_task_complete,            \
-		.private = &cdt.c,                            \
 	}                                                     \
 }
 
@@ -525,12 +516,12 @@  static int cxl_cdat_get_length(struct device *dev,
 	DECLARE_CDAT_DOE_TASK(CDAT_DOE_REQ(0), t);
 	int rc;
 
-	rc = pci_doe_submit_task(cdat_doe, &t.task);
+	rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
 	if (rc < 0) {
 		dev_err(dev, "DOE submit failed: %d", rc);
 		return rc;
 	}
-	wait_for_completion(&t.c);
+
 	if (t.task.rv < sizeof(u32))
 		return -EIO;
 
@@ -554,12 +545,11 @@  static int cxl_cdat_read_table(struct device *dev,
 		u32 *entry;
 		int rc;
 
-		rc = pci_doe_submit_task(cdat_doe, &t.task);
+		rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
 		if (rc < 0) {
 			dev_err(dev, "DOE submit failed: %d", rc);
 			return rc;
 		}
-		wait_for_completion(&t.c);
 		/* 1 DW header + 1 DW data min */
 		if (t.task.rv < (2 * sizeof(u32)))
 			return -EIO;
diff --git a/drivers/pci/doe.c b/drivers/pci/doe.c
index e402f05068a5..41a75112b39b 100644
--- a/drivers/pci/doe.c
+++ b/drivers/pci/doe.c
@@ -18,7 +18,6 @@ 
 #include <linux/mutex.h>
 #include <linux/pci.h>
 #include <linux/pci-doe.h>
-#include <linux/workqueue.h>
 
 #define PCI_DOE_PROTOCOL_DISCOVERY 0
 
@@ -40,7 +39,7 @@ 
  * @cap_offset: Capability offset
  * @prots: Array of protocols supported (encoded as long values)
  * @wq: Wait queue for work item
- * @work_queue: Queue of pci_doe_work items
+ * @exec_lock: Lock to ensure 1 task is processed at a time
  * @flags: Bit array of PCI_DOE_FLAG_* flags
  */
 struct pci_doe_mb {
@@ -49,7 +48,7 @@  struct pci_doe_mb {
 	struct xarray prots;
 
 	wait_queue_head_t wq;
-	struct workqueue_struct *work_queue;
+	struct mutex exec_lock;
 	unsigned long flags;
 };
 
@@ -211,7 +210,6 @@  static int pci_doe_recv_resp(struct pci_doe_mb *doe_mb, struct pci_doe_task *tas
 static void signal_task_complete(struct pci_doe_task *task, int rv)
 {
 	task->rv = rv;
-	task->complete(task);
 }
 
 static void signal_task_abort(struct pci_doe_task *task, int rv)
@@ -231,10 +229,8 @@  static void signal_task_abort(struct pci_doe_task *task, int rv)
 	signal_task_complete(task, rv);
 }
 
-static void doe_statemachine_work(struct work_struct *work)
+static void exec_task(struct pci_doe_task *task)
 {
-	struct pci_doe_task *task = container_of(work, struct pci_doe_task,
-						 work);
 	struct pci_doe_mb *doe_mb = task->doe_mb;
 	struct pci_dev *pdev = doe_mb->pdev;
 	int offset = doe_mb->cap_offset;
@@ -295,18 +291,12 @@  static void doe_statemachine_work(struct work_struct *work)
 	signal_task_complete(task, rc);
 }
 
-static void pci_doe_task_complete(struct pci_doe_task *task)
-{
-	complete(task->private);
-}
-
 static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
 			     u8 *protocol)
 {
 	u32 request_pl = FIELD_PREP(PCI_DOE_DATA_OBJECT_DISC_REQ_3_INDEX,
 				    *index);
 	u32 response_pl;
-	DECLARE_COMPLETION_ONSTACK(c);
 	struct pci_doe_task task = {
 		.prot.vid = PCI_VENDOR_ID_PCI_SIG,
 		.prot.type = PCI_DOE_PROTOCOL_DISCOVERY,
@@ -314,17 +304,13 @@  static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
 		.request_pl_sz = sizeof(request_pl),
 		.response_pl = &response_pl,
 		.response_pl_sz = sizeof(response_pl),
-		.complete = pci_doe_task_complete,
-		.private = &c,
 	};
 	int rc;
 
-	rc = pci_doe_submit_task(doe_mb, &task);
+	rc = pci_doe_submit_task_wait(doe_mb, &task);
 	if (rc < 0)
 		return rc;
 
-	wait_for_completion(&c);
-
 	if (task.rv != sizeof(response_pl))
 		return -EIO;
 
@@ -376,13 +362,6 @@  static void pci_doe_xa_destroy(void *mb)
 	xa_destroy(&doe_mb->prots);
 }
 
-static void pci_doe_destroy_workqueue(void *mb)
-{
-	struct pci_doe_mb *doe_mb = mb;
-
-	destroy_workqueue(doe_mb->work_queue);
-}
-
 static void pci_doe_flush_mb(void *mb)
 {
 	struct pci_doe_mb *doe_mb = mb;
@@ -390,12 +369,9 @@  static void pci_doe_flush_mb(void *mb)
 	/* Stop all pending work items from starting */
 	set_bit(PCI_DOE_FLAG_DEAD, &doe_mb->flags);
 
-	/* Cancel an in progress work item, if necessary */
+	/* Cancel the in progress task and waiting tasks, if necessary */
 	set_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags);
 	wake_up(&doe_mb->wq);
-
-	/* Flush all work items */
-	flush_workqueue(doe_mb->work_queue);
 }
 
 /**
@@ -423,25 +399,13 @@  struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset)
 	doe_mb->pdev = pdev;
 	doe_mb->cap_offset = cap_offset;
 	init_waitqueue_head(&doe_mb->wq);
+	mutex_init(&doe_mb->exec_lock);
 
 	xa_init(&doe_mb->prots);
 	rc = devm_add_action(dev, pci_doe_xa_destroy, doe_mb);
 	if (rc)
 		return ERR_PTR(rc);
 
-	doe_mb->work_queue = alloc_ordered_workqueue("%s %s DOE [%x]", 0,
-						dev_driver_string(&pdev->dev),
-						pci_name(pdev),
-						doe_mb->cap_offset);
-	if (!doe_mb->work_queue) {
-		pci_err(pdev, "[%x] failed to allocate work queue\n",
-			doe_mb->cap_offset);
-		return ERR_PTR(-ENOMEM);
-	}
-	rc = devm_add_action_or_reset(dev, pci_doe_destroy_workqueue, doe_mb);
-	if (rc)
-		return ERR_PTR(rc);
-
 	/* Reset the mailbox by issuing an abort */
 	rc = pci_doe_abort(doe_mb);
 	if (rc) {
@@ -496,23 +460,22 @@  bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type)
 EXPORT_SYMBOL_GPL(pci_doe_supports_prot);
 
 /**
- * pci_doe_submit_task() - Submit a task to be processed by the state machine
+ * pci_doe_submit_task_wait() - Submit and execute a task
  *
  * @doe_mb: DOE mailbox capability to submit to
- * @task: task to be queued
- *
- * Submit a DOE task (request/response) to the DOE mailbox to be processed.
- * Returns upon queueing the task object.  If the queue is full this function
- * will sleep until there is room in the queue.
+ * @task: task to be run
  *
- * task->complete will be called when the state machine is done processing this
- * task.
+ * Submit and run DOE task (request/response) to the DOE mailbox to be
+ * processed.
  *
  * Excess data will be discarded.
  *
- * RETURNS: 0 when task has been successfully queued, -ERRNO on error
+ * Context: non-interrupt
+ *
+ * RETURNS: 0 when task was executed, the @task->rv holds the status
+ * result of the executed operation, -ERRNO on failure to submit.
  */
-int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
+int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
 {
 	if (!pci_doe_supports_prot(doe_mb, task->prot.vid, task->prot.type))
 		return -EINVAL;
@@ -529,8 +492,18 @@  int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
 		return -EIO;
 
 	task->doe_mb = doe_mb;
-	INIT_WORK(&task->work, doe_statemachine_work);
-	queue_work(doe_mb->work_queue, &task->work);
+
+again:
+	if (!mutex_trylock(&doe_mb->exec_lock)) {
+		if (wait_event_timeout(task->doe_mb->wq,
+				test_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags),
+				PCI_DOE_POLL_INTERVAL))
+			return -EIO;
+		goto again;
+	}
+	exec_task(task);
+	mutex_unlock(&doe_mb->exec_lock);
+
 	return 0;
 }
-EXPORT_SYMBOL_GPL(pci_doe_submit_task);
+EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
diff --git a/include/linux/pci-doe.h b/include/linux/pci-doe.h
index ed9b4df792b8..c94122a66221 100644
--- a/include/linux/pci-doe.h
+++ b/include/linux/pci-doe.h
@@ -30,8 +30,6 @@  struct pci_doe_mb;
  * @response_pl_sz: Size of the response payload (bytes)
  * @rv: Return value.  Length of received response or error (bytes)
  * @complete: Called when task is complete
- * @private: Private data for the consumer
- * @work: Used internally by the mailbox
  * @doe_mb: Used internally by the mailbox
  *
  * The payload sizes and rv are specified in bytes with the following
@@ -50,11 +48,6 @@  struct pci_doe_task {
 	u32 *response_pl;
 	size_t response_pl_sz;
 	int rv;
-	void (*complete)(struct pci_doe_task *task);
-	void *private;
-
-	/* No need for the user to initialize these fields */
-	struct work_struct work;
 	struct pci_doe_mb *doe_mb;
 };
 
@@ -72,6 +65,5 @@  struct pci_doe_task {
 
 struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset);
 bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type);
-int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
-
+int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
 #endif