Message ID | cover.1668157436.git.matsuda-daisuke@fujitsu.com |
---|---|
Series | On-Demand Paging on SoftRoCE |
On Fri, Nov 11, 2022 at 06:22:21PM +0900, Daisuke Matsuda wrote:
> This patch series implements the On-Demand Paging feature on the SoftRoCE (rxe)
> driver, which has been available only in the mlx5 driver[1] so far.

<...>

> Daisuke Matsuda (7):
>   IB/mlx5: Change ib_umem_odp_map_dma_single_page() to retain umem_mutex
>   RDMA/rxe: Convert the triple tasklets to workqueues
>   RDMA/rxe: Cleanup code for responder Atomic operations
>   RDMA/rxe: Add page invalidation support
>   RDMA/rxe: Allow registering MRs for On-Demand Paging
>   RDMA/rxe: Add support for Send/Recv/Write/Read operations with ODP
>   RDMA/rxe: Add support for the traditional Atomic operations with ODP

It is a shame that such a cool feature is not progressing. RXE folks, can you
please review it?

Thanks
On Fri, Nov 18, 2022 5:34 PM Hillf Danton wrote:

Hi Hillf,

Thank you for taking a look. As I wrote in the cover letter, a large part of
this patch is expected to be temporary, and Bob Pearson's workqueue
implementation is likely to be adopted instead unless any problems are found
with it.

[PATCH for-next v3 00/13] Implement work queues for rdma_rxe
Cf. https://lore.kernel.org/linux-rdma/20221029031009.64467-1-rpearsonhpe@gmail.com/

I appreciate your insightful comments. If his workqueue series is rejected in
the end, I will address them for submission. Otherwise, I am going to rebase my
work onto his patches in the next version.

Thanks,
Daisuke

> On 11 Nov 2022 18:22:23 +0900 Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
> > +/*
> > + * this locking is due to a potential race where
> > + * a second caller finds the work already running
> > + * but looks just after the last call to func
> > + */
> > +void rxe_do_work(struct work_struct *w)
> > +{
> > +	int cont;
> > +	int ret;
> > +
> > +	struct rxe_work *work = container_of(w, typeof(*work), work);
> > +	unsigned int iterations = RXE_MAX_ITERATIONS;
> > +
> > +	spin_lock_bh(&work->state_lock);
> > +	switch (work->state) {
> > +	case WQ_STATE_START:
> > +		work->state = WQ_STATE_BUSY;
> > +		spin_unlock_bh(&work->state_lock);
> > +		break;
> > +
> > +	case WQ_STATE_BUSY:
> > +		work->state = WQ_STATE_ARMED;
> > +		fallthrough;
> > +	case WQ_STATE_ARMED:
> > +		spin_unlock_bh(&work->state_lock);
> > +		return;
> > +
> > +	default:
> > +		spin_unlock_bh(&work->state_lock);
> > +		pr_warn("%s failed with bad state %d\n", __func__, work->state);
> > +		return;
> > +	}
> > +
> > +	do {
> > +		cont = 0;
> > +		ret = work->func(work->arg);
> > +
> > +		spin_lock_bh(&work->state_lock);
> > +		switch (work->state) {
> > +		case WQ_STATE_BUSY:
> > +			if (ret) {
> > +				work->state = WQ_STATE_START;
> > +			} else if (iterations--) {
> > +				cont = 1;
> > +			} else {
> > +				/* reschedule the work and exit
> > +				 * the loop to give up the cpu
> > +				 */
>
> Unlike tasklet, workqueue work is unable to be a CPU hog with PREEMPT
> enabled, otherwise cond_resched() is enough.
>
> > +				queue_work(work->worker, &work->work);
>
> Nit, s/worker/workq/ for example as worker, work and workqueue are
> different things in the domain of WQ.
>
> > +				work->state = WQ_STATE_START;
> > +			}
> > +			break;
> > +
> > +		/* someone tried to run the work since the last time we called
> > +		 * func, so we will call one more time regardless of the
> > +		 * return value
> > +		 */
> > +		case WQ_STATE_ARMED:
> > +			work->state = WQ_STATE_BUSY;
> > +			cont = 1;
> > +			break;
> > +
> > +		default:
> > +			pr_warn("%s failed with bad state %d\n", __func__,
> > +				work->state);
> > +		}
> > +		spin_unlock_bh(&work->state_lock);
> > +	} while (cont);
> > +
> > +	work->ret = ret;
> > +}
> > +
> [...]
> > +void rxe_run_work(struct rxe_work *work, int sched)
> > +{
> > +	if (work->destroyed)
> > +		return;
> > +
> > +	/* busy-loop while qp reset is in progress */
> > +	while (atomic_read(&work->suspended))
> > +		continue;
>
> Feel free to add a one-line comment specifying the reasons for busy loop
> instead of taking a nap, given it may take two seconds to flush WQ.
>
> > +
> > +	if (sched)
> > +		queue_work(work->worker, &work->work);
> > +	else
> > +		rxe_do_work(&work->work);
> > +}
> > +
> > +void rxe_disable_work(struct rxe_work *work)
> > +{
> > +	atomic_inc(&work->suspended);
> > +	flush_workqueue(work->worker);
> > +}
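[Editor's note] For readers following Hillf's cond_resched() remark: below is a
minimal sketch of the simplification he hints at. Because workqueue items run
in process context (unlike tasklets), the RXE_MAX_ITERATIONS budget and the
explicit queue_work() requeue could in principle be replaced by a voluntary
reschedule between handler invocations. The rxe_work fields (func, arg, ret)
follow the quoted patch; the function name and the omission of the state
machine and locking are illustrative assumptions, not code from the series.

/*
 * Illustrative sketch only, not from the posted series: drive the handler
 * until it reports completion, yielding between iterations instead of
 * rescheduling the work item after a fixed iteration budget.
 */
static void rxe_do_work_simplified(struct rxe_work *work)
{
	int ret;

	do {
		/* the handler returns nonzero when there is nothing left to do */
		ret = work->func(work->arg);

		/*
		 * On non-preemptible kernels this gives up the CPU between
		 * iterations; with PREEMPT enabled the work item can be
		 * preempted anyway, which is Hillf's point above.
		 */
		cond_resched();
	} while (!ret);

	work->ret = ret;
}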