diff mbox series

[for-next,v3,1/7] RDMA/rxe: Convert triple tasklets to use workqueue

Message ID d2f6b3aca61fe1858a97cda94691eece6b0e60bd.1671772917.git.matsuda-daisuke@fujitsu.com (mailing list archive)
State Changes Requested
Headers show
Series On-Demand Paging on SoftRoCE | expand

Commit Message

Daisuke Matsuda (Fujitsu) Dec. 23, 2022, 6:51 a.m. UTC
In order to implement On-Demand Paging on the rxe driver, triple tasklets
(requester, responder, and completer) must be allowed to sleep so that they
can trigger page fault when pages being accessed are not present.

This patch replaces the tasklets with a workqueue, but still allows direct-
call of works from softirq context if it is obvious that MRs are not going
to be accessed and there is no work being processed in the workqueue.

As counterparts to tasklet_disable() and tasklet_enable() are missing for
workqueues, an atomic value is introduced to prevent work items from being
scheduled while qp reset is in progress.

The way to initialize/destroy workqueue is picked up from the
implementation of Ian Ziemba and Bob Pearson at HPE.

Link: https://lore.kernel.org/all/20221018043345.4033-1-rpearsonhpe@gmail.com/
Signed-off-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
---
 drivers/infiniband/sw/rxe/rxe.c       |  9 ++++-
 drivers/infiniband/sw/rxe/rxe_comp.c  |  2 +-
 drivers/infiniband/sw/rxe/rxe_param.h |  2 +-
 drivers/infiniband/sw/rxe/rxe_qp.c    |  2 +-
 drivers/infiniband/sw/rxe/rxe_req.c   |  2 +-
 drivers/infiniband/sw/rxe/rxe_resp.c  |  2 +-
 drivers/infiniband/sw/rxe/rxe_task.c  | 52 ++++++++++++++++++++-------
 drivers/infiniband/sw/rxe/rxe_task.h  | 15 ++++++--
 8 files changed, 65 insertions(+), 21 deletions(-)

Comments

Bob Pearson Dec. 28, 2022, 4:56 p.m. UTC | #1
On 12/23/22 00:51, Daisuke Matsuda wrote:
> In order to implement On-Demand Paging on the rxe driver, triple tasklets
> (requester, responder, and completer) must be allowed to sleep so that they
> can trigger page fault when pages being accessed are not present.
> 
> This patch replaces the tasklets with a workqueue, but still allows direct-
> call of works from softirq context if it is obvious that MRs are not going
> to be accessed and there is no work being processed in the workqueue.

There are already at least two patch sets that do this waiting to get upstream.
Bob

