@@ -170,8 +170,6 @@ struct svc_rdma_chunk_ctxt {
struct list_head cc_rwctxts;
ktime_t cc_posttime;
int cc_sqecount;
- enum ib_wc_status cc_status;
- struct completion cc_done;
};
struct svc_rdma_recv_ctxt {
@@ -191,6 +189,7 @@ struct svc_rdma_recv_ctxt {
unsigned int rc_pageoff;
unsigned int rc_curpage;
unsigned int rc_readbytes;
+ struct xdr_buf rc_saved_arg;
struct svc_rdma_chunk_ctxt rc_cc;
struct svc_rdma_pcl rc_call_pcl;
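The two struct hunks above swap the synchronous completion machinery for state that can survive a deferred completion: cc_status and cc_done leave svc_rdma_chunk_ctxt, and rc_saved_arg is added to svc_rdma_recv_ctxt so the Call message assembled in rq_arg can be parked while the RDMA Reads run. Below is a minimal userspace sketch of that save/restore idea; the type and function names are stand-ins for illustration, not the kernel's xdr_buf/svc_rqst definitions.

#include <stdio.h>

struct arg_buf {                    /* stand-in for struct xdr_buf */
	char   *head;
	size_t  len;
};

struct recv_ctxt {                  /* stand-in for svc_rdma_recv_ctxt */
	struct arg_buf saved_arg;   /* plays the role of rc_saved_arg */
};

struct rqst {                       /* stand-in for svc_rqst */
	struct arg_buf arg;         /* plays the role of rq_arg */
};

/* Before the rqst is recycled, park the partially built message. */
static void defer_reads(struct rqst *rqstp, struct recv_ctxt *ctxt)
{
	ctxt->saved_arg = rqstp->arg;
}

/* Later, a different rqst picks up where the first one left off. */
static void read_complete(struct rqst *rqstp, struct recv_ctxt *ctxt)
{
	rqstp->arg = ctxt->saved_arg;
}

int main(void)
{
	static char call_hdr[] = "partially decoded Call";
	struct recv_ctxt ctxt;
	struct rqst first = { .arg = { call_hdr, sizeof(call_hdr) } };
	struct rqst second = { 0 };

	defer_reads(&first, &ctxt);
	read_complete(&second, &ctxt);
	printf("restored %zu bytes: %s\n", second.arg.len, second.arg.head);
	return 0;
}

The structure copy is cheap because an xdr_buf holds pointers and lengths rather than the payload itself, which is why saving it by value in the receive context is sufficient.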
@@ -240,6 +239,9 @@ extern int svc_rdma_recvfrom(struct svc_rqst *);
extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
extern void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
struct svc_rdma_chunk_ctxt *cc);
+extern void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc,
+ enum dma_data_direction dir);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk,
const struct xdr_buf *xdr);
@@ -214,6 +214,8 @@ struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
struct svc_rdma_recv_ctxt *ctxt)
{
+ svc_rdma_cc_release(rdma, &ctxt->rc_cc, DMA_FROM_DEVICE);
+
/* @rc_page_count is normally zero here, but error flows
* can leave pages in @rc_pages.
*/
@@ -870,6 +872,7 @@ static noinline void svc_rdma_read_complete(struct svc_rqst *rqstp,
* procedure for that depends on what kind of RPC/RDMA
* chunks were provided by the client.
*/
+ rqstp->rq_arg = ctxt->rc_saved_arg;
if (pcl_is_empty(&ctxt->rc_call_pcl)) {
if (ctxt->rc_read_pcl.cl_count == 1)
svc_rdma_read_complete_one(rqstp, ctxt);
@@ -930,7 +933,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
if (ctxt) {
list_del(&ctxt->rc_list);
- spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+ spin_unlock(&rdma_xprt->sc_rq_dto_lock);
+ svc_xprt_received(xprt);
svc_rdma_read_complete(rqstp, ctxt);
goto complete;
}
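With completions deferred, svc_rdma_recvfrom() now drains sc_read_complete_q itself: it removes the context under sc_rq_dto_lock, drops the lock, calls svc_xprt_received() before doing the longer completion work, and then finishes the message via svc_rdma_read_complete(). The following is a rough userspace model of that dequeue-then-process pattern, with a pthread mutex standing in for the kernel spinlock and a placeholder for svc_xprt_received(); none of these names are the kernel API.

#include <pthread.h>
#include <stdio.h>

struct ctxt {
	struct ctxt *next;
	int bytes_read;
};

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static struct ctxt *read_complete_q;    /* like sc_read_complete_q */

static void xprt_received(void) { /* placeholder for svc_xprt_received() */ }

static int recvfrom(void)
{
	struct ctxt *ctxt;

	pthread_mutex_lock(&queue_lock);
	ctxt = read_complete_q;
	if (ctxt)
		read_complete_q = ctxt->next;   /* like list_del() */
	pthread_mutex_unlock(&queue_lock);

	if (!ctxt)
		return 0;
	xprt_received();                        /* ack before the heavy lifting */
	printf("completing message, %d bytes read\n", ctxt->bytes_read);
	return ctxt->bytes_read;
}

int main(void)
{
	struct ctxt done = { .next = NULL, .bytes_read = 512 };

	read_complete_q = &done;
	return recvfrom() > 0 ? 0 : 1;
}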
@@ -965,11 +969,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
- !pcl_is_empty(&ctxt->rc_call_pcl)) {
- ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
- if (ret < 0)
- goto out_readfail;
- }
+ !pcl_is_empty(&ctxt->rc_call_pcl))
+ goto out_readlist;
complete:
rqstp->rq_xprt_ctxt = ctxt;
@@ -983,12 +984,23 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
return 0;
-out_readfail:
- if (ret == -EINVAL)
- svc_rdma_send_error(rdma_xprt, ctxt, ret);
- svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
- svc_xprt_deferred_close(xprt);
- return -ENOTCONN;
+out_readlist:
+ /* This @rqstp is about to be recycled. Save the work
+ * already done constructing the Call message in rq_arg
+ * so it can be restored when the RDMA Reads have
+ * completed.
+ */
+ ctxt->rc_saved_arg = rqstp->rq_arg;
+
+ ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
+ if (ret < 0) {
+ if (ret == -EINVAL)
+ svc_rdma_send_error(rdma_xprt, ctxt, ret);
+ svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
+ svc_xprt_deferred_close(xprt);
+ return ret;
+ }
+ return 0;
out_backchannel:
svc_rdma_handle_bc_reply(rqstp, ctxt);
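The out_readfail exit above is replaced by out_readlist: rq_arg is saved into the receive context, the Read list is posted, and on success recvfrom returns 0 so the rqst can be recycled while the Reads complete; errors still send an ERR_CHUNK reply where appropriate and close the connection. A small sketch of that caller contract follows, assuming only a post function that returns 0 on success and a negative errno on failure; the names are illustrative, not the kernel's.

#include <errno.h>
#include <stdio.h>

struct ctxt { int saved_arg; };

/* Pretend this queues RDMA Reads; -EINVAL models a malformed chunk list. */
static int post_read_list(struct ctxt *ctxt, int valid)
{
	(void)ctxt;
	return valid ? 0 : -EINVAL;
}

static int handle_read_list(struct ctxt *ctxt, int arg, int valid)
{
	int ret;

	ctxt->saved_arg = arg;          /* park rq_arg for the completion */
	ret = post_read_list(ctxt, valid);
	if (ret < 0) {
		if (ret == -EINVAL)
			fprintf(stderr, "sending ERR_CHUNK reply\n");
		fprintf(stderr, "closing connection (%d)\n", ret);
		return ret;
	}
	return 0;                       /* caller sees "no message yet" */
}

int main(void)
{
	struct ctxt c;

	printf("good list -> %d\n", handle_read_list(&c, 42, 1));
	printf("bad list  -> %d\n", handle_read_list(&c, 42, 0));
	return 0;
}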
@@ -163,14 +163,15 @@ void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
cc->cc_sqecount = 0;
}
-/*
- * The consumed rw_ctx's are cleaned and placed on a local llist so
- * that only one atomic llist operation is needed to put them all
- * back on the free list.
+/**
+ * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
+ * @rdma: controlling transport instance
+ * @cc: svc_rdma_chunk_ctxt to be released
+ * @dir: DMA direction
*/
-static void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
- struct svc_rdma_chunk_ctxt *cc,
- enum dma_data_direction dir)
+void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc,
+ enum dma_data_direction dir)
{
struct llist_node *first, *last;
struct svc_rdma_rw_ctxt *ctxt;
@@ -300,12 +301,21 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
struct svc_rdma_recv_ctxt *ctxt;
+ svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+
+ ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
switch (wc->status) {
case IB_WC_SUCCESS:
- ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
cc->cc_posttime);
- break;
+
+ spin_lock(&rdma->sc_rq_dto_lock);
+ list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
+ /* the unlock pairs with the smp_rmb in svc_xprt_ready */
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ spin_unlock(&rdma->sc_rq_dto_lock);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ return;
case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
break;
@@ -313,10 +323,13 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
}
- svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
- cc->cc_status = wc->status;
- complete(&cc->cc_done);
- return;
+ /* The RDMA Read has flushed, so the incoming RPC message
+ * cannot be constructed and must be dropped. Signal the
+ * loss to the client by closing the connection.
+ */
+ svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
+ svc_rdma_recv_ctxt_put(rdma, ctxt);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
}
/*
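svc_rdma_wc_read_done() becomes the producer side of the deferral: send-queue waiters are woken for every status, then on IB_WC_SUCCESS the receive context is queued on sc_read_complete_q, XPT_DATA is set, and the transport is enqueued; on a flush or error the chunk resources are released, the context is put, and the connection close is deferred. Here is a compact userspace model of that branch-on-status shape, using stand-in names rather than the real verbs API.

#include <stdbool.h>
#include <stdio.h>

enum wc_status { WC_SUCCESS, WC_FLUSH_ERR, WC_OTHER_ERR };

struct ctxt {
	struct ctxt *next;
	bool released;
};

static struct ctxt *read_complete_q;
static bool xprt_has_data, xprt_closing;

static void read_done(enum wc_status status, struct ctxt *ctxt)
{
	switch (status) {
	case WC_SUCCESS:
		ctxt->next = read_complete_q;   /* queue for the receive path */
		read_complete_q = ctxt;
		xprt_has_data = true;           /* like set_bit(XPT_DATA) */
		return;                         /* recvfrom() finishes the message */
	case WC_FLUSH_ERR:
	case WC_OTHER_ERR:
		break;
	}
	/* The message cannot be constructed: drop it and close. */
	ctxt->released = true;                  /* like cc_release + ctxt_put */
	xprt_closing = true;                    /* like svc_xprt_deferred_close */
}

int main(void)
{
	struct ctxt ok = { 0 }, bad = { 0 };

	read_done(WC_SUCCESS, &ok);
	read_done(WC_FLUSH_ERR, &bad);
	printf("queued=%d data=%d closing=%d\n",
	       read_complete_q == &ok, xprt_has_data, xprt_closing);
	return 0;
}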
@@ -823,7 +836,6 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head)
{
const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
- struct xdr_buf *buf = &rqstp->rq_arg;
struct svc_rdma_chunk *chunk, *next;
unsigned int start, length;
int ret;
@@ -853,18 +865,7 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
start += length;
length = head->rc_byte_len - start;
- ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
- if (ret < 0)
- return ret;
-
- buf->len += head->rc_readbytes;
- buf->buflen += head->rc_readbytes;
-
- buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
- buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
- buf->pages = &rqstp->rq_pages[1];
- buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
- return 0;
+ return svc_rdma_copy_inline_range(rqstp, head, start, length);
}
/**
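The tail of svc_rdma_read_multiple_chunks() used to splice the read bytes into rq_arg directly: the head kvec covered at most the first page, and the remainder became page_len. That splice now happens when the Reads complete, but the arithmetic is unchanged. A standalone version of the split is sketched below, assuming a 4 KiB page size purely for illustration.

#include <stdio.h>

#define PAGE_SIZE 4096u

struct split {
	unsigned int head_len;  /* like buf->head[0].iov_len */
	unsigned int page_len;  /* like buf->page_len */
};

/* Split rc_readbytes between the head kvec (at most one page) and
 * the page list, mirroring the min_t()/subtraction in the old code.
 */
static struct split split_readbytes(unsigned int readbytes)
{
	struct split s;

	s.head_len = readbytes < PAGE_SIZE ? readbytes : PAGE_SIZE;
	s.page_len = readbytes - s.head_len;
	return s;
}

int main(void)
{
	struct split s = split_readbytes(10000);

	printf("head=%u page_len=%u\n", s.head_len, s.page_len);
	return 0;
}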
@@ -888,42 +889,8 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head)
{
- struct xdr_buf *buf = &rqstp->rq_arg;
- struct svc_rdma_chunk *chunk;
- unsigned int length;
- int ret;
-
- chunk = pcl_first_chunk(&head->rc_read_pcl);
- ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
- if (ret < 0)
- goto out;
-
- /* Split the Receive buffer between the head and tail
- * buffers at Read chunk's position. XDR roundup of the
- * chunk is not included in either the pagelist or in
- * the tail.
- */
- buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
- buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
- buf->head[0].iov_len = chunk->ch_position;
-
- /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
- *
- * If the client already rounded up the chunk length, the
- * length does not change. Otherwise, the length of the page
- * list is increased to include XDR round-up.
- *
- * Currently these chunks always start at page offset 0,
- * thus the rounded-up length never crosses a page boundary.
- */
- buf->pages = &rqstp->rq_pages[0];
- length = xdr_align_size(chunk->ch_length);
- buf->page_len = length;
- buf->len += length;
- buf->buflen += length;
-
-out:
- return ret;
+ return svc_rdma_build_read_chunk(rqstp, head,
+ pcl_first_chunk(&head->rc_read_pcl));
}
/**
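The removed body of svc_rdma_read_data_item() carried the head/tail split at the chunk position and the XDR round-up of the page list; that buffer surgery now happens at completion time, so the helper only builds the Read chunk. The round-up rule itself is unchanged: XDR data items are padded to a 4-byte boundary (RFC 8166, s. 3.4.5.2), which is what xdr_align_size() computes. A tiny model of that alignment, using the standard "round up to the next quad" definition:

#include <assert.h>
#include <stddef.h>

/* Round a length up to the XDR quad (4-byte) boundary, as
 * xdr_align_size() does.  Because these Read chunks start at page
 * offset 0, adding at most 3 pad bytes never crosses a page boundary
 * that the data itself did not already cross.
 */
static size_t xdr_align_size_model(size_t len)
{
	return (len + 3) & ~(size_t)3;
}

int main(void)
{
	assert(xdr_align_size_model(0) == 0);
	assert(xdr_align_size_model(1) == 4);
	assert(xdr_align_size_model(4) == 4);       /* already aligned */
	assert(xdr_align_size_model(4093) == 4096); /* stays inside one page */
	return 0;
}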
@@ -1051,23 +1018,28 @@ static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head)
{
- struct xdr_buf *buf = &rqstp->rq_arg;
- int ret;
-
- ret = svc_rdma_read_call_chunk(rqstp, head);
- if (ret < 0)
- goto out;
-
- buf->len += head->rc_readbytes;
- buf->buflen += head->rc_readbytes;
+ return svc_rdma_read_call_chunk(rqstp, head);
+}
- buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
- buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
- buf->pages = &rqstp->rq_pages[1];
- buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
+/* Pages under I/O have been copied to head->rc_pages. Ensure that
+ * svc_xprt_release() does not put them when svc_rdma_recvfrom()
+ * returns. This has to be done after all Read WRs are constructed
+ * to properly handle a page that happens to be part of I/O on behalf
+ * of two different RDMA segments.
+ *
+ * Note: if the subsequent post_send fails, these pages have already
+ * been moved to head->rc_pages and thus will be cleaned up by
+ * svc_rdma_recv_ctxt_put().
+ */
+static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
+{
+ unsigned int i;
-out:
- return ret;
+ for (i = 0; i < head->rc_page_count; i++) {
+ head->rc_pages[i] = rqstp->rq_pages[i];
+ rqstp->rq_pages[i] = NULL;
+ }
}
/**
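svc_rdma_clear_rqst_pages() makes the page ownership transfer explicit: every page the Read WRs reference moves from rqstp->rq_pages[] into head->rc_pages[], and the rq_pages slot is cleared so svc_xprt_release() cannot put a page that the adapter may still be writing. The same move-and-null pattern in plain C is shown below, with ordinary heap allocations standing in for struct page references.

#include <stdio.h>
#include <stdlib.h>

#define NR_PAGES 4

/* Transfer ownership of buffers from a per-request array to a
 * longer-lived context, clearing the source slots so generic request
 * cleanup cannot double-free them.
 */
static void claim_pages(void *dst[], void *src[], unsigned int count)
{
	for (unsigned int i = 0; i < count; i++) {
		dst[i] = src[i];   /* like head->rc_pages[i] = rq_pages[i] */
		src[i] = NULL;     /* release path now skips this slot    */
	}
}

int main(void)
{
	void *rq_pages[NR_PAGES], *rc_pages[NR_PAGES] = { 0 };

	for (int i = 0; i < NR_PAGES; i++)
		rq_pages[i] = malloc(64);

	claim_pages(rc_pages, rq_pages, NR_PAGES);

	/* Generic per-request cleanup: every slot is NULL, so this is a no-op. */
	for (int i = 0; i < NR_PAGES; i++)
		free(rq_pages[i]);

	/* The context frees the pages once the I/O has completed. */
	for (int i = 0; i < NR_PAGES; i++)
		free(rc_pages[i]);
	printf("ownership transferred for %d pages\n", NR_PAGES);
	return 0;
}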
@@ -1113,30 +1085,11 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
ret = svc_rdma_read_multiple_chunks(rqstp, head);
} else
ret = svc_rdma_read_special(rqstp, head);
+ svc_rdma_clear_rqst_pages(rqstp, head);
if (ret < 0)
- goto out_err;
+ return ret;
trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
- init_completion(&cc->cc_done);
ret = svc_rdma_post_chunk_ctxt(rdma, cc);
- if (ret < 0)
- goto out_err;
-
- ret = 1;
- wait_for_completion(&cc->cc_done);
- if (cc->cc_status != IB_WC_SUCCESS)
- ret = -EIO;
-
- /* rq_respages starts after the last arg page */
- rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
- /* Ensure svc_rdma_recv_ctxt_put() does not release pages
- * left in @rc_pages while I/O proceeds.
- */
- head->rc_page_count = 0;
-
-out_err:
- svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
- return ret;
+ return ret < 0 ? ret : 1;
}