@@ -265,6 +265,9 @@
#define OUT_MAXREQSIZE (1000 * 1024)
#define OUT_MAXREPSIZE MDS_MAXREPSIZE
+/*
+ * Request/reply size limits for BUT (batch) RPCs.  The reply-growing code
+ * refuses to enlarge a batch reply beyond BUT_MAXREPSIZE and returns
+ * -EOVERFLOW instead.
+ * NOTE(review): the reply limit aliases the *request* limit
+ * (OUT_MAXREQSIZE), not OUT_MAXREPSIZE -- confirm this is intended.
+ */
+#define BUT_MAXREQSIZE OUT_MAXREQSIZE
+#define BUT_MAXREPSIZE BUT_MAXREQSIZE
+
/*
* LDLM threads constants:
*
@@ -2051,6 +2054,7 @@ int lustre_pack_reply_flags(struct ptlrpc_request *, int count, u32 *lens,
char **bufs, int flags);
int lustre_shrink_msg(struct lustre_msg *msg, int segment,
unsigned int newlen, int move_data);
+int lustre_grow_msg(struct lustre_msg *msg, int segment, unsigned int newlen);
void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
int __lustre_unpack_msg(struct lustre_msg *m, int len);
u32 lustre_msg_hdr_size(u32 magic, u32 count);
@@ -2061,6 +2065,7 @@ extern u32 lustre_msg_early_size;
void *lustre_msg_buf_v2(struct lustre_msg_v2 *m, u32 n, u32 min_size);
void *lustre_msg_buf(struct lustre_msg *m, u32 n, u32 minlen);
u32 lustre_msg_buflen(struct lustre_msg *m, u32 n);
+void lustre_msg_set_buflen(struct lustre_msg *m, u32 n, u32 len);
u32 lustre_msg_bufcount(struct lustre_msg *m);
char *lustre_msg_string(struct lustre_msg *m, u32 n, u32 max_len);
u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
@@ -132,6 +132,9 @@ int req_capsule_field_present(const struct req_capsule *pill,
void req_capsule_shrink(struct req_capsule *pill,
const struct req_msg_field *field,
u32 newlen, enum req_location loc);
+int req_capsule_server_grow(struct req_capsule *pill,
+ const struct req_msg_field *field,
+ u32 newlen);
bool req_capsule_need_swab(struct req_capsule *pill, enum req_location loc,
u32 index);
void req_capsule_set_swabbed(struct req_capsule *pill, enum req_location loc,
@@ -133,7 +133,7 @@ static int mdc_batch_getattr_pack(struct batch_update_head *head,
req_capsule_set_size(&pill, &RMF_ACL, RCL_SERVER,
LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
req_capsule_set_size(&pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
- sizeof(struct lmv_user_md));
+ /*sizeof(struct lmv_user_md)*/MIN_MD_SIZE);
if (have_secctx) {
char *secctx_name;
@@ -360,11 +360,16 @@ static int batch_update_request_fini(struct batch_update_head *head,
*/
repmsg = NULL;
rc1 = -ECANCELED;
+ /*
+ * TODO: resend the unfinished sub request when the
+ * return code is -EOVERFLOW.
+ */
}
if (ouc->ouc_interpret)
ouc->ouc_interpret(req, repmsg, ouc, rc1);
+ index++;
object_update_callback_fini(ouc);
if (rc == 0 && rc1 < 0)
rc = rc1;
@@ -1915,16 +1915,62 @@ int req_capsule_server_pack(struct req_capsule *pill)
count, fmt->rf_name);
}
} else { /* SUB request */
+ struct ptlrpc_request *req = pill->rc_req;
+ u32 used_len;
u32 msg_len;
msg_len = lustre_msg_size_v2(count, pill->rc_area[RCL_SERVER]);
- if (msg_len > pill->rc_reqmsg->lm_repsize) {
+ used_len = (char *)pill->rc_repmsg - (char *)req->rq_repmsg;
+ /* Overflow the reply buffer */
+ if (used_len + msg_len > req->rq_replen) {
+ u32 len;
+ u32 max;
+ u32 add;
+
+ if (!req_capsule_has_field(&req->rq_pill,
+ &RMF_BUT_REPLY, RCL_SERVER))
+ return -EINVAL;
+
+ if (!req_capsule_field_present(&req->rq_pill,
+ &RMF_BUT_REPLY,
+ RCL_SERVER))
+ return -EINVAL;
+
+ if (used_len + msg_len > BUT_MAXREPSIZE)
+ return -EOVERFLOW;
+
+ len = req_capsule_get_size(&req->rq_pill,
+ &RMF_BUT_REPLY, RCL_SERVER);
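+			/*
+			 * Currently just grow the batch RPC reply buffer
+			 * (which holds @RMF_PTLRPC_BODY + @RMF_BUT_REPLY) by
+			 * at least the current @RMF_BUT_REPLY length, i.e. at
+			 * least double that field, unless capped by
+			 * BUT_MAXREPSIZE.
+			 * We must set the new length carefully as it will be
+			 * rounded up to a multiple of 8.
+			 */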
+ max = BUT_MAXREPSIZE - req->rq_replen;
+ add = len;
+ if (used_len + msg_len > len)
+ add = used_len + msg_len;
+
+ if (add > max)
+ len += max;
+ else
+ len += add;
+ rc = req_capsule_server_grow(&req->rq_pill,
+ &RMF_BUT_REPLY, len);
+ if (rc)
+ return rc;
+
+ pill->rc_repmsg =
+ (struct lustre_msg *)((char *)req->rq_repmsg +
+ used_len);
+ }
+ if (msg_len > pill->rc_reqmsg->lm_repsize)
/* TODO: Check whether there is enough buffer size */
CDEBUG(D_INFO,
"Overflow pack %d fields in format '%s' for the SUB request with message len %u:%u\n",
count, fmt->rf_name, msg_len,
pill->rc_reqmsg->lm_repsize);
- }
rc = 0;
lustre_init_msg_v2(pill->rc_repmsg, count,
@@ -2498,6 +2544,147 @@ void req_capsule_shrink(struct req_capsule *pill,
}
EXPORT_SYMBOL(req_capsule_shrink);
+/*
+ * Grow the server-side (RCL_SERVER) reply field @field of @pill to @newlen
+ * bytes.
+ *
+ * For a SUB request pill (req_capsule_subreq()), the field belongs to the
+ * sub reply message pill->rc_repmsg, which is located inside the
+ * RMF_BUT_REPLY field of the enclosing request's pill (req->rq_pill).  If
+ * RMF_BUT_REPLY already has room, only the sub message is grown; otherwise
+ * RMF_BUT_REPLY itself is enlarged first.  For any other pill, @field is
+ * grown directly in the reply message.
+ *
+ * When the current reply buffer cannot hold the enlarged message, the reply
+ * is re-packed through req_capsule_server_pack(), the old buffers are copied
+ * into the new reply state and the old reply state is dropped.
+ *
+ * Return: 0 on success; -EINVAL if a SUB request's parent pill does not
+ * carry a present RMF_BUT_REPLY field; -EOVERFLOW if the requested growth
+ * exceeds the room left below BUT_MAXREPSIZE; otherwise the error from
+ * req_capsule_server_pack(), in which case the previous field sizes and the
+ * previous reply state are put back.
+ */
+int req_capsule_server_grow(struct req_capsule *pill,
+			    const struct req_msg_field *field,
+			    u32 newlen)
+{
+	struct ptlrpc_request *req = pill->rc_req;
+	struct ptlrpc_reply_state *rs = req->rq_reply_state, *nrs;
+	char *from, *to, *sptr = NULL;
+	u32 slen = 0, snewlen = 0;
+	u32 offset, len, max, diff;
+	int rc;
+
+	LASSERT(pill->rc_fmt);
+	LASSERT(__req_format_is_sane(pill->rc_fmt));
+	LASSERT(req_capsule_has_field(pill, field, RCL_SERVER));
+	LASSERT(req_capsule_field_present(pill, field, RCL_SERVER));
+
+	if (req_capsule_subreq(pill)) {
+		if (!req_capsule_has_field(&req->rq_pill, &RMF_BUT_REPLY,
+					   RCL_SERVER))
+			return -EINVAL;
+
+		if (!req_capsule_field_present(&req->rq_pill, &RMF_BUT_REPLY,
+					       RCL_SERVER))
+			return -EINVAL;
+
+		/*
+		 * len:  current size of RMF_BUT_REPLY in the parent pill
+		 * sptr: start of the RMF_BUT_REPLY buffer
+		 * slen: current size of @field in the sub reply
+		 */
+		len = req_capsule_get_size(&req->rq_pill, &RMF_BUT_REPLY,
+					   RCL_SERVER);
+		sptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
+		slen = req_capsule_get_size(pill, field, RCL_SERVER);
+
+		/* the sub reply as packed so far must end inside BUT_REPLY */
+		LASSERT(len >= (char *)pill->rc_repmsg - sptr +
+			       lustre_packed_msg_size(pill->rc_repmsg));
+		/*
+		 * Fast path: RMF_BUT_REPLY still has room for the sub reply
+		 * with @field enlarged, so only the sub message is grown and
+		 * the outer reply is left untouched.
+		 */
+		if (len >= (char *)pill->rc_repmsg - sptr +
+			   lustre_packed_msg_size(pill->rc_repmsg) - slen +
+			   newlen) {
+			req_capsule_set_size(pill, field, RCL_SERVER, newlen);
+			offset = __req_capsule_offset(pill, field, RCL_SERVER);
+			lustre_grow_msg(pill->rc_repmsg, offset, newlen);
+			return 0;
+		}
+
+		/*
+		 * Not enough room: RMF_BUT_REPLY must be enlarged.  With
+		 * diff = newlen - slen, its size is set to 2 * (len + diff)
+		 * when that increase (len + 2 * diff) is below the room left
+		 * under BUT_MAXREPSIZE; otherwise it takes all remaining room.
+		 * TODO: Enlarge the reply buffer properly according to the
+		 * left SUB requests in the batch PTLRPC request.
+		 */
+		/*
+		 * From here on @snewlen is the requested size of the sub
+		 * field and @newlen becomes the new size of RMF_BUT_REPLY.
+		 * NOTE(review): diff is an unsigned subtraction; this assumes
+		 * the caller passes newlen >= slen -- confirm.
+		 */
+		snewlen = newlen;
+		diff = snewlen - slen;
+		max = BUT_MAXREPSIZE - req->rq_replen;
+		if (diff > max)
+			return -EOVERFLOW;
+
+		if (diff * 2 + len < max)
+			newlen = (len + diff) * 2;
+		else
+			newlen = len + max;
+
+		req_capsule_set_size(pill, field, RCL_SERVER, snewlen);
+		req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
+				     newlen);
+		/* below, the segment being grown is the parent's BUT_REPLY */
+		offset = __req_capsule_offset(&req->rq_pill, &RMF_BUT_REPLY,
+					      RCL_SERVER);
+	} else {
+		len = req_capsule_get_size(pill, field, RCL_SERVER);
+		offset = __req_capsule_offset(pill, field, RCL_SERVER);
+		req_capsule_set_size(pill, field, RCL_SERVER, newlen);
+	}
+
+	CDEBUG(D_INFO, "Reply packed: %d, allocated: %d, field len %d -> %d\n",
+	       lustre_packed_msg_size(rs->rs_msg), rs->rs_repbuf_len,
+	       len, newlen);
+
+	/*
+	 * There can be enough space in the current reply buffer; make sure
+	 * that rs_repbuf is not a wrapper but the real reply msg, otherwise
+	 * re-packing is still needed.
+	 */
+	if (rs->rs_msg == rs->rs_repbuf &&
+	    rs->rs_repbuf_len >=
+	    lustre_packed_msg_size(rs->rs_msg) - len + newlen) {
+		req->rq_replen = lustre_grow_msg(rs->rs_msg, offset, newlen);
+		return 0;
+	}
+
+	/*
+	 * Re-allocate the reply state: clear rq_reply_state and re-pack, so
+	 * that req->rq_reply_state afterwards refers to a new reply state
+	 * sized for the enlarged field.
+	 */
+	req->rq_reply_state = NULL;
+	rc = req_capsule_server_pack(&req->rq_pill);
+	if (rc) {
+		/* put old values back, the caller should decide what to do */
+		if (req_capsule_subreq(pill)) {
+			req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
+					     RCL_SERVER, len);
+			req_capsule_set_size(pill, field, RCL_SERVER, slen);
+		} else {
+			req_capsule_set_size(pill, field, RCL_SERVER, len);
+		}
+		pill->rc_req->rq_reply_state = rs;
+		return rc;
+	}
+	nrs = req->rq_reply_state;
+	LASSERT(lustre_packed_msg_size(nrs->rs_msg) >
+		lustre_packed_msg_size(rs->rs_msg));
+
+	/* Now we need only buffers, copy them and grow the needed one */
+	to = lustre_msg_buf(nrs->rs_msg, 0, 0);
+	from = lustre_msg_buf(rs->rs_msg, 0, 0);
+	memcpy(to, from,
+	       (char *)rs->rs_msg + lustre_packed_msg_size(rs->rs_msg) - from);
+	/*
+	 * The data just copied is still laid out for the old length, so
+	 * record @len for the segment first and let lustre_grow_msg() shift
+	 * the following buffers while growing it to @newlen.
+	 */
+	lustre_msg_set_buflen(nrs->rs_msg, offset, len);
+	req->rq_replen = lustre_grow_msg(nrs->rs_msg, offset, newlen);
+
+	if (req_capsule_subreq(pill)) {
+		char *ptr;
+
+		/*
+		 * Re-point the sub reply at the same offset inside the new
+		 * RMF_BUT_REPLY buffer, then grow @field within it.
+		 */
+		ptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
+		pill->rc_repmsg = (struct lustre_msg *)(ptr +
+				  ((char *)pill->rc_repmsg - sptr));
+		offset = __req_capsule_offset(pill, field, RCL_SERVER);
+		lustre_grow_msg(pill->rc_repmsg, offset, snewlen);
+	}
+
+	if (rs->rs_difficult) {
+		/*
+		 * copy rs data: hand the difficult-reply flags and the locks
+		 * over to the new reply state and clear them on the old one
+		 * before it is dropped below.
+		 */
+		int i;
+
+		nrs->rs_difficult = 1;
+		nrs->rs_no_ack = rs->rs_no_ack;
+		for (i = 0; i < rs->rs_nlocks; i++) {
+			nrs->rs_locks[i] = rs->rs_locks[i];
+			nrs->rs_nlocks++;
+		}
+		rs->rs_nlocks = 0;
+		rs->rs_difficult = 0;
+		rs->rs_no_ack = 0;
+	}
+	ptlrpc_rs_decref(rs);
+	return 0;
+}
+EXPORT_SYMBOL(req_capsule_server_grow);
+
void req_capsule_subreq_init(struct req_capsule *pill,
const struct req_format *fmt,
struct ptlrpc_request *req,
@@ -454,6 +454,58 @@ int lustre_shrink_msg(struct lustre_msg *msg, int segment,
}
EXPORT_SYMBOL(lustre_shrink_msg);
+/*
+ * Grow buffer @segment of the V2 message @msg to @newlen bytes.
+ *
+ * @newlen must not be smaller than the current length of the segment
+ * (asserted).  When the length really changes and buffers follow @segment,
+ * their start is looked up before and after lm_buflens[@segment] is updated
+ * and the whole tail (each length rounded up to 8 bytes) is moved to the
+ * new position with memmove().
+ *
+ * Returns lustre_msg_size_v2() computed over the resulting buffer lengths.
+ */
+static int lustre_grow_msg_v2(struct lustre_msg_v2 *msg, __u32 segment,
+			      unsigned int newlen)
+{
+	char *old_tail = NULL;
+	int tail_bytes = 0;
+	int i;
+
+	LASSERT(msg);
+	LASSERT(msg->lm_bufcount > segment);
+	LASSERT(msg->lm_buflens[segment] <= newlen);
+
+	if (msg->lm_buflens[segment] != newlen) {
+		/* remember where the trailing buffers are and how big */
+		if (msg->lm_bufcount > segment + 1) {
+			old_tail = lustre_msg_buf_v2(msg, segment + 1, 0);
+			for (i = segment + 1; i < msg->lm_bufcount; i++)
+				tail_bytes += round_up(msg->lm_buflens[i], 8);
+		}
+
+		msg->lm_buflens[segment] = newlen;
+
+		/* the same lookup now yields the tail's new location */
+		if (old_tail && tail_bytes) {
+			char *new_tail;
+
+			new_tail = lustre_msg_buf_v2(msg, segment + 1, 0);
+			memmove(new_tail, old_tail, tail_bytes);
+		}
+	}
+
+	return lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
+}
+
+/*
+ * Grow buffer @segment of @msg to @newlen bytes.
+ * Buffers located after @segment are moved so that they follow the
+ * enlarged segment.
+ *
+ * Returns the new message size after growing.  Only LUSTRE_MSG_MAGIC_V2
+ * messages are handled; any other magic trips LASSERTF and yields -EINVAL.
+ *
+ * CAUTION:
+ * - the caller must make sure there is enough space in the allocated
+ *   message buffer
+ * - the caller should NOT keep pointers to msg buffers which are higher
+ *   than @segment after this call, as those buffers may have moved.
+ */
+int lustre_grow_msg(struct lustre_msg *msg, int segment, unsigned int newlen)
+{
+	if (msg->lm_magic == LUSTRE_MSG_MAGIC_V2)
+		return lustre_grow_msg_v2(msg, segment, newlen);
+
+	LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+	return -EINVAL;
+}
+EXPORT_SYMBOL(lustre_grow_msg);
+
void lustre_free_reply_state(struct ptlrpc_reply_state *rs)
{
PTLRPC_RS_DEBUG_LRU_DEL(rs);
@@ -660,6 +712,24 @@ u32 lustre_msg_buflen(struct lustre_msg *m, u32 n)
}
EXPORT_SYMBOL(lustre_msg_buflen);
+/*
+ * Record @len as the length of buffer @n in the V2 message @m.
+ * Only the lm_buflens[] entry is written; no buffer contents are moved.
+ * @n must be below lm_bufcount (asserted).
+ */
+static inline void
+lustre_msg_set_buflen_v2(struct lustre_msg_v2 *m, u32 n, u32 len)
+{
+	LASSERT(n < m->lm_bufcount);
+	m->lm_buflens[n] = len;
+}
+
+/*
+ * Set the recorded length of buffer @n in @m to @len, dispatching on the
+ * message magic.  Only LUSTRE_MSG_MAGIC_V2 is handled; for any other magic
+ * LASSERTF(0, ...) is raised and nothing is written.
+ */
+void lustre_msg_set_buflen(struct lustre_msg *m, u32 n, u32 len)
+{
+	if (m->lm_magic == LUSTRE_MSG_MAGIC_V2) {
+		lustre_msg_set_buflen_v2(m, n, len);
+		return;
+	}
+
+	LASSERTF(0, "incorrect message magic: %08x\n", m->lm_magic);
+}
+
/* NB return the bufcount for lustre_msg_v2 format, so if message is packed
* in V1 format, the result is one bigger. (add struct ptlrpc_body).
*/