> 
> As counterparts to tasklet_disable() and tasklet_enable() are missing for
> workqueues, an atomic value is introduced to prevent work items from being
> scheduled while qp reset is in progress.
> 
> The way to initialize/destroy workqueue is picked up from the
> implementation of Ian Ziemba and Bob Pearson at HPE.
> 
> Link: https://lore.kernel.org/all/20221018043345.4033-1-rpearsonhpe@gmail.com/
> Signed-off-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
> ---
>  drivers/infiniband/sw/rxe/rxe.c       |  9 ++++-
>  drivers/infiniband/sw/rxe/rxe_comp.c  |  2 +-
>  drivers/infiniband/sw/rxe/rxe_param.h |  2 +-
>  drivers/infiniband/sw/rxe/rxe_qp.c    |  2 +-
>  drivers/infiniband/sw/rxe/rxe_req.c   |  2 +-
>  drivers/infiniband/sw/rxe/rxe_resp.c  |  2 +-
>  drivers/infiniband/sw/rxe/rxe_task.c  | 52 ++++++++++++++++++++-------
>  drivers/infiniband/sw/rxe/rxe_task.h  | 15 ++++++--
>  8 files changed, 65 insertions(+), 21 deletions(-)
> 
> diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
> index 136c2efe3466..3c7e42e5b0c7 100644
> --- a/drivers/infiniband/sw/rxe/rxe.c
> +++ b/drivers/infiniband/sw/rxe/rxe.c
> @@ -210,10 +210,16 @@ static int __init rxe_module_init(void)
>  {
>  	int err;
>  
> -	err = rxe_net_init();
> +	err = rxe_alloc_wq();
>  	if (err)
>  		return err;
>  
> +	err = rxe_net_init();
> +	if (err) {
> +		rxe_destroy_wq();
> +		return err;
> +	}
> +
>  	rdma_link_register(&rxe_link_ops);
>  	pr_info("loaded\n");
>  	return 0;
> @@ -224,6 +230,7 @@ static void __exit rxe_module_exit(void)
>  	rdma_link_unregister(&rxe_link_ops);
>  	ib_unregister_driver(RDMA_DRIVER_RXE);
>  	rxe_net_exit();
> +	rxe_destroy_wq();
>  
>  	pr_info("unloaded\n");
>  }
> diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
> index 20737fec392b..046bbacce37c 100644
> --- a/drivers/infiniband/sw/rxe/rxe_comp.c
> +++ b/drivers/infiniband/sw/rxe/rxe_comp.c
> @@ -773,7 +773,7 @@ int rxe_completer(void *arg)
>  	}
>  
>  	/* A non-zero return value will cause rxe_do_task to
> -	 * exit its loop and end the tasklet. A zero return
> +	 * exit its loop and end the work item. A zero return
>  	 * will continue looping and return to rxe_completer
>  	 */
>  done:
> diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
> index a754fc902e3d..bd8050e99d6b 100644
> --- a/drivers/infiniband/sw/rxe/rxe_param.h
> +++ b/drivers/infiniband/sw/rxe/rxe_param.h
> @@ -112,7 +112,7 @@ enum rxe_device_param {
>  	RXE_INFLIGHT_SKBS_PER_QP_HIGH	= 64,
>  	RXE_INFLIGHT_SKBS_PER_QP_LOW	= 16,
>  
> -	/* Max number of interations of each tasklet
> +	/* Max number of interations of each work item
>  	 * before yielding the cpu to let other
>  	 * work make progress
>  	 */
> diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
> index ab72db68b58f..e033b2449dfe 100644
> --- a/drivers/infiniband/sw/rxe/rxe_qp.c
> +++ b/drivers/infiniband/sw/rxe/rxe_qp.c
> @@ -471,7 +471,7 @@ int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
>  /* move the qp to the reset state */
>  static void rxe_qp_reset(struct rxe_qp *qp)
>  {
> -	/* stop tasks from running */
> +	/* flush workqueue and stop tasks from running */
>  	rxe_disable_task(&qp->resp.task);
>  
>  	/* stop request/comp */
> diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
> index 899c8779f800..2bcd287a2c3b 100644
> --- a/drivers/infiniband/sw/rxe/rxe_req.c
> +++ b/drivers/infiniband/sw/rxe/rxe_req.c
> @@ -830,7 +830,7 @@ int rxe_requester(void *arg)
>  	update_state(qp, &pkt);
>  
>  	/* A non-zero return value will cause rxe_do_task to
> -	 * exit its loop and end the tasklet. A zero return
> +	 * exit its loop and end the work item. A zero return
>  	 * will continue looping and return to rxe_requester
>  	 */
>  done:
> diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
> index c74972244f08..d9134a00a529 100644
> --- a/drivers/infiniband/sw/rxe/rxe_resp.c
> +++ b/drivers/infiniband/sw/rxe/rxe_resp.c
> @@ -1691,7 +1691,7 @@ int rxe_responder(void *arg)
>  	}
>  
>  	/* A non-zero return value will cause rxe_do_task to
> -	 * exit its loop and end the tasklet. A zero return
> +	 * exit its loop and end the work item. A zero return
>  	 * will continue looping and return to rxe_responder
>  	 */
>  done:
> diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
> index 60b90e33a884..b96f72aa9005 100644
> --- a/drivers/infiniband/sw/rxe/rxe_task.c
> +++ b/drivers/infiniband/sw/rxe/rxe_task.c
> @@ -6,6 +6,22 @@
>  
>  #include "rxe.h"
>  
> +static struct workqueue_struct *rxe_wq;
> +
> +int rxe_alloc_wq(void)
> +{
> +	rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
> +	if (!rxe_wq)
> +		return -ENOMEM;
> +
> +	return 0;
> +}
> +
> +void rxe_destroy_wq(void)
> +{
> +	destroy_workqueue(rxe_wq);
> +}
> +
>  int __rxe_do_task(struct rxe_task *task)
>  
>  {
> @@ -24,11 +40,11 @@ int __rxe_do_task(struct rxe_task *task)
>   * a second caller finds the task already running
>   * but looks just after the last call to func
>   */
> -static void do_task(struct tasklet_struct *t)
> +static void do_task(struct work_struct *w)
>  {
>  	int cont;
>  	int ret;
> -	struct rxe_task *task = from_tasklet(task, t, tasklet);
> +	struct rxe_task *task = container_of(w, typeof(*task), work);
>  	struct rxe_qp *qp = (struct rxe_qp *)task->arg;
>  	unsigned int iterations = RXE_MAX_ITERATIONS;
>  
> @@ -64,10 +80,10 @@ static void do_task(struct tasklet_struct *t)
>  			} else if (iterations--) {
>  				cont = 1;
>  			} else {
> -				/* reschedule the tasklet and exit
> +				/* reschedule the work item and exit
>  				 * the loop to give up the cpu
>  				 */
> -				tasklet_schedule(&task->tasklet);
> +				queue_work(task->workq, &task->work);
>  				task->state = TASK_STATE_START;
>  			}
>  			break;
> @@ -97,7 +113,8 @@ int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
>  	task->func	= func;
>  	task->destroyed	= false;
>  
> -	tasklet_setup(&task->tasklet, do_task);
> +	INIT_WORK(&task->work, do_task);
> +	task->workq = rxe_wq;
>  
>  	task->state = TASK_STATE_START;
>  	spin_lock_init(&task->lock);
> @@ -111,17 +128,16 @@ void rxe_cleanup_task(struct rxe_task *task)
>  
>  	/*
>  	 * Mark the task, then wait for it to finish. It might be
> -	 * running in a non-tasklet (direct call) context.
> +	 * running in a non-workqueue (direct call) context.
>  	 */
>  	task->destroyed = true;
> +	flush_workqueue(task->workq);
>  
>  	do {
>  		spin_lock_bh(&task->lock);
>  		idle = (task->state == TASK_STATE_START);
>  		spin_unlock_bh(&task->lock);
>  	} while (!idle);
> -
> -	tasklet_kill(&task->tasklet);
>  }
>  
>  void rxe_run_task(struct rxe_task *task)
> @@ -129,7 +145,7 @@ void rxe_run_task(struct rxe_task *task)
>  	if (task->destroyed)
>  		return;
>  
> -	do_task(&task->tasklet);
> +	do_task(&task->work);
>  }
>  
>  void rxe_sched_task(struct rxe_task *task)
> @@ -137,15 +153,27 @@ void rxe_sched_task(struct rxe_task *task)
>  	if (task->destroyed)
>  		return;
>  
> -	tasklet_schedule(&task->tasklet);
> +	/*
> +	 * busy-loop while qp reset is in progress.
> +	 * This may be called from softirq context and thus cannot sleep.
> +	 */
> +	while (atomic_read(&task->suspended))
> +		cpu_relax();
> +
> +	queue_work(task->workq, &task->work);
>  }
>  
>  void rxe_disable_task(struct rxe_task *task)
>  {
> -	tasklet_disable(&task->tasklet);
> +	/* Alternative to tasklet_disable() */
> +	atomic_inc(&task->suspended);
> +	smp_mb__after_atomic();
> +	flush_workqueue(task->workq);
>  }
>  
>  void rxe_enable_task(struct rxe_task *task)
>  {
> -	tasklet_enable(&task->tasklet);
> +	/* Alternative to tasklet_enable() */
> +	smp_mb__before_atomic();
> +	atomic_dec(&task->suspended);
>  }
> diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
> index 7b88129702ac..9aa3f236e886 100644
> --- a/drivers/infiniband/sw/rxe/rxe_task.h
> +++ b/drivers/infiniband/sw/rxe/rxe_task.h
> @@ -19,15 +19,22 @@ enum {
>   * called again.
>   */
>  struct rxe_task {
> -	struct tasklet_struct	tasklet;
> +	struct workqueue_struct	*workq;
> +	struct work_struct	work;
>  	int			state;
>  	spinlock_t		lock;
>  	void			*arg;
>  	int			(*func)(void *arg);
>  	int			ret;
>  	bool			destroyed;
> +	/* used to {dis, en}able per-qp work items */
> +	atomic_t		suspended;
>  };
>  
> +int rxe_alloc_wq(void);
> +
> +void rxe_destroy_wq(void);
> +
>  /*
>   * init rxe_task structure
>   *	arg  => parameter to pass to fcn
> @@ -40,18 +47,20 @@ void rxe_cleanup_task(struct rxe_task *task);
>  
>  /*
>   * raw call to func in loop without any checking
> - * can call when tasklets are disabled
> + * can call when tasks are suspended
>   */
>  int __rxe_do_task(struct rxe_task *task);
>  
> +/* run a task without scheduling */
>  void rxe_run_task(struct rxe_task *task);
>  
> +/* schedule a task into workqueue */
>  void rxe_sched_task(struct rxe_task *task);
>  
>  /* keep a task from scheduling */
>  void rxe_disable_task(struct rxe_task *task);
>  
> -/* allow task to run */
> +/* allow a task to run again */
>  void rxe_enable_task(struct rxe_task *task);
>  
>  #endif /* RXE_TASK_H */
Zhu Yanjun Dec. 29, 2022, 1:23 a.m. UTC | #2
On Thu, Dec 29, 2022 at 12:56 AM Bob Pearson <rpearsonhpe@gmail.com> wrote:
>
> On 12/23/22 00:51, Daisuke Matsuda wrote:
> > In order to implement On-Demand Paging on the rxe driver, triple tasklets
> > (requester, responder, and completer) must be allowed to sleep so that they
> > can trigger page fault when pages being accessed are not present.
> >
> > This patch replaces the tasklets with a workqueue, but still allows direct-
> > call of works from softirq context if it is obvious that MRs are not going
> > to be accessed and there is no work being processed in the workqueue.
>
> There are already at least two patch sets that do this waiting to get upstream.

RXE accepted several patch sets just now. So it needs time to make
tests and check bugs.

Zhu Yanjun

> Bob
>
> >
> > As counterparts to tasklet_disable() and tasklet_enable() are missing for
> > workqueues, an atomic value is introduced to prevent work items from being
> > scheduled while qp reset is in progress.
> >
> > The way to initialize/destroy workqueue is picked up from the
> > implementation of Ian Ziemba and Bob Pearson at HPE.
> >
> > Link: https://lore.kernel.org/all/20221018043345.4033-1-rpearsonhpe@gmail.com/
> > Signed-off-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
> > ---
> >  drivers/infiniband/sw/rxe/rxe.c       |  9 ++++-
> >  drivers/infiniband/sw/rxe/rxe_comp.c  |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_param.h |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_qp.c    |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_req.c   |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_resp.c  |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_task.c  | 52 ++++++++++++++++++++-------
> >  drivers/infiniband/sw/rxe/rxe_task.h  | 15 ++++++--
> >  8 files changed, 65 insertions(+), 21 deletions(-)
> >
> > diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
> > index 136c2efe3466..3c7e42e5b0c7 100644
> > --- a/drivers/infiniband/sw/rxe/rxe.c
> > +++ b/drivers/infiniband/sw/rxe/rxe.c
> > @@ -210,10 +210,16 @@ static int __init rxe_module_init(void)
> >  {
> >       int err;
> >
> > -     err = rxe_net_init();
> > +     err = rxe_alloc_wq();
> >       if (err)
> >               return err;
> >
> > +     err = rxe_net_init();
> > +     if (err) {
> > +             rxe_destroy_wq();
> > +             return err;
> > +     }
> > +
> >       rdma_link_register(&rxe_link_ops);
> >       pr_info("loaded\n");
> >       return 0;
> > @@ -224,6 +230,7 @@ static void __exit rxe_module_exit(void)
> >       rdma_link_unregister(&rxe_link_ops);
> >       ib_unregister_driver(RDMA_DRIVER_RXE);
> >       rxe_net_exit();
> > +     rxe_destroy_wq();
> >
> >       pr_info("unloaded\n");
> >  }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
> > index 20737fec392b..046bbacce37c 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_comp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_comp.c
> > @@ -773,7 +773,7 @@ int rxe_completer(void *arg)
> >       }
> >
> >       /* A non-zero return value will cause rxe_do_task to
> > -      * exit its loop and end the tasklet. A zero return
> > +      * exit its loop and end the work item. A zero return
> >        * will continue looping and return to rxe_completer
> >        */
> >  done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
> > index a754fc902e3d..bd8050e99d6b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_param.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_param.h
> > @@ -112,7 +112,7 @@ enum rxe_device_param {
> >       RXE_INFLIGHT_SKBS_PER_QP_HIGH   = 64,
> >       RXE_INFLIGHT_SKBS_PER_QP_LOW    = 16,
> >
> > -     /* Max number of interations of each tasklet
> > +     /* Max number of interations of each work item
> >        * before yielding the cpu to let other
> >        * work make progress
> >        */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
> > index ab72db68b58f..e033b2449dfe 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_qp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_qp.c
> > @@ -471,7 +471,7 @@ int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
> >  /* move the qp to the reset state */
> >  static void rxe_qp_reset(struct rxe_qp *qp)
> >  {
> > -     /* stop tasks from running */
> > +     /* flush workqueue and stop tasks from running */
> >       rxe_disable_task(&qp->resp.task);
> >
> >       /* stop request/comp */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
> > index 899c8779f800..2bcd287a2c3b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_req.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_req.c
> > @@ -830,7 +830,7 @@ int rxe_requester(void *arg)
> >       update_state(qp, &pkt);
> >
> >       /* A non-zero return value will cause rxe_do_task to
> > -      * exit its loop and end the tasklet. A zero return
> > +      * exit its loop and end the work item. A zero return
> >        * will continue looping and return to rxe_requester
> >        */
> >  done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
> > index c74972244f08..d9134a00a529 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_resp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_resp.c
> > @@ -1691,7 +1691,7 @@ int rxe_responder(void *arg)
> >       }
> >
> >       /* A non-zero return value will cause rxe_do_task to
> > -      * exit its loop and end the tasklet. A zero return
> > +      * exit its loop and end the work item. A zero return
> >        * will continue looping and return to rxe_responder
> >        */
> >  done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
> > index 60b90e33a884..b96f72aa9005 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.c
> > @@ -6,6 +6,22 @@
> >
> >  #include "rxe.h"
> >
> > +static struct workqueue_struct *rxe_wq;
> > +
> > +int rxe_alloc_wq(void)
> > +{
> > +     rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
> > +     if (!rxe_wq)
> > +             return -ENOMEM;
> > +
> > +     return 0;
> > +}
> > +
> > +void rxe_destroy_wq(void)
> > +{
> > +     destroy_workqueue(rxe_wq);
> > +}
> > +
> >  int __rxe_do_task(struct rxe_task *task)
> >
> >  {
> > @@ -24,11 +40,11 @@ int __rxe_do_task(struct rxe_task *task)
> >   * a second caller finds the task already running
> >   * but looks just after the last call to func
> >   */
> > -static void do_task(struct tasklet_struct *t)
> > +static void do_task(struct work_struct *w)
> >  {
> >       int cont;
> >       int ret;
> > -     struct rxe_task *task = from_tasklet(task, t, tasklet);
> > +     struct rxe_task *task = container_of(w, typeof(*task), work);
> >       struct rxe_qp *qp = (struct rxe_qp *)task->arg;
> >       unsigned int iterations = RXE_MAX_ITERATIONS;
> >
> > @@ -64,10 +80,10 @@ static void do_task(struct tasklet_struct *t)
> >                       } else if (iterations--) {
> >                               cont = 1;
> >                       } else {
> > -                             /* reschedule the tasklet and exit
> > +                             /* reschedule the work item and exit
> >                                * the loop to give up the cpu
> >                                */
> > -                             tasklet_schedule(&task->tasklet);
> > +                             queue_work(task->workq, &task->work);
> >                               task->state = TASK_STATE_START;
> >                       }
> >                       break;
> > @@ -97,7 +113,8 @@ int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
> >       task->func      = func;
> >       task->destroyed = false;
> >
> > -     tasklet_setup(&task->tasklet, do_task);
> > +     INIT_WORK(&task->work, do_task);
> > +     task->workq = rxe_wq;
> >
> >       task->state = TASK_STATE_START;
> >       spin_lock_init(&task->lock);
> > @@ -111,17 +128,16 @@ void rxe_cleanup_task(struct rxe_task *task)
> >
> >       /*
> >        * Mark the task, then wait for it to finish. It might be
> > -      * running in a non-tasklet (direct call) context.
> > +      * running in a non-workqueue (direct call) context.
> >        */
> >       task->destroyed = true;
> > +     flush_workqueue(task->workq);
> >
> >       do {
> >               spin_lock_bh(&task->lock);
> >               idle = (task->state == TASK_STATE_START);
> >               spin_unlock_bh(&task->lock);
> >       } while (!idle);
> > -
> > -     tasklet_kill(&task->tasklet);
> >  }
> >
> >  void rxe_run_task(struct rxe_task *task)
> > @@ -129,7 +145,7 @@ void rxe_run_task(struct rxe_task *task)
> >       if (task->destroyed)
> >               return;
> >
> > -     do_task(&task->tasklet);
> > +     do_task(&task->work);
> >  }
> >
> >  void rxe_sched_task(struct rxe_task *task)
> > @@ -137,15 +153,27 @@ void rxe_sched_task(struct rxe_task *task)
> >       if (task->destroyed)
> >               return;
> >
> > -     tasklet_schedule(&task->tasklet);
> > +     /*
> > +      * busy-loop while qp reset is in progress.
> > +      * This may be called from softirq context and thus cannot sleep.
> > +      */
> > +     while (atomic_read(&task->suspended))
> > +             cpu_relax();
> > +
> > +     queue_work(task->workq, &task->work);
> >  }
> >
> >  void rxe_disable_task(struct rxe_task *task)
> >  {
> > -     tasklet_disable(&task->tasklet);
> > +     /* Alternative to tasklet_disable() */
> > +     atomic_inc(&task->suspended);
> > +     smp_mb__after_atomic();
> > +     flush_workqueue(task->workq);
> >  }
> >
> >  void rxe_enable_task(struct rxe_task *task)
> >  {
> > -     tasklet_enable(&task->tasklet);
> > +     /* Alternative to tasklet_enable() */
> > +     smp_mb__before_atomic();
> > +     atomic_dec(&task->suspended);
> >  }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
> > index 7b88129702ac..9aa3f236e886 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.h
> > @@ -19,15 +19,22 @@ enum {
> >   * called again.
> >   */
> >  struct rxe_task {
> > -     struct tasklet_struct   tasklet;
> > +     struct workqueue_struct *workq;
> > +     struct work_struct      work;
> >       int                     state;
> >       spinlock_t              lock;
> >       void                    *arg;
> >       int                     (*func)(void *arg);
> >       int                     ret;
> >       bool                    destroyed;
> > +     /* used to {dis, en}able per-qp work items */
> > +     atomic_t                suspended;
> >  };
> >
> > +int rxe_alloc_wq(void);
> > +
> > +void rxe_destroy_wq(void);
> > +
> >  /*
> >   * init rxe_task structure
> >   *   arg  => parameter to pass to fcn
> > @@ -40,18 +47,20 @@ void rxe_cleanup_task(struct rxe_task *task);
> >
> >  /*
> >   * raw call to func in loop without any checking
> > - * can call when tasklets are disabled
> > + * can call when tasks are suspended
> >   */
> >  int __rxe_do_task(struct rxe_task *task);
> >
> > +/* run a task without scheduling */
> >  void rxe_run_task(struct rxe_task *task);
> >
> > +/* schedule a task into workqueue */
> >  void rxe_sched_task(struct rxe_task *task);
> >
> >  /* keep a task from scheduling */
> >  void rxe_disable_task(struct rxe_task *task);
> >
> > -/* allow task to run */
> > +/* allow a task to run again */
> >  void rxe_enable_task(struct rxe_task *task);
> >
> >  #endif /* RXE_TASK_H */
>
Leon Romanovsky Dec. 29, 2022, 7:02 a.m. UTC | #3
On Wed, Dec 28, 2022 at 10:56:11AM -0600, Bob Pearson wrote:
> On 12/23/22 00:51, Daisuke Matsuda wrote:
> > In order to implement On-Demand Paging on the rxe driver, triple tasklets
> > (requester, responder, and completer) must be allowed to sleep so that they
> > can trigger page fault when pages being accessed are not present.
> > 
> > This patch replaces the tasklets with a workqueue, but still allows direct-
> > call of works from softirq context if it is obvious that MRs are not going
> > to be accessed and there is no work being processed in the workqueue.
> 
> There are already at least two patch sets that do this waiting to get upstream

I don't see any unhandled RXE series, except of this one patch [1],
which is one out larger series.

[1] https://patchwork.kernel.org/project/linux-rdma/patch/20221029031331.64518-1-rpearsonhpe@gmail.com/

> Bob
Daisuke Matsuda (Fujitsu) Jan. 6, 2023, 2:26 a.m. UTC | #4
On Thu, Dec 29, 2022 1:56 AM Bob Pearson wrote:
> 
> On 12/23/22 00:51, Daisuke Matsuda wrote:
> > In order to implement On-Demand Paging on the rxe driver, triple tasklets
> > (requester, responder, and completer) must be allowed to sleep so that they
> > can trigger page fault when pages being accessed are not present.
> >
> > This patch replaces the tasklets with a workqueue, but still allows direct-
> > call of works from softirq context if it is obvious that MRs are not going
> > to be accessed and there is no work being processed in the workqueue.
> 
> There are already at least two patch sets that do this waiting to get upstream.
> Bob

I wrote my intention at the first part of the cover letter.
Cf. https://lore.kernel.org/lkml/cover.1671772917.git.matsuda-daisuke@fujitsu.com/

Your patch set introduces a soft lockup issue. It would take much more time to find
the root cause than to simply convert the tasklets to a workqueue with this patch.
My ODP patches have been stuck for almost 4 months because of this issue, and I am
not willing to wait any longer.

Daisuke

> 
> >
> > As counterparts to tasklet_disable() and tasklet_enable() are missing for
> > workqueues, an atomic value is introduced to prevent work items from being
> > scheduled while qp reset is in progress.
> >
> > The way to initialize/destroy workqueue is picked up from the
> > implementation of Ian Ziemba and Bob Pearson at HPE.
> >
> > Link: https://lore.kernel.org/all/20221018043345.4033-1-rpearsonhpe@gmail.com/
> > Signed-off-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
> > ---
> >  drivers/infiniband/sw/rxe/rxe.c       |  9 ++++-
> >  drivers/infiniband/sw/rxe/rxe_comp.c  |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_param.h |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_qp.c    |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_req.c   |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_resp.c  |  2 +-
> >  drivers/infiniband/sw/rxe/rxe_task.c  | 52 ++++++++++++++++++++-------
> >  drivers/infiniband/sw/rxe/rxe_task.h  | 15 ++++++--
> >  8 files changed, 65 insertions(+), 21 deletions(-)
> >
> > diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
> > index 136c2efe3466..3c7e42e5b0c7 100644
> > --- a/drivers/infiniband/sw/rxe/rxe.c
> > +++ b/drivers/infiniband/sw/rxe/rxe.c
> > @@ -210,10 +210,16 @@ static int __init rxe_module_init(void)
> >  {
> >  	int err;
> >
> > -	err = rxe_net_init();
> > +	err = rxe_alloc_wq();
> >  	if (err)
> >  		return err;
> >
> > +	err = rxe_net_init();
> > +	if (err) {
> > +		rxe_destroy_wq();
> > +		return err;
> > +	}
> > +
> >  	rdma_link_register(&rxe_link_ops);
> >  	pr_info("loaded\n");
> >  	return 0;
> > @@ -224,6 +230,7 @@ static void __exit rxe_module_exit(void)
> >  	rdma_link_unregister(&rxe_link_ops);
> >  	ib_unregister_driver(RDMA_DRIVER_RXE);
> >  	rxe_net_exit();
> > +	rxe_destroy_wq();
> >
> >  	pr_info("unloaded\n");
> >  }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
> > index 20737fec392b..046bbacce37c 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_comp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_comp.c
> > @@ -773,7 +773,7 @@ int rxe_completer(void *arg)
> >  	}
> >
> >  	/* A non-zero return value will cause rxe_do_task to
> > -	 * exit its loop and end the tasklet. A zero return
> > +	 * exit its loop and end the work item. A zero return
> >  	 * will continue looping and return to rxe_completer
> >  	 */
> >  done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
> > index a754fc902e3d..bd8050e99d6b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_param.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_param.h
> > @@ -112,7 +112,7 @@ enum rxe_device_param {
> >  	RXE_INFLIGHT_SKBS_PER_QP_HIGH	= 64,
> >  	RXE_INFLIGHT_SKBS_PER_QP_LOW	= 16,
> >
> > -	/* Max number of interations of each tasklet
> > +	/* Max number of interations of each work item
> >  	 * before yielding the cpu to let other
> >  	 * work make progress
> >  	 */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
> > index ab72db68b58f..e033b2449dfe 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_qp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_qp.c
> > @@ -471,7 +471,7 @@ int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
> >  /* move the qp to the reset state */
> >  static void rxe_qp_reset(struct rxe_qp *qp)
> >  {
> > -	/* stop tasks from running */
> > +	/* flush workqueue and stop tasks from running */
> >  	rxe_disable_task(&qp->resp.task);
> >
> >  	/* stop request/comp */
> > diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
> > index 899c8779f800..2bcd287a2c3b 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_req.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_req.c
> > @@ -830,7 +830,7 @@ int rxe_requester(void *arg)
> >  	update_state(qp, &pkt);
> >
> >  	/* A non-zero return value will cause rxe_do_task to
> > -	 * exit its loop and end the tasklet. A zero return
> > +	 * exit its loop and end the work item. A zero return
> >  	 * will continue looping and return to rxe_requester
> >  	 */
> >  done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
> > index c74972244f08..d9134a00a529 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_resp.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_resp.c
> > @@ -1691,7 +1691,7 @@ int rxe_responder(void *arg)
> >  	}
> >
> >  	/* A non-zero return value will cause rxe_do_task to
> > -	 * exit its loop and end the tasklet. A zero return
> > +	 * exit its loop and end the work item. A zero return
> >  	 * will continue looping and return to rxe_responder
> >  	 */
> >  done:
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
> > index 60b90e33a884..b96f72aa9005 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.c
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.c
> > @@ -6,6 +6,22 @@
> >
> >  #include "rxe.h"
> >
> > +static struct workqueue_struct *rxe_wq;
> > +
> > +int rxe_alloc_wq(void)
> > +{
> > +	rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
> > +	if (!rxe_wq)
> > +		return -ENOMEM;
> > +
> > +	return 0;
> > +}
> > +
> > +void rxe_destroy_wq(void)
> > +{
> > +	destroy_workqueue(rxe_wq);
> > +}
> > +
> >  int __rxe_do_task(struct rxe_task *task)
> >
> >  {
> > @@ -24,11 +40,11 @@ int __rxe_do_task(struct rxe_task *task)
> >   * a second caller finds the task already running
> >   * but looks just after the last call to func
> >   */
> > -static void do_task(struct tasklet_struct *t)
> > +static void do_task(struct work_struct *w)
> >  {
> >  	int cont;
> >  	int ret;
> > -	struct rxe_task *task = from_tasklet(task, t, tasklet);
> > +	struct rxe_task *task = container_of(w, typeof(*task), work);
> >  	struct rxe_qp *qp = (struct rxe_qp *)task->arg;
> >  	unsigned int iterations = RXE_MAX_ITERATIONS;
> >
> > @@ -64,10 +80,10 @@ static void do_task(struct tasklet_struct *t)
> >  			} else if (iterations--) {
> >  				cont = 1;
> >  			} else {
> > -				/* reschedule the tasklet and exit
> > +				/* reschedule the work item and exit
> >  				 * the loop to give up the cpu
> >  				 */
> > -				tasklet_schedule(&task->tasklet);
> > +				queue_work(task->workq, &task->work);
> >  				task->state = TASK_STATE_START;
> >  			}
> >  			break;
> > @@ -97,7 +113,8 @@ int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
> >  	task->func	= func;
> >  	task->destroyed	= false;
> >
> > -	tasklet_setup(&task->tasklet, do_task);
> > +	INIT_WORK(&task->work, do_task);
> > +	task->workq = rxe_wq;
> >
> >  	task->state = TASK_STATE_START;
> >  	spin_lock_init(&task->lock);
> > @@ -111,17 +128,16 @@ void rxe_cleanup_task(struct rxe_task *task)
> >
> >  	/*
> >  	 * Mark the task, then wait for it to finish. It might be
> > -	 * running in a non-tasklet (direct call) context.
> > +	 * running in a non-workqueue (direct call) context.
> >  	 */
> >  	task->destroyed = true;
> > +	flush_workqueue(task->workq);
> >
> >  	do {
> >  		spin_lock_bh(&task->lock);
> >  		idle = (task->state == TASK_STATE_START);
> >  		spin_unlock_bh(&task->lock);
> >  	} while (!idle);
> > -
> > -	tasklet_kill(&task->tasklet);
> >  }
> >
> >  void rxe_run_task(struct rxe_task *task)
> > @@ -129,7 +145,7 @@ void rxe_run_task(struct rxe_task *task)
> >  	if (task->destroyed)
> >  		return;
> >
> > -	do_task(&task->tasklet);
> > +	do_task(&task->work);
> >  }
> >
> >  void rxe_sched_task(struct rxe_task *task)
> > @@ -137,15 +153,27 @@ void rxe_sched_task(struct rxe_task *task)
> >  	if (task->destroyed)
> >  		return;
> >
> > -	tasklet_schedule(&task->tasklet);
> > +	/*
> > +	 * busy-loop while qp reset is in progress.
> > +	 * This may be called from softirq context and thus cannot sleep.
> > +	 */
> > +	while (atomic_read(&task->suspended))
> > +		cpu_relax();
> > +
> > +	queue_work(task->workq, &task->work);
> >  }
> >
> >  void rxe_disable_task(struct rxe_task *task)
> >  {
> > -	tasklet_disable(&task->tasklet);
> > +	/* Alternative to tasklet_disable() */
> > +	atomic_inc(&task->suspended);
> > +	smp_mb__after_atomic();
> > +	flush_workqueue(task->workq);
> >  }
> >
> >  void rxe_enable_task(struct rxe_task *task)
> >  {
> > -	tasklet_enable(&task->tasklet);
> > +	/* Alternative to tasklet_enable() */
> > +	smp_mb__before_atomic();
> > +	atomic_dec(&task->suspended);
> >  }
> > diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
> > index 7b88129702ac..9aa3f236e886 100644
> > --- a/drivers/infiniband/sw/rxe/rxe_task.h
> > +++ b/drivers/infiniband/sw/rxe/rxe_task.h
> > @@ -19,15 +19,22 @@ enum {
> >   * called again.
> >   */
> >  struct rxe_task {
> > -	struct tasklet_struct	tasklet;
> > +	struct workqueue_struct	*workq;
> > +	struct work_struct	work;
> >  	int			state;
> >  	spinlock_t		lock;
> >  	void			*arg;
> >  	int			(*func)(void *arg);
> >  	int			ret;
> >  	bool			destroyed;
> > +	/* used to {dis, en}able per-qp work items */
> > +	atomic_t		suspended;
> >  };
> >
> > +int rxe_alloc_wq(void);
> > +
> > +void rxe_destroy_wq(void);
> > +
> >  /*
> >   * init rxe_task structure
> >   *	arg  => parameter to pass to fcn
> > @@ -40,18 +47,20 @@ void rxe_cleanup_task(struct rxe_task *task);
> >
> >  /*
> >   * raw call to func in loop without any checking
> > - * can call when tasklets are disabled
> > + * can call when tasks are suspended
> >   */
> >  int __rxe_do_task(struct rxe_task *task);
> >
> > +/* run a task without scheduling */
> >  void rxe_run_task(struct rxe_task *task);
> >
> > +/* schedule a task into workqueue */
> >  void rxe_sched_task(struct rxe_task *task);
> >
> >  /* keep a task from scheduling */
> >  void rxe_disable_task(struct rxe_task *task);
> >
> > -/* allow task to run */
> > +/* allow a task to run again */
> >  void rxe_enable_task(struct rxe_task *task);
> >
> >  #endif /* RXE_TASK_H */
diff mbox series

Patch

diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
index 136c2efe3466..3c7e42e5b0c7 100644
--- a/drivers/infiniband/sw/rxe/rxe.c
+++ b/drivers/infiniband/sw/rxe/rxe.c
@@ -210,10 +210,16 @@  static int __init rxe_module_init(void)
 {
 	int err;
 
-	err = rxe_net_init();
+	err = rxe_alloc_wq();
 	if (err)
 		return err;
 
+	err = rxe_net_init();
+	if (err) {
+		rxe_destroy_wq();
+		return err;
+	}
+
 	rdma_link_register(&rxe_link_ops);
 	pr_info("loaded\n");
 	return 0;
@@ -224,6 +230,7 @@  static void __exit rxe_module_exit(void)
 	rdma_link_unregister(&rxe_link_ops);
 	ib_unregister_driver(RDMA_DRIVER_RXE);
 	rxe_net_exit();
+	rxe_destroy_wq();
 
 	pr_info("unloaded\n");
 }
diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index 20737fec392b..046bbacce37c 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -773,7 +773,7 @@  int rxe_completer(void *arg)
 	}
 
 	/* A non-zero return value will cause rxe_do_task to
-	 * exit its loop and end the tasklet. A zero return
+	 * exit its loop and end the work item. A zero return
 	 * will continue looping and return to rxe_completer
 	 */
 done:
