@@ -17,6 +17,12 @@
  * mca is created. It holds a pointer to the qp and is added to a list
  * of qp's that are attached to the mcg. The qp_list is used to replicate
  * mcast packets in the rxe receive path.
+ *
+ * The most performance-critical operation is the mca list traversal
+ * performed when incoming multicast packets are fanned out to the
+ * attached qp's. This list is protected by RCU locking for read
+ * operations and by a spinlock in the rxe_dev struct for write
+ * operations. The red-black tree is protected by the same spinlock.
  */
 
 #include "rxe.h"
@@ -299,7 +305,7 @@ static void rxe_destroy_mcg(struct rxe_mcg *mcg)
  * Returns: 0 on success else an error
  */
 static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
-                                struct rxe_mca *mca)
+                          struct rxe_mca *mca)
 {
         struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
         int n;
@@ -322,7 +328,12 @@ static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
         rxe_add_ref(qp);
         mca->qp = qp;
 
-        list_add_tail(&mca->qp_list, &mcg->qp_list);
+        kref_get(&mcg->ref_cnt);
+        mca->mcg = mcg;
+
+        init_completion(&mca->complete);
+
+        list_add_tail_rcu(&mca->qp_list, &mcg->qp_list);
 
         return 0;
 }
@@ -343,14 +354,14 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
         int err;
 
         /* check to see if the qp is already a member of the group */
-        spin_lock_irqsave(&rxe->mcg_lock, flags);
-        list_for_each_entry(mca, &mcg->qp_list, qp_list) {
+        rcu_read_lock();
+        list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
                 if (mca->qp == qp) {
-                        spin_unlock_irqrestore(&rxe->mcg_lock, flags);
+                        rcu_read_unlock();
                         return 0;
                 }
         }
-        spin_unlock_irqrestore(&rxe->mcg_lock, flags);
+        rcu_read_unlock();
 
         /* speculative alloc new mca without using GFP_ATOMIC */
         mca = kzalloc(sizeof(*mca), GFP_KERNEL);
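
The hunk above is the check-alloc-recheck shape that the "speculative
alloc" comment refers to: the membership test runs under rcu_read_lock(),
the GFP_KERNEL allocation happens in sleepable context with no lock held,
and the new entry is only inserted after a recheck under the writer lock
(the recheck is in code outside this hunk). A sketch of that shape with
hypothetical demo_* types:

    #include <linux/list.h>
    #include <linux/rcupdate.h>
    #include <linux/slab.h>
    #include <linux/spinlock.h>

    struct demo_mca {
            struct list_head qp_list;
            int key;
    };

    struct demo_dev {
            spinlock_t mcg_lock;
            struct list_head qp_list;
    };

    static int demo_attach(struct demo_dev *dev, int key)
    {
            struct demo_mca *mca, *new_mca;

            /* fast path: reader-only membership test, no lock held */
            rcu_read_lock();
            list_for_each_entry_rcu(mca, &dev->qp_list, qp_list) {
                    if (mca->key == key) {
                            rcu_read_unlock();
                            return 0;
                    }
            }
            rcu_read_unlock();

            /* slow path: allocate where sleeping is allowed */
            new_mca = kzalloc(sizeof(*new_mca), GFP_KERNEL);
            if (!new_mca)
                    return -ENOMEM;
            new_mca->key = key;

            /* recheck under the writer lock; a racing attach may have won */
            spin_lock_bh(&dev->mcg_lock);
            list_for_each_entry(mca, &dev->qp_list, qp_list) {
                    if (mca->key == key) {
                            spin_unlock_bh(&dev->mcg_lock);
                            kfree(new_mca);
                            return 0;
                    }
            }
            list_add_tail_rcu(&new_mca->qp_list, &dev->qp_list);
            spin_unlock_bh(&dev->mcg_lock);

            return 0;
    }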
@@ -375,6 +386,20 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
         return err;
 }
 
+/**
+ * rxe_destroy_mca - release mca's qp reference after a grace period
+ * @head: rcu_head embedded in mca
+ */
+static void rxe_destroy_mca(struct rcu_head *head)
+{
+        struct rxe_mca *mca = container_of(head, typeof(*mca), rcu);
+
+        atomic_dec(&mca->qp->mcg_num);
+        rxe_drop_ref(mca->qp);
+
+        complete(&mca->complete);
+}
+
 /**
  * __rxe_cleanup_mca - cleanup mca object holding lock
  * @mca: mca object
@@ -384,14 +409,12 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
  */
 static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
 {
-        list_del(&mca->qp_list);
-
+        mca->mcg = NULL;
+        kref_put(&mcg->ref_cnt, rxe_cleanup_mcg);
         atomic_dec(&mcg->qp_num);
         atomic_dec(&mcg->rxe->mcg_attach);
-        atomic_dec(&mca->qp->mcg_num);
-        rxe_drop_ref(mca->qp);
-        kfree(mca);
+        list_del_rcu(&mca->qp_list);
 }
 
 /**
@@ -404,11 +427,10 @@ static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
 static int rxe_detach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
 {
         struct rxe_dev *rxe = mcg->rxe;
-        struct rxe_mca *mca, *tmp;
-        unsigned long flags;
+        struct rxe_mca *mca;
 
-        spin_lock_irqsave(&rxe->mcg_lock, flags);
-        list_for_each_entry_safe(mca, tmp, &mcg->qp_list, qp_list) {
+        spin_lock_bh(&rxe->mcg_lock);
+        list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
                 if (mca->qp == qp) {
                         __rxe_cleanup_mca(mca, mcg);
@@ -420,14 +442,25 @@ static int rxe_detach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
                          */
                         if (atomic_read(&mcg->qp_num) <= 0)
                                 __rxe_destroy_mcg(mcg);
+                        spin_unlock_bh(&rxe->mcg_lock);
+
+                        /* schedule rxe_destroy_mca() and wait for it
+                         * to complete before returning to rdma-core;
+                         * an outstanding call_rcu() at that point
+                         * causes rdma-core to fail. Calling
+                         * synchronize_rcu() and then rxe_destroy_mca()
+                         * directly (sketched below) would also work.
+                         */
+                        call_rcu(&mca->rcu, rxe_destroy_mca);
+                        wait_for_completion(&mca->complete);
+                        kfree(mca);
 
-                        spin_unlock_irqrestore(&rxe->mcg_lock, flags);
                         return 0;
                 }
         }
 
         /* we didn't find the qp on the list */
-        spin_unlock_irqrestore(&rxe->mcg_lock, flags);
+        spin_unlock_bh(&rxe->mcg_lock);
         return -EINVAL;
 }
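
The comment in the hunk above mentions a simpler alternative: drop the
lock, wait synchronously for a grace period, then tear the mca down
inline. That is not what the patch does; the following is only a sketch
of that variant, again with hypothetical demo_* types.

    #include <linux/list.h>
    #include <linux/rcupdate.h>
    #include <linux/slab.h>
    #include <linux/spinlock.h>

    struct demo_mca {
            struct list_head qp_list;
    };

    struct demo_dev {
            spinlock_t mcg_lock;
    };

    static void demo_detach_teardown(struct demo_dev *dev,
                                     struct demo_mca *mca)
    {
            spin_lock_bh(&dev->mcg_lock);
            list_del_rcu(&mca->qp_list);    /* unpublish the entry */
            spin_unlock_bh(&dev->mcg_lock);

            /* block until every pre-existing reader has finished;
             * after this no traversal can still see the mca. May
             * sleep, so it must run after the spinlock is dropped.
             */
            synchronize_rcu();

            kfree(mca);                     /* now safe to free inline */
    }

The trade-off is latency versus bookkeeping: synchronize_rcu() can block
the detaching thread for a full grace period, while call_rcu() plus a
completion reaches the same externally visible state (no reader can still
hold the mca when control returns to rdma-core) at the cost of the extra
rcu_head and completion fields in struct rxe_mca.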
@@ -265,15 +265,15 @@ static void rxe_rcv_mcast_pkt(struct rxe_dev *rxe, struct sk_buff *skb)
         qp_array = kmalloc_array(nmax, sizeof(qp), GFP_KERNEL);
         n = 0;
 
-        spin_lock_bh(&rxe->mcg_lock);
-        list_for_each_entry(mca, &mcg->qp_list, qp_list) {
+        rcu_read_lock();
+        list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
                 /* protect the qp pointers in the list */
                 rxe_add_ref(mca->qp);
                 qp_array[n++] = mca->qp;
                 if (n == nmax)
                         break;
         }
-        spin_unlock_bh(&rxe->mcg_lock);
+        rcu_read_unlock();
 
         nmax = n;
         kref_put(&mcg->ref_cnt, rxe_cleanup_mcg);
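
One subtlety in the hunk above: a pointer found during an RCU traversal
is only guaranteed to stay valid until rcu_read_unlock(), which is why
rxe_add_ref() is called inside the read-side section before the qp
pointers escape into qp_array. A sketch of that "pin before the lock
drops" pattern, with hypothetical demo_* names:

    #include <linux/kref.h>
    #include <linux/list.h>
    #include <linux/rcupdate.h>

    struct demo_qp {
            struct kref ref;
    };

    struct demo_mca {
            struct list_head qp_list;
            struct demo_qp *qp;
    };

    /* collect up to nmax live qp pointers from an RCU-protected list */
    static int demo_collect(struct list_head *head, struct demo_qp **out,
                            int nmax)
    {
            struct demo_mca *mca;
            int n = 0;

            rcu_read_lock();
            list_for_each_entry_rcu(mca, head, qp_list) {
                    kref_get(&mca->qp->ref);  /* pin before unlock */
                    out[n++] = mca->qp;
                    if (n == nmax)
                            break;
            }
            rcu_read_unlock();

            return n;       /* caller drops each ref with kref_put() */
    }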
@@ -364,7 +364,10 @@ struct rxe_mcg {
 
 struct rxe_mca {
         struct list_head qp_list;
+        struct rcu_head rcu;
         struct rxe_qp *qp;
+        struct rxe_mcg *mcg;
+        struct completion complete;
 };
 
 struct rxe_port {
Replace spinlock with rcu read locks for read side operations on mca
in rxe_recv.c and rxe_mcast.c.

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_mcast.c | 67 ++++++++++++++++++++-------
 drivers/infiniband/sw/rxe/rxe_recv.c  |  6 +--
 drivers/infiniband/sw/rxe/rxe_verbs.h |  3 ++
 3 files changed, 56 insertions(+), 20 deletions(-)