@@ -98,6 +98,13 @@ static __inline__ bool ceph_msg_data_type_valid(enum
ceph_msg_data_type type)
struct ceph_msg_data_cursor {
bool last_piece; /* now at last piece of data item */
union {
+#ifdef CONFIG_BLOCK
+ struct { /* bio */
+ struct bio *bio; /* bio from list */
+ unsigned int vector_index; /* vector from bio */
+ unsigned int vector_offset; /* bytes from vector */
+ };
+#endif /* CONFIG_BLOCK */
struct { /* pagelist */
struct page *page; /* page from list */
size_t offset; /* bytes from list */
@@ -739,6 +739,97 @@ static void iter_bio_next(struct bio **bio_iter,
unsigned int *seg)
if (*seg == (*bio_iter)->bi_vcnt)
init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
}
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct bio *bio;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = data->bio;
+ BUG_ON(!bio);
+ BUG_ON(!bio->bi_vcnt);
+ /* resid = bio->bi_size */
+
+ cursor->bio = bio;
+ cursor->vector_index = 0;
+ cursor->vector_offset = 0;
+ cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+ size_t *page_offset,
+ size_t *length,
+ bool *last_piece)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct bio *bio;
+ struct bio_vec *bio_vec;
+ unsigned int index;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ index = cursor->vector_index;
+ BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+
+ bio_vec = &bio->bi_io_vec[index];
+ BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
+ *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
+ BUG_ON(*page_offset >= PAGE_SIZE);
+ *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+ BUG_ON(*length > PAGE_SIZE);
+ *last_piece = cursor->last_piece;
+
+ return bio_vec->bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data,
size_t bytes)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct bio *bio;
+ struct bio_vec *bio_vec;
+ unsigned int index;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ index = cursor->vector_index;
+ BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+ bio_vec = &bio->bi_io_vec[index];
+ BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
+
+ /* Advance the cursor offset */
+
+ cursor->vector_offset += bytes;
+ if (cursor->vector_offset < bio_vec->bv_len)
+ return false; /* more bytes to process in this segment */
+
+ /* Move on to the next segment, and possibly the next bio */
+
+ if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
+ bio = bio->bi_next;
+ cursor->bio = bio;
+ cursor->vector_index = 0;
+ }
+ cursor->vector_offset = 0;
+
+ if (!cursor->last_piece && bio && !bio->bi_next)
+ if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
+ cursor->last_piece = true;
+
+ return true;
+}
#endif
Implement and use cursor routines for bio message data items for outbound message data. (See the previous commit for reasoning in support of the changes in out_msg_pos_next().) Signed-off-by: Alex Elder <elder@inktank.com> --- include/linux/ceph/messenger.h | 7 +++ net/ceph/messenger.c | 128 ++++++++++++++++++++++++++++++++++------ 2 files changed, 118 insertions(+), 17 deletions(-) /* @@ -850,6 +941,8 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data) break; #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO: + ceph_msg_data_bio_cursor_init(data); + break; #endif /* CONFIG_BLOCK */ default: break; @@ -875,7 +968,8 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data, last_piece); #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO: - break; + return ceph_msg_data_bio_next(data, page_offset, length, + last_piece); #endif /* CONFIG_BLOCK */ } BUG(); @@ -896,7 +990,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) return ceph_msg_data_pagelist_advance(data, bytes); #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO: - break; + return ceph_msg_data_bio_advance(data, bytes); #endif /* CONFIG_BLOCK */ } BUG(); @@ -923,6 +1017,10 @@ static void prepare_message_data(struct ceph_msg *msg, /* Initialize data cursors */ +#ifdef CONFIG_BLOCK + if (ceph_msg_has_bio(msg)) + ceph_msg_data_cursor_init(&msg->b); +#endif /* CONFIG_BLOCK */ if (ceph_msg_has_pagelist(msg)) ceph_msg_data_cursor_init(&msg->l); if (ceph_msg_has_trail(msg)) @@ -1223,6 +1321,10 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, need_crc = ceph_msg_data_advance(&msg->t, sent); else if (ceph_msg_has_pagelist(msg)) need_crc = ceph_msg_data_advance(&msg->l, sent); +#ifdef CONFIG_BLOCK + else if (ceph_msg_has_bio(msg)) + need_crc = ceph_msg_data_advance(&msg->b, sent); +#endif /* CONFIG_BLOCK */ BUG_ON(need_crc && sent != len); if (sent < len) @@ -1232,10 +1334,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, msg_pos->page_pos = 0; msg_pos->page++; msg_pos->did_page_crc = false; -#ifdef CONFIG_BLOCK - if (ceph_msg_has_bio(msg)) - iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg); -#endif } static void in_msg_pos_next(struct ceph_connection *con, size_t len, @@ -1313,8 +1411,6 @@ static int write_partial_message_data(struct ceph_connection *con) struct page *page = NULL; size_t page_offset; size_t length; - int max_write = PAGE_SIZE; - int bio_offset = 0; bool use_cursor = false; bool last_piece = true; /* preserve existing behavior */ @@ -1335,21 +1431,19 @@ static int write_partial_message_data(struct ceph_connection *con) &length, &last_piece); #ifdef CONFIG_BLOCK } else if (ceph_msg_has_bio(msg)) { - struct bio_vec *bv; - - bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg); - page = bv->bv_page; - bio_offset = bv->bv_offset; - max_write = bv->bv_len; + use_cursor = true; + page = ceph_msg_data_next(&msg->b, &page_offset, + &length, &last_piece); #endif } else { page = zero_page; } - if (!use_cursor) - length = min_t(int, max_write - msg_pos->page_pos, + if (!use_cursor) { + length = min_t(int, PAGE_SIZE - msg_pos->page_pos, total_max_write); - page_offset = msg_pos->page_pos + bio_offset; + page_offset = msg_pos->page_pos; + } if (do_datacrc && !msg_pos->did_page_crc) { u32 crc = le32_to_cpu(msg->footer.data_crc);