Replace spinlock with rcu read locks for read side operations on mca
in rxe_recv.c and rxe_mcast.c. Use rcu list extensions on write side
operations and use a spinlock to serialize writers.

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_mcast.c | 73 +++++++++++++++------------
 drivers/infiniband/sw/rxe/rxe_recv.c  |  6 +--
 2 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_mcast.c b/drivers/infiniband/sw/rxe/rxe_mcast.c
@@ -17,6 +17,12 @@
* mca is created. It holds a pointer to the qp and is added to a list
* of qp's that are attached to the mcg. The qp_list is used to replicate
* mcast packets in the rxe receive path.
+ *
+ * The performance critical path is the mca list traversal when
+ * processing incoming multicast packets, which must be fanned out
+ * to the attached qp's. This list is protected by RCU for read
+ * operations and by a spinlock in the rxe_dev struct for write
+ * operations. The red-black tree is protected by the same spinlock.
*/
#include "rxe.h"
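As a reference for the hunks that follow, here is a minimal sketch of the
pattern the comment above describes, using simplified stand-in types rather
than the real rxe structs:

	#include <linux/rculist.h>
	#include <linux/slab.h>
	#include <linux/spinlock.h>

	struct item {
		struct list_head list;
		struct rcu_head rcu;
	};

	/* read side: the hot path, no locking against other readers */
	static void fan_out(struct list_head *head)
	{
		struct item *it;

		rcu_read_lock();
		list_for_each_entry_rcu(it, head, list) {
			/* replicate the packet to each entry here */
		}
		rcu_read_unlock();
	}

	/* write side: rare, serialized against other writers by a spinlock */
	static void remove_item(spinlock_t *lock, struct item *it)
	{
		spin_lock_bh(lock);
		list_del_rcu(&it->list);
		spin_unlock_bh(lock);
		kfree_rcu(it, rcu);	/* real free waits for a grace period */
	}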
@@ -284,7 +290,7 @@ static void rxe_destroy_mcg(struct rxe_mcg *mcg)
}
/**
- * __rxe_init_mca - initialize a new mca holding lock
+ * __rxe_init_mca_rcu - initialize a new mca holding lock
* @qp: qp object
* @mcg: mcg object
* @mca: empty space for new mca
@@ -294,8 +300,8 @@ static void rxe_destroy_mcg(struct rxe_mcg *mcg)
*
* Returns: 0 on success else an error
*/
-static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
- struct rxe_mca *mca)
+static int __rxe_init_mca_rcu(struct rxe_qp *qp, struct rxe_mcg *mcg,
+ struct rxe_mca *mca)
{
struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
int n;
@@ -318,7 +324,7 @@ static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
rxe_add_ref(qp);
mca->qp = qp;
- list_add_tail(&mca->qp_list, &mcg->qp_list);
+ list_add_tail_rcu(&mca->qp_list, &mcg->qp_list);
return 0;
}
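The order of the two statements above matters: mca->qp is assigned before
list_add_tail_rcu() links the entry, and list_add_tail_rcu() publishes with
rcu_assign_pointer() semantics, so a reader can never observe the new mca
with its qp pointer still unset. Schematically:

	/* writer, runs under rxe->mcg_lock */
	mca->qp = qp;					  /* (1) initialize */
	list_add_tail_rcu(&mca->qp_list, &mcg->qp_list);  /* (2) publish */

	/* reader, runs under rcu_read_lock(); the release ordering of (2)
	 * guarantees that (1) is visible for any entry found on the list
	 */
	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list)
		rxe_add_ref(mca->qp);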
@@ -338,14 +344,14 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
int err;
/* check to see if the qp is already a member of the group */
- spin_lock_bh(&rxe->mcg_lock);
- list_for_each_entry(mca, &mcg->qp_list, qp_list) {
+ rcu_read_lock();
+ list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
if (mca->qp == qp) {
- spin_unlock_bh(&rxe->mcg_lock);
+ rcu_read_unlock();
return 0;
}
}
- spin_unlock_bh(&rxe->mcg_lock);
+ rcu_read_unlock();
/* speculative alloc new mca without using GFP_ATOMIC */
mca = kzalloc(sizeof(*mca), GFP_KERNEL);
@@ -362,7 +368,7 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
}
}
- err = __rxe_init_mca(qp, mcg, mca);
+ err = __rxe_init_mca_rcu(qp, mcg, mca);
if (err)
kfree(mca);
out:
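Taken together, rxe_attach_mcg() now follows a check / allocate / re-check
idiom: the first pass is a lock-free RCU lookup, the allocation runs in a
context that may sleep, and the second pass under rxe->mcg_lock catches a
racing attach of the same qp. A condensed sketch of that shape, where
qp_on_list() is a hypothetical helper and not part of the patch:

	rcu_read_lock();		/* 1) lock-free: already attached? */
	if (qp_on_list(mcg, qp)) {
		rcu_read_unlock();
		return 0;
	}
	rcu_read_unlock();

	/* 2) allocate where sleeping is allowed, avoiding GFP_ATOMIC */
	mca = kzalloc(sizeof(*mca), GFP_KERNEL);
	if (!mca)
		return -ENOMEM;

	spin_lock_bh(&rxe->mcg_lock);	/* 3) re-check under the lock */
	if (qp_on_list(mcg, qp)) {
		kfree(mca);		/* lost the race; drop our copy */
		err = 0;
	} else {
		err = __rxe_init_mca_rcu(qp, mcg, mca);
		if (err)
			kfree(mca);
	}
	spin_unlock_bh(&rxe->mcg_lock);
	return err;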
@@ -371,22 +377,22 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
}
/**
- * __rxe_cleanup_mca - cleanup mca object holding lock
+ * __rxe_cleanup_mca_rcu - cleanup mca object holding lock
* @mca: mca object
* @mcg: mcg object
*
* Context: caller must hold a reference to mcg and rxe->mcg_lock
*/
-static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
+static void __rxe_cleanup_mca_rcu(struct rxe_mca *mca, struct rxe_mcg *mcg)
{
- list_del(&mca->qp_list);
+ list_del_rcu(&mca->qp_list);
atomic_dec(&mcg->qp_num);
atomic_dec(&mcg->rxe->mcg_attach);
atomic_dec(&mca->qp->mcg_num);
rxe_drop_ref(mca->qp);
- kfree(mca);
+	kfree_rcu(mca, rcu);
}
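The deferred free above uses the two-argument form of kfree_rcu() because
the single-argument form may block (it can fall back to synchronize_rcu()
when it fails to allocate a head) and therefore must not run under
rxe->mcg_lock. The two-argument form requires struct rxe_mca to embed a
struct rcu_head; a sketch of the assumed companion change in rxe_verbs.h,
with the existing fields inferred from their use in this patch:

	struct rxe_mca {
		struct list_head	qp_list;
		struct rxe_qp		*qp;
		struct rcu_head		rcu;	/* for kfree_rcu(mca, rcu) */
	};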
/**
@@ -399,30 +405,35 @@ static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
static int rxe_detach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
{
struct rxe_dev *rxe = mcg->rxe;
- struct rxe_mca *mca, *tmp;
+ struct rxe_mca *mca;
+ int ret;
spin_lock_bh(&rxe->mcg_lock);
- list_for_each_entry_safe(mca, tmp, &mcg->qp_list, qp_list) {
- if (mca->qp == qp) {
- __rxe_cleanup_mca(mca, mcg);
-
- /* if the number of qp's attached to the
- * mcast group falls to zero go ahead and
- * tear it down. This will not free the
- * object since we are still holding a ref
- * from the caller
- */
- if (atomic_read(&mcg->qp_num) <= 0)
- __rxe_destroy_mcg(mcg);
-
- spin_unlock_bh(&rxe->mcg_lock);
- return 0;
- }
+ list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
+ if (mca->qp == qp)
+ goto found;
}
/* we didn't find the qp on the list */
+ ret = -EINVAL;
+ goto done;
+
+found:
+ __rxe_cleanup_mca_rcu(mca, mcg);
+
+	/* if the number of qp's attached to the
+	 * mcast group falls to zero, go ahead and
+	 * tear it down. This will not free the mcg
+	 * since we are still holding a ref
+	 * from the caller
+	 */
+ if (atomic_read(&mcg->qp_num) <= 0)
+ __rxe_destroy_mcg(mcg);
+
+ ret = 0;
+done:
spin_unlock_bh(&rxe->mcg_lock);
- return -EINVAL;
+ return ret;
}
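Since the unlink uses list_del_rcu() and the free is deferred, a reader
that is already traversing the list when a detach runs keeps a valid view
until it leaves its read-side critical section. An illustrative timeline,
not code from the patch:

	/*
	 * reader (rxe_rcv_mcast_pkt)   writer (rxe_detach_mcg, holds mcg_lock)
	 *
	 * rcu_read_lock()
	 * mca = next list entry        list_del_rcu(&mca->qp_list)
	 * rxe_add_ref(mca->qp)         kfree_rcu(mca, rcu) -- queued, not freed
	 * ... continue traversal ...
	 * rcu_read_unlock()
	 *                              <grace period elapses>
	 *                              mca memory actually freed
	 */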
/**
diff --git a/drivers/infiniband/sw/rxe/rxe_recv.c b/drivers/infiniband/sw/rxe/rxe_recv.c
@@ -265,15 +265,15 @@ static void rxe_rcv_mcast_pkt(struct rxe_dev *rxe, struct sk_buff *skb)
qp_array = kmalloc_array(nmax, sizeof(qp), GFP_KERNEL);
n = 0;
- spin_lock_bh(&rxe->mcg_lock);
- list_for_each_entry(mca, &mcg->qp_list, qp_list) {
+ rcu_read_lock();
+ list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
/* protect the qp pointers in the list */
rxe_add_ref(mca->qp);
qp_array[n++] = mca->qp;
if (n == nmax)
break;
}
- spin_unlock_bh(&rxe->mcg_lock);
+ rcu_read_unlock();
nmax = n;
kref_put(&mcg->ref_cnt, rxe_cleanup_mcg);
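This is the hot path that motivated the change: the fan-out no longer
contends on rxe->mcg_lock. Each qp is pinned with rxe_add_ref() inside the
read-side critical section, so the delivery loop that follows can run after
rcu_read_unlock() without racing a concurrent detach. A sketch of that
loop, where deliver_to_qp() stands in for the driver's per-qp delivery and
is not part of the patch:

	for (n = 0; n < nmax; n++) {
		struct sk_buff *cskb = skb_clone(skb, GFP_ATOMIC);

		if (cskb)
			deliver_to_qp(qp_array[n], cskb);
		rxe_drop_ref(qp_array[n]);	/* balances rxe_add_ref() */
	}
	kfree(qp_array);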