@@ -139,6 +139,7 @@ struct ceph_msg {
struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle;
+ size_t data_length;
struct ceph_msg_data *data; /* data payload */
struct ceph_connection *con;
@@ -270,7 +271,8 @@ extern void ceph_msg_data_set_pages(struct ceph_msg
*msg, struct page **pages,
size_t length, size_t alignment);
extern void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist);
-extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio);
+extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
+ size_t length);
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
bool can_fail);
@@ -2981,6 +2981,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
BUG_ON(!pages);
BUG_ON(!length);
+ BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
@@ -2990,6 +2991,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
data->alignment = alignment & ~PAGE_MASK;
msg->data = data;
+ msg->data_length = length;
}
EXPORT_SYMBOL(ceph_msg_data_set_pages);
@@ -3000,6 +3002,7 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
+ BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
@@ -3007,14 +3010,17 @@ void ceph_msg_data_set_pagelist(struct ceph_msg
*msg,
data->pagelist = pagelist;
msg->data = data;
+ msg->data_length = pagelist->length;
}
EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
-void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
+void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
+ size_t length)
{
struct ceph_msg_data *data;
BUG_ON(!bio);
+ BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
@@ -3022,6 +3028,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
struct bio *bio)
data->bio = bio;
msg->data = data;
+ msg->data_length = length;
}
EXPORT_SYMBOL(ceph_msg_data_set_bio);
@@ -3200,6 +3207,7 @@ void ceph_msg_last_put(struct kref *kref)
}
ceph_msg_data_destroy(m->data);
m->data = NULL;
+ m->data_length = 0;
if (m->pool)
ceph_msgpool_put(m->pool, m);
@@ -1848,7 +1848,7 @@ static void ceph_osdc_msg_data_set(struct ceph_msg
*msg,
ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
- ceph_msg_data_set_bio(msg, osd_data->bio);
+ ceph_msg_data_set_bio(msg, osd_data->bio, osd_data->bio_length);
#endif
} else {
I found a problem whose fix belongs in this patch. When the data field is reset in ceph_msg_last_put(), the data_length field should be reset to zero as well. Not doing so triggered an assertion for a re-used message that came from a message pool. The branch "review/wip-3761" has been updated to reflect this change. -Alex Keep track of the length of the data portion for a message in a separate field in the ceph_msg structure. This information has been maintained in wire byte order in the message header, but that's going to change soon. Signed-off-by: Alex Elder <elder@inktank.com> --- v2: Reset data_length to 0 in ceph_msg_last_put(). include/linux/ceph/messenger.h | 4 +++- net/ceph/messenger.c | 10 +++++++++- net/ceph/osd_client.c | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);