
[v2,3/5] nvmet-rdma: use SRQ per completion vector

Message ID 20200318150257.198402-4-maxg@mellanox.com (mailing list archive)
State Not Applicable
Series nvmet-rdma/srpt: SRQ per completion vector

Commit Message

Max Gurtovoy March 18, 2020, 3:02 p.m. UTC
In order to save resource allocation and utilize the completion
locality in a better way (compared to the SRQ per device that exists
today), allocate Shared Receive Queues (SRQs) per completion vector.
Associate each created QP/CQ with an appropriate SRQ according to the
queue index. This association will reduce the lock contention in the
fast path (compared to the SRQ per device solution) and increase the
locality in memory buffers. Add a new module parameter for the SRQ size
to adjust it according to the expected load. The user should make sure
the size is >= 256 to avoid running out of resources.

Signed-off-by: Max Gurtovoy <maxg@mellanox.com>
---
 drivers/nvme/target/rdma.c | 205 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 154 insertions(+), 51 deletions(-)
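
To illustrate the association described above, here is a minimal sketch
(the helper itself is hypothetical; the authoritative code is in the
diff below):

/*
 * Hypothetical helper, for illustration only: admin queues stay on
 * completion vector 0, I/O queues are spread across vectors, and each
 * queue then shares the SRQ that was created for its vector.
 */
static struct nvmet_rdma_srq *
nvmet_rdma_pick_srq(struct nvmet_rdma_device *ndev,
		    struct nvmet_rdma_queue *queue)
{
	queue->comp_vector = !queue->host_qid ? 0 :
		queue->idx % ndev->device->num_comp_vectors;

	return ndev->srqs ? ndev->srqs[queue->comp_vector % ndev->srq_count] :
			    NULL;
}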

Comments

Bart Van Assche March 19, 2020, 4:09 a.m. UTC | #1
On 2020-03-18 08:02, Max Gurtovoy wrote:
> In order to save resource allocation and utilize the completion
                   ^^^^^^^^^^^^^^^^^^^
                   resources?

> +static int nvmet_rdma_srq_size = 1024;
> +module_param_cb(srq_size, &srq_size_ops, &nvmet_rdma_srq_size, 0644);
> +MODULE_PARM_DESC(srq_size, "set Shared Receive Queue (SRQ) size, should >= 256 (default: 1024)");

Is an SRQ overflow fatal? Isn't the SRQ size something that should be
computed by the nvmet_rdma driver such that SRQ overflows do not happen?

Thanks,

Bart.
Max Gurtovoy March 19, 2020, 9:15 a.m. UTC | #2
On 3/19/2020 6:09 AM, Bart Van Assche wrote:
> On 2020-03-18 08:02, Max Gurtovoy wrote:
>> In order to save resource allocation and utilize the completion
>                     ^^^^^^^^^^^^^^^^^^^
>                     resources?

thanks.


>
>> +static int nvmet_rdma_srq_size = 1024;
>> +module_param_cb(srq_size, &srq_size_ops, &nvmet_rdma_srq_size, 0644);
>> +MODULE_PARM_DESC(srq_size, "set Shared Receive Queue (SRQ) size, should >= 256 (default: 1024)");
> Is an SRQ overflow fatal? Isn't the SRQ size something that should be
> computed by the nvmet_rdma driver such that SRQ overflows do not happen?

I've added the following code to make sure that the size is not greater
than the device capability:

+ndev->srq_size = min(ndev->device->attrs.max_srq_wr,
+                            nvmet_rdma_srq_size);

In case the SRQ doesn't have enough credits, it will send an RNR NAK to
the initiator and the initiator will retry later on.

We might need to change the min_rnr_timer during connection
establishment, but so far I didn't see the need for it.
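
Roughly, such a tweak would look like the following (a sketch only, not
the actual patches; the helper name is made up and it assumes the
attribute can simply be modified on the connected RC QP):

/*
 * Sketch only: request the smallest RNR NAK timer (encoding 1 =
 * 0.01 ms; the default encoding 0 means the maximum, 655.36 ms) so
 * that initiators retry quickly when the SRQ is temporarily out of
 * receive buffers.
 */
static int nvmet_rdma_set_min_rnr_timer(struct ib_qp *qp)
{
	struct ib_qp_attr attr = { .min_rnr_timer = 1 };

	return ib_modify_qp(qp, &attr, IB_QP_MIN_RNR_TIMER);
}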

-Max

>
> Thanks,
>
> Bart.
Jason Gunthorpe March 19, 2020, 11:56 a.m. UTC | #3
On Thu, Mar 19, 2020 at 11:15:50AM +0200, Max Gurtovoy wrote:
> 
> On 3/19/2020 6:09 AM, Bart Van Assche wrote:
> > On 2020-03-18 08:02, Max Gurtovoy wrote:
> > > In order to save resource allocation and utilize the completion
> >                     ^^^^^^^^^^^^^^^^^^^
> >                     resources?
> 
> thanks.
> 
> 
> > 
> > > +static int nvmet_rdma_srq_size = 1024;
> > > +module_param_cb(srq_size, &srq_size_ops, &nvmet_rdma_srq_size, 0644);
> > > +MODULE_PARM_DESC(srq_size, "set Shared Receive Queue (SRQ) size, should >= 256 (default: 1024)");
> > Is an SRQ overflow fatal? Isn't the SRQ size something that should be
> > computed by the nvmet_rdma driver such that SRQ overflows do not happen?
> 
> I've added the following code to make sure that the size is not greater than
> device capability:
> 
> +ndev->srq_size = min(ndev->device->attrs.max_srq_wr,
> +                            nvmet_rdma_srq_size);
> 
> In case the SRQ doesn't have enough credits it will send rnr to the
> initiator and the initiator will retry later on.

This is a pretty big change; in bad cases we could significantly
overflow the available SRQ space...

A big part of most verbs protocols is to ensure that the RQ does not
overflow.

Are we sure it is OK with all the initiators/targets out there?

Jason
Max Gurtovoy March 19, 2020, 12:48 p.m. UTC | #4
On 3/19/2020 1:56 PM, Jason Gunthorpe wrote:
> On Thu, Mar 19, 2020 at 11:15:50AM +0200, Max Gurtovoy wrote:
>> On 3/19/2020 6:09 AM, Bart Van Assche wrote:
>>> On 2020-03-18 08:02, Max Gurtovoy wrote:
>>>> In order to save resource allocation and utilize the completion
>>>                      ^^^^^^^^^^^^^^^^^^^
>>>                      resources?
>> thanks.
>>
>>
>>>> +static int nvmet_rdma_srq_size = 1024;
>>>> +module_param_cb(srq_size, &srq_size_ops, &nvmet_rdma_srq_size, 0644);
>>>> +MODULE_PARM_DESC(srq_size, "set Shared Receive Queue (SRQ) size, should >= 256 (default: 1024)");
>>> Is an SRQ overflow fatal? Isn't the SRQ size something that should be
>>> computed by the nvmet_rdma driver such that SRQ overflows do not happen?
>> I've added the following code to make sure that the size is not greater than
>> device capability:
>>
>> +ndev->srq_size = min(ndev->device->attrs.max_srq_wr,
>> +                            nvmet_rdma_srq_size);
>>
>> In case the SRQ doesn't have enough credits it will send rnr to the
>> initiator and the initiator will retry later on.
> This is a pretty big change, in bad cases we could significantly
> overflow the srq space available...
>
> A big part of most verbs protocols to ensure that the RQ does not
> overflow.
>
> Are we sure it is OK? With all initiator/targets out there?

IMO, if we set the SRQ size large enough to utilize the wire, we're good.

So the best we can do is decrease the RNR timer to re-transmit faster -
I have patches for that as well.

Let me know if you prefer that I send them in v3.

Nevertheless, this situation is better than the current SRQ per HCA
implementation.


>
> Jason
Jason Gunthorpe March 19, 2020, 1:53 p.m. UTC | #5
On Thu, Mar 19, 2020 at 02:48:20PM +0200, Max Gurtovoy wrote:

> Nevertheless, this situation is better from the current SRQ per HCA
> implementation.

nvme/srp/etc already use srq? I see it in the target but not initiator?

Just worried about breaking some weird target somewhere

Jason
Bart Van Assche March 19, 2020, 2:49 p.m. UTC | #6
On 3/19/20 6:53 AM, Jason Gunthorpe wrote:
> On Thu, Mar 19, 2020 at 02:48:20PM +0200, Max Gurtovoy wrote:
> 
>> Nevertheless, this situation is better from the current SRQ per HCA
>> implementation.
> 
> nvme/srp/etc already use srq? I see it in the target but not initiator?
> 
> Just worried about breaking some weird target somewhere

From the upstream SRP target driver:

static void srpt_get_ioc(struct srpt_port *sport, u32 slot,
			 struct ib_dm_mad *mad)
{
	[ ... ]
	if (sdev->use_srq)
		send_queue_depth = sdev->srq_size;
	else
		send_queue_depth = min(MAX_SRPT_RQ_SIZE,
				       sdev->device->attrs.max_qp_wr);
	[ ... ]
	iocp->send_queue_depth = cpu_to_be16(send_queue_depth);
	[ ... ]
}

I'm not sure the SRP initiator uses that data from the device management
I/O controller profile.

Anyway, with one SRQ per initiator it is possible for the initiator to
prevent SRQ overflows. I don't think that it is possible for an initiator
to prevent target side SRQ overflows if shared receive queues are shared
across multiple initiators.

Thanks,

Bart.
Max Gurtovoy March 19, 2020, 4:27 p.m. UTC | #7
And again - hopefully this is the last time I'm blocked...

On 3/19/2020 5:44 PM, Max Gurtovoy wrote:
>
> sending again - I think the previous mail was blocked.
>
> On 3/19/2020 5:10 PM, Max Gurtovoy wrote:
>>
>>
>> On 3/19/2020 4:49 PM, Bart Van Assche wrote:
>>> On 3/19/20 6:53 AM, Jason Gunthorpe wrote:
>>>> On Thu, Mar 19, 2020 at 02:48:20PM +0200, Max Gurtovoy wrote:
>>>>
>>>>> Nevertheless, this situation is better from the current SRQ per HCA
>>>>> implementation.
>>>>
>>>> nvme/srp/etc already use srq? I see it in the target but not 
>>>> initiator?
>>>>
>>>> Just worried about breaking some weird target somewhere
>>
>> Jason,
>>
>> The feature is only for the target side and has no influence on the
>> initiator at the ULP level.
>>
>> The only thing that I did is fix the SRQ implementation for nvme/rdma
>> and srpt, which allocates 1 SRQ per device.
>>
>> Now there are N SRQs per device that try to act as a pure MQ
>> implementation (without SRQ).
>>
>>>
>>> From the upstream SRP target driver:
>>>
>>> static void srpt_get_ioc(struct srpt_port *sport, u32 slot,
>>>              struct ib_dm_mad *mad)
>>> {
>>>     [ ... ]
>>>     if (sdev->use_srq)
>>>         send_queue_depth = sdev->srq_size;
>>>     else
>>>         send_queue_depth = min(MAX_SRPT_RQ_SIZE,
>>>                        sdev->device->attrs.max_qp_wr);
>>>     [ ... ]
>>>     iocp->send_queue_depth = cpu_to_be16(send_queue_depth);
>>>     [ ... ]
>>> }
>>>
>>> I'm not sure the SRP initiator uses that data from the device 
>>> management
>>> I/O controller profile.
>>>
>>> Anyway, with one SRQ per initiator it is possible for the initiator to
>>> prevent SRQ overflows. I don't think that it is possible for an 
>>> initiator
>>> to prevent target side SRQ overflows if shared receive queues are 
>>> shared
>>> across multiple initiators.
>>
>> I don't intend to change the initiator code in order to prevent overflow.
>>
>> As I explained earlier, the SRQs on the target side will be assigned to
>> all controllers for a specific device (instead of 1 global SRQ per
>> device) and will share the receive buffers.
>>
>> Not per initiator - that would cause lock contention.
>>
>> In case the target SRQ has no resources at that specific time, the low
>> level (RC QP) is responsible for sending an RNR NAK to the initiator,
>> and the initiator (RC QP) will retry in the transport layer and not in
>> the ULP.
>>
>> This is set by the min_rnr_timer value, which by default is set to 0
>> (max value). For the SRQ case in general, IMO it is better to set it to
>> 1 (minimal value) to avoid longer latency, since there is a chance that
>> the SRQ is full.
>>
>> In my testing I didn't see a real need to set the min_rnr_timer, but I
>> have patches for that in case Jason thinks it should be part of this
>> series, which is not so small even without it.
>>
>>
>>>
>>> Thanks,
>>>
>>> Bart.
Sagi Grimberg March 20, 2020, 5:47 a.m. UTC | #8
> +static void nvmet_rdma_destroy_srqs(struct nvmet_rdma_device *ndev)
>   {
> -	if (!ndev->srq)
> +	int i;
> +
> +	if (!ndev->srqs)
>   		return;
>   
> -	nvmet_rdma_free_cmds(ndev, ndev->srq_cmds, ndev->srq_size, false);
> -	ib_destroy_srq(ndev->srq);
> +	for (i = 0; i < ndev->srq_count; i++)
> +		nvmet_rdma_destroy_srq(ndev->srqs[i]);
> +
> +	rdma_srq_pool_destroy(ndev->pd);
> +	kfree(ndev->srqs);
> +	ndev->srqs = NULL;
> +	ndev->srq_count = 0;
> +	ndev->srq_size = 0;

What is the point of assigning these?

Patch

diff --git a/drivers/nvme/target/rdma.c b/drivers/nvme/target/rdma.c
index 04062af..e415128 100644
--- a/drivers/nvme/target/rdma.c
+++ b/drivers/nvme/target/rdma.c
@@ -18,6 +18,7 @@ 
 #include <asm/unaligned.h>
 
 #include <rdma/ib_verbs.h>
+#include <rdma/srq_pool.h>
 #include <rdma/rdma_cm.h>
 #include <rdma/rw.h>
 
@@ -31,6 +32,8 @@ 
 #define NVMET_RDMA_MAX_INLINE_SGE		4
 #define NVMET_RDMA_MAX_INLINE_DATA_SIZE		max_t(int, SZ_16K, PAGE_SIZE)
 
+struct nvmet_rdma_srq;
+
 struct nvmet_rdma_cmd {
 	struct ib_sge		sge[NVMET_RDMA_MAX_INLINE_SGE + 1];
 	struct ib_cqe		cqe;
@@ -38,7 +41,7 @@  struct nvmet_rdma_cmd {
 	struct scatterlist	inline_sg[NVMET_RDMA_MAX_INLINE_SGE];
 	struct nvme_command     *nvme_cmd;
 	struct nvmet_rdma_queue	*queue;
-	struct ib_srq		*srq;
+	struct nvmet_rdma_srq   *nsrq;
 };
 
 enum {
@@ -80,6 +83,7 @@  struct nvmet_rdma_queue {
 	struct ib_cq		*cq;
 	atomic_t		sq_wr_avail;
 	struct nvmet_rdma_device *dev;
+	struct nvmet_rdma_srq   *nsrq;
 	spinlock_t		state_lock;
 	enum nvmet_rdma_queue_state state;
 	struct nvmet_cq		nvme_cq;
@@ -97,17 +101,24 @@  struct nvmet_rdma_queue {
 
 	int			idx;
 	int			host_qid;
+	int			comp_vector;
 	int			recv_queue_size;
 	int			send_queue_size;
 
 	struct list_head	queue_list;
 };
 
+struct nvmet_rdma_srq {
+	struct ib_srq            *srq;
+	struct nvmet_rdma_cmd    *cmds;
+	struct nvmet_rdma_device *ndev;
+};
+
 struct nvmet_rdma_device {
 	struct ib_device	*device;
 	struct ib_pd		*pd;
-	struct ib_srq		*srq;
-	struct nvmet_rdma_cmd	*srq_cmds;
+	struct nvmet_rdma_srq	**srqs;
+	int			srq_count;
 	size_t			srq_size;
 	struct kref		ref;
 	struct list_head	entry;
@@ -119,6 +130,16 @@  struct nvmet_rdma_device {
 module_param_named(use_srq, nvmet_rdma_use_srq, bool, 0444);
 MODULE_PARM_DESC(use_srq, "Use shared receive queue.");
 
+static int srq_size_set(const char *val, const struct kernel_param *kp);
+static const struct kernel_param_ops srq_size_ops = {
+	.set = srq_size_set,
+	.get = param_get_int,
+};
+
+static int nvmet_rdma_srq_size = 1024;
+module_param_cb(srq_size, &srq_size_ops, &nvmet_rdma_srq_size, 0644);
+MODULE_PARM_DESC(srq_size, "set Shared Receive Queue (SRQ) size, should >= 256 (default: 1024)");
+
 static DEFINE_IDA(nvmet_rdma_queue_ida);
 static LIST_HEAD(nvmet_rdma_queue_list);
 static DEFINE_MUTEX(nvmet_rdma_queue_mutex);
@@ -139,6 +160,17 @@  static int nvmet_rdma_alloc_rsp(struct nvmet_rdma_device *ndev,
 
 static const struct nvmet_fabrics_ops nvmet_rdma_ops;
 
+static int srq_size_set(const char *val, const struct kernel_param *kp)
+{
+	int n = 0, ret;
+
+	ret = kstrtoint(val, 10, &n);
+	if (ret != 0 || n < 256)
+		return -EINVAL;
+
+	return param_set_int(val, kp);
+}
+
 static int num_pages(int len)
 {
 	return 1 + (((len - 1) & PAGE_MASK) >> PAGE_SHIFT);
@@ -462,8 +494,8 @@  static int nvmet_rdma_post_recv(struct nvmet_rdma_device *ndev,
 		cmd->sge[0].addr, cmd->sge[0].length,
 		DMA_FROM_DEVICE);
 
-	if (cmd->srq)
-		ret = ib_post_srq_recv(cmd->srq, &cmd->wr, NULL);
+	if (cmd->nsrq)
+		ret = ib_post_srq_recv(cmd->nsrq->srq, &cmd->wr, NULL);
 	else
 		ret = ib_post_recv(cmd->queue->cm_id->qp, &cmd->wr, NULL);
 
@@ -841,30 +873,83 @@  static void nvmet_rdma_recv_done(struct ib_cq *cq, struct ib_wc *wc)
 	nvmet_rdma_handle_command(queue, rsp);
 }
 
-static void nvmet_rdma_destroy_srq(struct nvmet_rdma_device *ndev)
+static void nvmet_rdma_destroy_srq(struct nvmet_rdma_srq *nsrq)
+{
+	nvmet_rdma_free_cmds(nsrq->ndev, nsrq->cmds, nsrq->ndev->srq_size,
+			     false);
+	rdma_srq_pool_put(nsrq->ndev->pd, nsrq->srq);
+
+	kfree(nsrq);
+}
+
+static void nvmet_rdma_destroy_srqs(struct nvmet_rdma_device *ndev)
 {
-	if (!ndev->srq)
+	int i;
+
+	if (!ndev->srqs)
 		return;
 
-	nvmet_rdma_free_cmds(ndev, ndev->srq_cmds, ndev->srq_size, false);
-	ib_destroy_srq(ndev->srq);
+	for (i = 0; i < ndev->srq_count; i++)
+		nvmet_rdma_destroy_srq(ndev->srqs[i]);
+
+	rdma_srq_pool_destroy(ndev->pd);
+	kfree(ndev->srqs);
+	ndev->srqs = NULL;
+	ndev->srq_count = 0;
+	ndev->srq_size = 0;
 }
 
-static int nvmet_rdma_init_srq(struct nvmet_rdma_device *ndev)
+static struct nvmet_rdma_srq *
+nvmet_rdma_init_srq(struct nvmet_rdma_device *ndev)
 {
-	struct ib_srq_init_attr srq_attr = { NULL, };
+	size_t srq_size = ndev->srq_size;
+	struct nvmet_rdma_srq *nsrq;
 	struct ib_srq *srq;
-	size_t srq_size;
 	int ret, i;
 
-	srq_size = 4095;	/* XXX: tune */
+	nsrq = kzalloc(sizeof(*nsrq), GFP_KERNEL);
+	if (!nsrq)
+		return ERR_PTR(-ENOMEM);
 
-	srq_attr.attr.max_wr = srq_size;
-	srq_attr.attr.max_sge = 1 + ndev->inline_page_count;
-	srq_attr.attr.srq_limit = 0;
-	srq_attr.srq_type = IB_SRQT_BASIC;
-	srq = ib_create_srq(ndev->pd, &srq_attr);
-	if (IS_ERR(srq)) {
+	srq = rdma_srq_pool_get(ndev->pd);
+	if (!srq) {
+		ret = -EAGAIN;
+		goto out_free_nsrq;
+	}
+
+	nsrq->cmds = nvmet_rdma_alloc_cmds(ndev, srq_size, false);
+	if (IS_ERR(nsrq->cmds)) {
+		ret = PTR_ERR(nsrq->cmds);
+		goto out_put_srq;
+	}
+
+	nsrq->srq = srq;
+	nsrq->ndev = ndev;
+
+	for (i = 0; i < srq_size; i++) {
+		nsrq->cmds[i].nsrq = nsrq;
+		ret = nvmet_rdma_post_recv(ndev, &nsrq->cmds[i]);
+		if (ret)
+			goto out_free_cmds;
+	}
+
+	return nsrq;
+
+out_free_cmds:
+	nvmet_rdma_free_cmds(ndev, nsrq->cmds, srq_size, false);
+out_put_srq:
+	rdma_srq_pool_put(ndev->pd, srq);
+out_free_nsrq:
+	kfree(nsrq);
+	return ERR_PTR(ret);
+}
+
+static int nvmet_rdma_init_srqs(struct nvmet_rdma_device *ndev)
+{
+	struct ib_srq_init_attr srq_attr = { NULL, };
+	int i, ret;
+
+	if (!ndev->device->attrs.max_srq_wr || !ndev->device->attrs.max_srq) {
 		/*
 		 * If SRQs aren't supported we just go ahead and use normal
 		 * non-shared receive queues.
@@ -873,31 +958,44 @@  static int nvmet_rdma_init_srq(struct nvmet_rdma_device *ndev)
 		return 0;
 	}
 
-	ndev->srq_cmds = nvmet_rdma_alloc_cmds(ndev, srq_size, false);
-	if (IS_ERR(ndev->srq_cmds)) {
-		ret = PTR_ERR(ndev->srq_cmds);
-		goto out_destroy_srq;
-	}
+	ndev->srq_size = min(ndev->device->attrs.max_srq_wr,
+			     nvmet_rdma_srq_size);
+	ndev->srq_count = min(ndev->device->num_comp_vectors,
+			      ndev->device->attrs.max_srq);
 
-	ndev->srq = srq;
-	ndev->srq_size = srq_size;
+	ndev->srqs = kcalloc(ndev->srq_count, sizeof(*ndev->srqs), GFP_KERNEL);
+	if (!ndev->srqs)
+		return -ENOMEM;
 
-	for (i = 0; i < srq_size; i++) {
-		ndev->srq_cmds[i].srq = srq;
-		ret = nvmet_rdma_post_recv(ndev, &ndev->srq_cmds[i]);
-		if (ret)
-			goto out_free_cmds;
+	srq_attr.attr.max_wr = ndev->srq_size;
+	srq_attr.attr.max_sge = 2;
+	srq_attr.attr.srq_limit = 0;
+	srq_attr.srq_type = IB_SRQT_BASIC;
+	ret = rdma_srq_pool_init(ndev->pd, ndev->srq_count, &srq_attr);
+	if (ret)
+		goto err_free;
+
+	for (i = 0; i < ndev->srq_count; i++) {
+		ndev->srqs[i] = nvmet_rdma_init_srq(ndev);
+		if (IS_ERR(ndev->srqs[i]))
+			goto err_srq;
 	}
 
 	return 0;
 
-out_free_cmds:
-	nvmet_rdma_free_cmds(ndev, ndev->srq_cmds, ndev->srq_size, false);
-out_destroy_srq:
-	ib_destroy_srq(srq);
+err_srq:
+	while (--i >= 0)
+		nvmet_rdma_destroy_srq(ndev->srqs[i]);
+	rdma_srq_pool_destroy(ndev->pd);
+err_free:
+	kfree(ndev->srqs);
+	ndev->srqs = NULL;
+	ndev->srq_count = 0;
+	ndev->srq_size = 0;
 	return ret;
 }
 
+
 static void nvmet_rdma_free_dev(struct kref *ref)
 {
 	struct nvmet_rdma_device *ndev =
@@ -907,7 +1005,7 @@  static void nvmet_rdma_free_dev(struct kref *ref)
 	list_del(&ndev->entry);
 	mutex_unlock(&device_list_mutex);
 
-	nvmet_rdma_destroy_srq(ndev);
+	nvmet_rdma_destroy_srqs(ndev);
 	ib_dealloc_pd(ndev->pd);
 
 	kfree(ndev);
@@ -953,7 +1051,7 @@  static void nvmet_rdma_free_dev(struct kref *ref)
 		goto out_free_dev;
 
 	if (nvmet_rdma_use_srq) {
-		ret = nvmet_rdma_init_srq(ndev);
+		ret = nvmet_rdma_init_srqs(ndev);
 		if (ret)
 			goto out_free_pd;
 	}
@@ -977,14 +1075,8 @@  static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
 {
 	struct ib_qp_init_attr qp_attr;
 	struct nvmet_rdma_device *ndev = queue->dev;
-	int comp_vector, nr_cqe, ret, i;
+	int nr_cqe, ret, i;
 
-	/*
-	 * Spread the io queues across completion vectors,
-	 * but still keep all admin queues on vector 0.
-	 */
-	comp_vector = !queue->host_qid ? 0 :
-		queue->idx % ndev->device->num_comp_vectors;
 
 	/*
 	 * Reserve CQ slots for RECV + RDMA_READ/RDMA_WRITE + RDMA_SEND.
@@ -992,7 +1084,7 @@  static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
 	nr_cqe = queue->recv_queue_size + 2 * queue->send_queue_size;
 
 	queue->cq = ib_alloc_cq(ndev->device, queue,
-			nr_cqe + 1, comp_vector,
+			nr_cqe + 1, queue->comp_vector,
 			IB_POLL_WORKQUEUE);
 	if (IS_ERR(queue->cq)) {
 		ret = PTR_ERR(queue->cq);
@@ -1014,8 +1106,8 @@  static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
 	qp_attr.cap.max_send_sge = max(ndev->device->attrs.max_sge_rd,
 					ndev->device->attrs.max_send_sge);
 
-	if (ndev->srq) {
-		qp_attr.srq = ndev->srq;
+	if (queue->nsrq) {
+		qp_attr.srq = queue->nsrq->srq;
 	} else {
 		/* +1 for drain */
 		qp_attr.cap.max_recv_wr = 1 + queue->recv_queue_size;
@@ -1034,7 +1126,7 @@  static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
 		 __func__, queue->cq->cqe, qp_attr.cap.max_send_sge,
 		 qp_attr.cap.max_send_wr, queue->cm_id);
 
-	if (!ndev->srq) {
+	if (!queue->nsrq) {
 		for (i = 0; i < queue->recv_queue_size; i++) {
 			queue->cmds[i].queue = queue;
 			ret = nvmet_rdma_post_recv(ndev, &queue->cmds[i]);
@@ -1070,7 +1162,7 @@  static void nvmet_rdma_free_queue(struct nvmet_rdma_queue *queue)
 	nvmet_sq_destroy(&queue->nvme_sq);
 
 	nvmet_rdma_destroy_queue_ib(queue);
-	if (!queue->dev->srq) {
+	if (!queue->nsrq) {
 		nvmet_rdma_free_cmds(queue->dev, queue->cmds,
 				queue->recv_queue_size,
 				!queue->host_qid);
@@ -1182,13 +1274,22 @@  static int nvmet_rdma_cm_reject(struct rdma_cm_id *cm_id,
 		goto out_destroy_sq;
 	}
 
+	/*
+	 * Spread the io queues across completion vectors,
+	 * but still keep all admin queues on vector 0.
+	 */
+	queue->comp_vector = !queue->host_qid ? 0 :
+		queue->idx % ndev->device->num_comp_vectors;
+
 	ret = nvmet_rdma_alloc_rsps(queue);
 	if (ret) {
 		ret = NVME_RDMA_CM_NO_RSC;
 		goto out_ida_remove;
 	}
 
-	if (!ndev->srq) {
+	if (ndev->srqs) {
+		queue->nsrq = ndev->srqs[queue->comp_vector % ndev->srq_count];
+	} else {
 		queue->cmds = nvmet_rdma_alloc_cmds(ndev,
 				queue->recv_queue_size,
 				!queue->host_qid);
@@ -1209,10 +1310,12 @@  static int nvmet_rdma_cm_reject(struct rdma_cm_id *cm_id,
 	return queue;
 
 out_free_cmds:
-	if (!ndev->srq) {
+	if (!queue->nsrq) {
 		nvmet_rdma_free_cmds(queue->dev, queue->cmds,
 				queue->recv_queue_size,
 				!queue->host_qid);
+	} else {
+		queue->nsrq = NULL;
 	}
 out_free_responses:
 	nvmet_rdma_free_rsps(queue);