diff --git a/drivers/infiniband/sw/rxe/rxe_param.h b/drivers/infiniband/sw/rxe/rxe_param.h
index a754fc902e3d..bd8050e99d6b 100644
--- a/drivers/infiniband/sw/rxe/rxe_param.h
+++ b/drivers/infiniband/sw/rxe/rxe_param.h
@@ -112,7 +112,7 @@  enum rxe_device_param {
 	RXE_INFLIGHT_SKBS_PER_QP_HIGH	= 64,
 	RXE_INFLIGHT_SKBS_PER_QP_LOW	= 16,
 
-	/* Max number of interations of each tasklet
+	/* Max number of interations of each work item
 	 * before yielding the cpu to let other
 	 * work make progress
 	 */
diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
index ab72db68b58f..e033b2449dfe 100644
--- a/drivers/infiniband/sw/rxe/rxe_qp.c
+++ b/drivers/infiniband/sw/rxe/rxe_qp.c
@@ -471,7 +471,7 @@  int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
 /* move the qp to the reset state */
 static void rxe_qp_reset(struct rxe_qp *qp)
 {
-	/* stop tasks from running */
+	/* flush workqueue and stop tasks from running */
 	rxe_disable_task(&qp->resp.task);
 
 	/* stop request/comp */
diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
index 899c8779f800..2bcd287a2c3b 100644
--- a/drivers/infiniband/sw/rxe/rxe_req.c
+++ b/drivers/infiniband/sw/rxe/rxe_req.c
@@ -830,7 +830,7 @@  int rxe_requester(void *arg)
 	update_state(qp, &pkt);
 
 	/* A non-zero return value will cause rxe_do_task to
-	 * exit its loop and end the tasklet. A zero return
+	 * exit its loop and end the work item. A zero return
 	 * will continue looping and return to rxe_requester
 	 */
 done:
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index c74972244f08..d9134a00a529 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -1691,7 +1691,7 @@  int rxe_responder(void *arg)
 	}
 
 	/* A non-zero return value will cause rxe_do_task to
-	 * exit its loop and end the tasklet. A zero return
+	 * exit its loop and end the work item. A zero return
 	 * will continue looping and return to rxe_responder
 	 */
 done:
diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
index 60b90e33a884..b96f72aa9005 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.c
+++ b/drivers/infiniband/sw/rxe/rxe_task.c
@@ -6,6 +6,22 @@ 
 
 #include "rxe.h"
 
+static struct workqueue_struct *rxe_wq;
+
+int rxe_alloc_wq(void)
+{
+	rxe_wq = alloc_workqueue("rxe_wq", WQ_CPU_INTENSIVE, WQ_MAX_ACTIVE);
+	if (!rxe_wq)
+		return -ENOMEM;
+
+	return 0;
+}
+
+void rxe_destroy_wq(void)
+{
+	destroy_workqueue(rxe_wq);
+}
+
 int __rxe_do_task(struct rxe_task *task)
 
 {
@@ -24,11 +40,11 @@  int __rxe_do_task(struct rxe_task *task)
  * a second caller finds the task already running
  * but looks just after the last call to func
  */
-static void do_task(struct tasklet_struct *t)
+static void do_task(struct work_struct *w)
 {
 	int cont;
 	int ret;
-	struct rxe_task *task = from_tasklet(task, t, tasklet);
+	struct rxe_task *task = container_of(w, typeof(*task), work);
 	struct rxe_qp *qp = (struct rxe_qp *)task->arg;
 	unsigned int iterations = RXE_MAX_ITERATIONS;
 
@@ -64,10 +80,10 @@  static void do_task(struct tasklet_struct *t)
 			} else if (iterations--) {
 				cont = 1;
 			} else {
-				/* reschedule the tasklet and exit
+				/* reschedule the work item and exit
 				 * the loop to give up the cpu
 				 */
-				tasklet_schedule(&task->tasklet);
+				queue_work(task->workq, &task->work);
 				task->state = TASK_STATE_START;
 			}
 			break;
@@ -97,7 +113,8 @@  int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
 	task->func	= func;
 	task->destroyed	= false;
 
-	tasklet_setup(&task->tasklet, do_task);
+	INIT_WORK(&task->work, do_task);
+	task->workq = rxe_wq;
 
 	task->state = TASK_STATE_START;
 	spin_lock_init(&task->lock);
@@ -111,17 +128,16 @@  void rxe_cleanup_task(struct rxe_task *task)
 
 	/*
 	 * Mark the task, then wait for it to finish. It might be
-	 * running in a non-tasklet (direct call) context.
+	 * running in a non-workqueue (direct call) context.
 	 */
 	task->destroyed = true;
+	flush_workqueue(task->workq);
 
 	do {
 		spin_lock_bh(&task->lock);
 		idle = (task->state == TASK_STATE_START);
 		spin_unlock_bh(&task->lock);
 	} while (!idle);
-
-	tasklet_kill(&task->tasklet);
 }
 
 void rxe_run_task(struct rxe_task *task)
@@ -129,7 +145,7 @@  void rxe_run_task(struct rxe_task *task)
 	if (task->destroyed)
 		return;
 
-	do_task(&task->tasklet);
+	do_task(&task->work);
 }
 
 void rxe_sched_task(struct rxe_task *task)
