@@ -7182,6 +7182,242 @@ static void rbd_img_journal_append(struct rbd_img_request *img_req)
}
}
+static int setup_write_bvecs(void *buf, u64 offset,
+ u64 length, struct bio_vec **bvecs,
+ u32 *bvec_count)
+{
+ u32 i;
+ u32 off;
+
+ *bvec_count = calc_pages_for(offset, length);
+ *bvecs = kcalloc(*bvec_count, sizeof(**bvecs), GFP_NOIO);
+ if (!(*bvecs))
+ return -ENOMEM;
+
+ div_u64_rem(offset, PAGE_SIZE, &off);
+ for (i = 0; i < *bvec_count; i++) {
+ u32 len = min(length, (u64)PAGE_SIZE - off);
+
+ (*bvecs)[i].bv_page = alloc_page(GFP_NOIO);
+ if (!(*bvecs)[i].bv_page)
+ return -ENOMEM;
+
+ (*bvecs)[i].bv_offset = off;
+ (*bvecs)[i].bv_len = len;
+ memcpy(page_address((*bvecs)[i].bv_page) + (*bvecs)[i].bv_offset, buf,
+ (*bvecs)[i].bv_len);
+ length -= len;
+ buf += len;
+ off = 0;
+ }
+ rbd_assert(!length);
+ return 0;
+}
+
+static int rbd_journal_handle_aio_write(struct rbd_device *rbd_dev, void **p,
+ void *end, u8 struct_v, u64 commit_tid)
+{
+ struct ceph_snap_context *snapc;
+ struct rbd_img_request *img_request;
+ struct journal_commit_info *commit_info;
+ struct ceph_file_extent ex;
+ struct bio_vec *bvecs = NULL;
+ u32 bvec_count;
+ ssize_t data_len;
+ u64 offset, length;
+ int result;
+
+ /* get offset and length */
+ offset = ceph_decode_64(p);
+ length = ceph_decode_64(p);
+ /* get data_len and check the buffer length */
+ data_len = ceph_decode_32(p);
+ if (!ceph_has_room(p, end, data_len)) {
+ pr_err("our of range");
+ return -ERANGE;
+ }
+ rbd_assert(length == data_len);
+ /* Create a new image request for writing */
+ snapc = rbd_dev->header.snapc;
+ ceph_get_snap_context(snapc);
+ img_request = rbd_img_request_create(rbd_dev, OBJ_OP_WRITE, snapc);
+ if (!img_request) {
+ result = -ENOMEM;
+ goto err;
+ }
+ /* Allocate a new commit_info for this journal replay */
+ commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO);
+ if (!commit_info) {
+ result = -ENOMEM;
+ rbd_img_request_put(img_request);
+ goto err;
+ }
+ INIT_LIST_HEAD(&commit_info->node);
+
+ /* Don't down ->lock_rwsem in __rbd_img_handle_request */
+ __set_bit(IMG_REQ_NOLOCK, &img_request->flags);
+ commit_info->commit_tid = commit_tid;
+ list_add_tail(&commit_info->node, &img_request->journal_commit_list);
+ snapc = NULL; /* img_request consumes a ref */
+
+ result = setup_write_bvecs(*p, offset, length, &bvecs, &bvec_count);
+ if (result) {
+ rbd_warn(rbd_dev, "failed to setup bvecs.");
+ rbd_img_request_put(img_request);
+ goto err;
+ }
+ /* Skip data for this event */
+ *p = (char *)*p + data_len;
+ ex.fe_off = offset;
+ ex.fe_len = length;
+ result = rbd_img_fill_from_bvecs(img_request, &ex, 1, bvecs);
+ if (result) {
+ rbd_img_request_put(img_request);
+ goto err;
+ }
+ /*
+ * As we are doing journal replaying in exclusive-lock aqcuiring,
+ * we don't need to aquire exclusive-lock for this writing, so
+ * set the ->state to RBD_IMG_APPEND_JOURNAL to skip exclusive-lock
+ * acquiring in state machine
+ */
+ img_request->state = RBD_IMG_APPEND_JOURNAL;
+ rbd_img_handle_request(img_request, 0);
+ result = wait_for_completion_interruptible(&img_request->completion);
+err:
+ if (bvecs) {
+ int i;
+
+ for (i = 0; i < bvec_count; i++) {
+ if (bvecs[i].bv_page)
+ __free_page(bvecs[i].bv_page);
+ }
+ kfree(bvecs);
+ }
+ return result;
+}
+
+static int rbd_journal_handle_aio_discard(struct rbd_device *rbd_dev, void **p,
+ void *end, u8 struct_v,
+ u64 commit_tid)
+{
+ struct rbd_img_request *img_request;
+ struct ceph_snap_context *snapc;
+ struct journal_commit_info *commit_info;
+ enum obj_operation_type op_type;
+ u64 offset, length;
+ bool skip_partial_discard = true;
+ int result;
+
+ /* get offset and length*/
+ offset = ceph_decode_64(p);
+ length = ceph_decode_64(p);
+ if (struct_v >= 4)
+ skip_partial_discard = ceph_decode_8(p);
+ if (struct_v >= 5) {
+ /* skip discard_granularity_bytes */
+ ceph_decode_32(p);
+ }
+
+ snapc = rbd_dev->header.snapc;
+ ceph_get_snap_context(snapc);
+ /*
+ * When the skip_partial_discard is false, we need to guarantee
+ * that the data range would be zeroout-ed. Otherwise, we don't
+ * guarantee the range discarded would be zero filled.
+ */
+ if (skip_partial_discard)
+ op_type = OBJ_OP_DISCARD;
+ else
+ op_type = OBJ_OP_ZEROOUT;
+
+ img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
+ if (!img_request) {
+ result = -ENOMEM;
+ goto err;
+ }
+ /* Allocate a commit_info for this image request */
+ commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO);
+ if (!commit_info) {
+ result = -ENOMEM;
+ rbd_img_request_put(img_request);
+ goto err;
+ }
+ INIT_LIST_HEAD(&commit_info->node);
+
+ /* Don't down ->lock_rwsem in __rbd_img_handle_request */
+ __set_bit(IMG_REQ_NOLOCK, &img_request->flags);
+ commit_info->commit_tid = commit_tid;
+ list_add_tail(&commit_info->node, &img_request->journal_commit_list);
+ result = rbd_img_fill_nodata(img_request, offset, length);
+ if (result) {
+ rbd_img_request_put(img_request);
+ goto err;
+ }
+
+ img_request->state = RBD_IMG_APPEND_JOURNAL;
+ rbd_img_handle_request(img_request, 0);
+ result = wait_for_completion_interruptible(&img_request->completion);
+err:
+ return result;
+}
+
+static int rbd_journal_replay(void *entry_handler,
+ struct ceph_journaler_entry *entry,
+ u64 commit_tid)
+{
+ struct rbd_device *rbd_dev = entry_handler;
+ void *data = entry->data;
+ void **p = &data;
+ void *end = *p + entry->data_len;
+ u32 event_type;
+ u8 struct_v;
+ u32 struct_len;
+ int ret;
+
+ ret = ceph_start_decoding(p, end, 1, "rbd_decode_entry", &struct_v,
+ &struct_len);
+ if (ret)
+ return -EINVAL;
+
+ event_type = ceph_decode_32(p);
+ switch (event_type) {
+ case EVENT_TYPE_AIO_WRITE:
+ ret = rbd_journal_handle_aio_write(rbd_dev, p, end, struct_v,
+ commit_tid);
+ break;
+ case EVENT_TYPE_AIO_DISCARD:
+ ret = rbd_journal_handle_aio_discard(rbd_dev, p, end, struct_v,
+ commit_tid);
+ break;
+ case EVENT_TYPE_AIO_FLUSH:
+ /*
+ * As the all event replaying are synchronized, we don't
+ * need any more work for replaying flush event. Just
+ * commit it.
+ */
+ ceph_journaler_client_committed(rbd_dev->journal->journaler,
+ commit_tid);
+ break;
+ default:
+ rbd_warn(rbd_dev, "unknown event_type: %u", event_type);
+ return -EINVAL;
+ }
+
+ if (struct_v >= 4) {
+ u8 meta_struct_v;
+ u32 meta_struct_len;
+
+ /* skip metadata */
+ ret = ceph_start_decoding(p, end, 1, "decode_metadata",
+ &meta_struct_v, &meta_struct_len);
+ if (ret)
+ return ret;
+ *p += meta_struct_len;
+ }
+ return ret;
+}
+
struct rbd_journal_tag_predecessor {
bool commit_valid;
u64 tag_tid;
when we found uncommitted events in journal, we need to do a replay. This commit only implement three kinds of events replaying: EVENT_TYPE_AIO_DISCARD: Will send a img_request to image with OBJ_OP_ZEROOUT, and wait for it completed. EVENT_TYPE_AIO_WRITE: Will send a img_request to image with OBJ_OP_WRITE, and wait for it completed. EVENT_TYPE_AIO_FLUSH: As all other events are replayed in synchoronized way, that means the events before are all flushed. we did nothing for this event. Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> --- drivers/block/rbd.c | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 236 insertions(+)