@@ -137,15 +153,27 @@  void rxe_sched_task(struct rxe_task *task)
 	if (task->destroyed)
 		return;
 
-	tasklet_schedule(&task->tasklet);
+	/*
+	 * busy-loop while qp reset is in progress.
+	 * This may be called from softirq context and thus cannot sleep.
+	 */
+	while (atomic_read(&task->suspended))
+		cpu_relax();
+
+	queue_work(task->workq, &task->work);
 }
 
 void rxe_disable_task(struct rxe_task *task)
 {
-	tasklet_disable(&task->tasklet);
+	/* Alternative to tasklet_disable() */
+	atomic_inc(&task->suspended);
+	smp_mb__after_atomic();
+	flush_workqueue(task->workq);
 }
 
 void rxe_enable_task(struct rxe_task *task)
 {
-	tasklet_enable(&task->tasklet);
+	/* Alternative to tasklet_enable() */
+	smp_mb__before_atomic();
+	atomic_dec(&task->suspended);
 }
diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
index 7b88129702ac..9aa3f236e886 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.h
+++ b/drivers/infiniband/sw/rxe/rxe_task.h
@@ -19,15 +19,22 @@  enum {
  * called again.
  */
 struct rxe_task {
-	struct tasklet_struct	tasklet;
+	struct workqueue_struct	*workq;
+	struct work_struct	work;
 	int			state;
 	spinlock_t		lock;
 	void			*arg;
 	int			(*func)(void *arg);
 	int			ret;
 	bool			destroyed;
+	/* used to {dis, en}able per-qp work items */
+	atomic_t		suspended;
 };
 
+int rxe_alloc_wq(void);
+
+void rxe_destroy_wq(void);
+
 /*
  * init rxe_task structure
  *	arg  => parameter to pass to fcn
@@ -40,18 +47,20 @@  void rxe_cleanup_task(struct rxe_task *task);
 
 /*
  * raw call to func in loop without any checking
- * can call when tasklets are disabled
+ * can call when tasks are suspended
  */
 int __rxe_do_task(struct rxe_task *task);
 
+/* run a task without scheduling */
 void rxe_run_task(struct rxe_task *task);
 
+/* schedule a task into workqueue */
 void rxe_sched_task(struct rxe_task *task);
 
 /* keep a task from scheduling */
 void rxe_disable_task(struct rxe_task *task);
 
-/* allow task to run */
+/* allow a task to run again */
 void rxe_enable_task(struct rxe_task *task);
 
 #endif /* RXE_TASK_H */