@@ -1176,3 +1176,729 @@ int ceph_journaler_start_replay(struct ceph_journaler *journaler)
return ret;
}
EXPORT_SYMBOL(ceph_journaler_start_replay);
+
+// recording
+static int get_new_entry_tid(struct ceph_journaler *journaler,
+ uint64_t tag_tid, uint64_t *entry_tid)
+{
+ struct entry_tid *pos = NULL;
+
+ spin_lock(&journaler->entry_tid_lock);
+ list_for_each_entry(pos, &journaler->entry_tids, node) {
+ if (pos->tag_tid == tag_tid) {
+ *entry_tid = pos->entry_tid++;
+ spin_unlock(&journaler->entry_tid_lock);
+ return 0;
+ }
+ }
+
+ pos = entry_tid_alloc(journaler, tag_tid);
+ if (!pos) {
+ spin_unlock(&journaler->entry_tid_lock);
+ pr_err("failed to allocate new entry.");
+ return -ENOMEM;
+ }
+
+ *entry_tid = pos->entry_tid++;
+ spin_unlock(&journaler->entry_tid_lock);
+
+ return 0;
+}
+
+static uint64_t get_object(struct ceph_journaler *journaler, uint64_t splay_offset)
+{
+ return splay_offset + (journaler->splay_width * journaler->active_set);
+}
+
+static void future_init(struct ceph_journaler_future *future,
+ uint64_t tag_tid,
+ uint64_t entry_tid,
+ uint64_t commit_tid,
+ struct ceph_journaler_ctx *journaler_ctx)
+{
+ future->tag_tid = tag_tid;
+ future->entry_tid = entry_tid;
+ future->commit_tid = commit_tid;
+
+ spin_lock_init(&future->lock);
+ future->safe = false;
+ future->consistent = false;
+
+ future->ctx = journaler_ctx;
+ future->wait = NULL;
+}
+
+static void set_prev_future(struct ceph_journaler *journaler,
+ struct ceph_journaler_future *future)
+{
+ bool prev_future_finished = false;
+
+ if (journaler->prev_future == NULL) {
+ prev_future_finished = true;
+ } else {
+ spin_lock(&journaler->prev_future->lock);
+ prev_future_finished = (journaler->prev_future->consistent &&
+ journaler->prev_future->safe);
+ journaler->prev_future->wait = future;
+ spin_unlock(&journaler->prev_future->lock);
+ }
+
+ spin_lock(&future->lock);
+ if (prev_future_finished) {
+ future->consistent = true;
+ }
+ spin_unlock(&future->lock);
+
+ journaler->prev_future = future;
+}
+
+static void entry_init(struct ceph_journaler_entry *entry,
+ uint64_t tag_tid,
+ uint64_t entry_tid,
+ struct ceph_journaler_ctx *journaler_ctx)
+{
+ entry->tag_tid = tag_tid;
+ entry->entry_tid = entry_tid;
+ entry->data_len = journaler_ctx->bio_len +
+ journaler_ctx->prefix_len + journaler_ctx->suffix_len;
+}
+
+static void journaler_entry_encode_prefix(struct ceph_journaler_entry *entry,
+ void **p, void *end)
+{
+ ceph_encode_64(p, PREAMBLE);
+ ceph_encode_8(p, (uint8_t)1);
+ ceph_encode_64(p, entry->entry_tid);
+ ceph_encode_64(p, entry->tag_tid);
+
+ ceph_encode_32(p, entry->data_len);
+}
+
+static uint32_t crc_bio(uint32_t crc, struct bio *bio)
+{
+ struct bio_vec bv;
+ struct bvec_iter iter;
+ char *buf = NULL;
+ u64 offset = 0;
+
+next:
+ bio_for_each_segment(bv, bio, iter) {
+ buf = page_address(bv.bv_page) + bv.bv_offset;
+ crc = crc32c(crc, buf, bv.bv_len);
+ offset += bv.bv_len;
+ }
+
+ if (bio->bi_next) {
+ bio = bio->bi_next;
+ goto next;
+ }
+
+ return crc;
+}
+
+static void journaler_finish(struct work_struct *work)
+{
+ struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+ finish_work);
+ struct ceph_journaler_ctx *ctx_pos, *next;
+
+ spin_lock(&journaler->finish_lock);
+ list_for_each_entry_safe(ctx_pos, next, &journaler->ctx_list, node) {
+ list_del(&ctx_pos->node);
+ ctx_pos->callback(ctx_pos);
+ }
+ spin_unlock(&journaler->finish_lock);
+}
+
+static void future_consistent(struct ceph_journaler *journaler,
+ struct ceph_journaler_future *future,
+ int result);
+static void future_finish(struct ceph_journaler *journaler,
+ struct ceph_journaler_future *future,
+ int result) {
+ struct ceph_journaler_ctx *journaler_ctx = future->ctx;
+ struct ceph_journaler_future *future_wait = future->wait;
+
+ mutex_lock(&journaler->meta_lock);
+ if (journaler->prev_future == future)
+ journaler->prev_future = NULL;
+ mutex_unlock(&journaler->meta_lock);
+
+ spin_lock(&journaler->finish_lock);
+ if (journaler_ctx->result == 0)
+ journaler_ctx->result = result;
+ list_add_tail(&journaler_ctx->node, &journaler->ctx_list);
+ spin_unlock(&journaler->finish_lock);
+
+ queue_work(journaler->task_wq, &journaler->finish_work);
+ if (future_wait)
+ future_consistent(journaler, future_wait, result);
+}
+
+static void future_consistent(struct ceph_journaler *journaler,
+ struct ceph_journaler_future *future,
+ int result) {
+ bool future_finished = false;
+
+ spin_lock(&future->lock);
+ future->consistent = true;
+ future_finished = (future->safe && future->consistent);
+ spin_unlock(&future->lock);
+
+ if (future_finished)
+ future_finish(journaler, future, result);
+}
+
+static void future_safe(struct ceph_journaler *journaler,
+ struct ceph_journaler_future *future,
+ int result) {
+ bool future_finished = false;
+
+ spin_lock(&future->lock);
+ future->safe = true;
+ future_finished = (future->safe && future->consistent);
+ spin_unlock(&future->lock);
+
+ if (future_finished)
+ future_finish(journaler, future, result);
+}
+
+static void journaler_notify_update(struct work_struct *work)
+{
+ struct ceph_journaler *journaler = container_of(work,
+ struct ceph_journaler,
+ notify_update_work);
+ int ret = 0;
+
+ ret = ceph_osdc_notify(journaler->osdc, &journaler->header_oid,
+ &journaler->header_oloc, NULL, 0,
+ 5000, NULL, NULL);
+ if (ret)
+ pr_err("notify_update failed: %d", ret);
+}
+
+static bool advance_object_set(struct ceph_journaler *journaler)
+{
+ int ret = 0;
+ int i = 0;
+ struct object_recorder *obj_recorder;
+ uint64_t active_set = 0;
+
+ mutex_lock(&journaler->meta_lock);
+ if (journaler->advancing) {
+ mutex_unlock(&journaler->meta_lock);
+ return false;
+ }
+
+ // make sure all inflight appending finish
+ for (i = 0; i < journaler->splay_width; i++) {
+ obj_recorder = &journaler->obj_recorders[i];
+ spin_lock(&obj_recorder->lock);
+ if (obj_recorder->inflight_append) {
+ spin_unlock(&obj_recorder->lock);
+ mutex_unlock(&journaler->meta_lock);
+ return false;
+ }
+ spin_unlock(&obj_recorder->lock);
+ }
+
+ journaler->advancing = true;
+
+ active_set = journaler->active_set + 1;
+ mutex_unlock(&journaler->meta_lock);
+
+ ret = ceph_cls_journaler_set_active_set(journaler->osdc,
+ &journaler->header_oid, &journaler->header_oloc,
+ active_set);
+ if (ret) {
+ pr_err("error in set active_set: %d", ret);
+ }
+
+ queue_work(journaler->task_wq, &journaler->notify_update_work);
+
+ return true;
+}
+
+static void journaler_overflow(struct work_struct *work)
+{
+ struct ceph_journaler *journaler = container_of(work,
+ struct ceph_journaler,
+ overflow_work);
+ if (advance_object_set(journaler)) {
+ queue_work(journaler->task_wq, &journaler->flush_work);
+ }
+}
+
+static void journaler_append_callback(struct ceph_osd_request *osd_req)
+{
+ struct journaler_append_ctx *ctx = osd_req->r_priv;
+ struct ceph_journaler *journaler = ctx->journaler;
+ struct ceph_journaler_future *future = &ctx->future;
+ int ret = osd_req->r_result;
+ struct object_recorder *obj_recorder = &journaler->obj_recorders[ctx->splay_offset];
+
+ if (ret)
+ pr_err("ret of journaler_append_callback: %d", ret);
+
+ __free_page(ctx->req_page);
+ ceph_osdc_put_request(osd_req);
+
+ if (ret == -EOVERFLOW) {
+ mutex_lock(&journaler->meta_lock);
+ journaler->overflowed = true;
+ mutex_unlock(&journaler->meta_lock);
+
+ spin_lock(&obj_recorder->lock);
+ list_add_tail(&ctx->node, &obj_recorder->overflow_list);
+ if (--obj_recorder->inflight_append == 0)
+ queue_work(journaler->task_wq, &journaler->overflow_work);
+ spin_unlock(&obj_recorder->lock);
+ return;
+ }
+
+ spin_lock(&obj_recorder->lock);
+ if (--obj_recorder->inflight_append == 0) {
+ mutex_lock(&journaler->meta_lock);
+ if (journaler->overflowed)
+ queue_work(journaler->task_wq, &journaler->overflow_work);
+ mutex_unlock(&journaler->meta_lock);
+ }
+ spin_unlock(&obj_recorder->lock);
+
+ ret = add_commit_entry(journaler, ctx->future.commit_tid, ctx->object_num,
+ ctx->future.tag_tid, ctx->future.entry_tid);
+ if (ret) {
+ pr_err("failed to add_commit_entry: %d", ret);
+ future_finish(journaler, future, -ENOMEM);
+ return;
+ }
+
+ future_safe(journaler, future, ret);
+}
+
+static int append(struct ceph_journaler *journaler,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ struct journaler_append_ctx *ctx)
+
+{
+ struct ceph_osd_client *osdc = journaler->osdc;
+ struct ceph_osd_request *req;
+ void *p;
+ int ret;
+
+ req = ceph_osdc_alloc_request(osdc, NULL, 2, false, GFP_NOIO);
+ if (!req)
+ return -ENOMEM;
+
+ ceph_oid_copy(&req->r_base_oid, oid);
+ ceph_oloc_copy(&req->r_base_oloc, oloc);
+ req->r_flags = CEPH_OSD_FLAG_WRITE;
+ req->r_callback = journaler_append_callback;
+ req->r_priv = ctx;
+
+ // guard_append
+ ctx->req_page = alloc_page(GFP_NOIO);
+ if (!ctx->req_page) {
+ ret = -ENOMEM;
+ goto out_req;
+ }
+ p = page_address(ctx->req_page);
+ ceph_encode_64(&p, 1 << journaler->order);
+ ret = osd_req_op_cls_init(req, 0, "journal", "guard_append");
+ if (ret)
+ goto out_free_page;
+ osd_req_op_cls_request_data_pages(req, 0, &ctx->req_page, 8, 0, false, false);
+
+ // append_data
+ osd_req_op_extent_init(req, 1, CEPH_OSD_OP_APPEND, 0,
+ ctx->journaler_ctx.prefix_len + ctx->journaler_ctx.bio_len + ctx->journaler_ctx.suffix_len, 0, 0);
+
+ if (ctx->journaler_ctx.prefix_len)
+ osd_req_op_extent_prefix_pages(req, 1, &ctx->journaler_ctx.prefix_page,
+ ctx->journaler_ctx.prefix_len,
+ ctx->journaler_ctx.prefix_offset,
+ false, false);
+ if (ctx->journaler_ctx.bio_len)
+ osd_req_op_extent_osd_data_bio(req, 1, ctx->journaler_ctx.bio_iter, ctx->journaler_ctx.bio_len);
+ if (ctx->journaler_ctx.suffix_len)
+ osd_req_op_extent_suffix_pages(req, 1, &ctx->journaler_ctx.suffix_page,
+ ctx->journaler_ctx.suffix_len,
+ ctx->journaler_ctx.suffix_offset,
+ false, false);
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+ if (ret)
+ goto out_free_page;
+
+ ceph_osdc_start_request(osdc, req, false);
+ return 0;
+
+out_free_page:
+ __free_page(ctx->req_page);
+out_req:
+ ceph_osdc_put_request(req);
+ return ret;
+}
+
+static int send_append_request(struct ceph_journaler *journaler,
+ uint64_t object_num,
+ struct journaler_append_ctx *ctx)
+{
+ struct ceph_object_id object_oid;
+ int ret = 0;
+
+ ceph_oid_init(&object_oid);
+ ret = ceph_oid_aprintf(&object_oid, GFP_NOIO, "%s%llu",
+ journaler->object_oid_prefix, object_num);
+ if (ret) {
+ pr_err("failed to initialize object id: %d", ret);
+ goto out;
+ }
+
+ ret = append(journaler, &object_oid, &journaler->data_oloc, ctx);
+out:
+ ceph_oid_destroy(&object_oid);
+ return ret;
+}
+
+static void journaler_flush(struct work_struct *work)
+{
+ struct ceph_journaler *journaler = container_of(work,
+ struct ceph_journaler,
+ flush_work);
+ int i = 0;
+ int ret = 0;
+ struct object_recorder *obj_recorder;
+ struct journaler_append_ctx *ctx, *next_ctx;
+ int req_num = 0;
+ LIST_HEAD(tmp);
+
+ if (journaler->overflowed) {
+ return;
+ }
+
+ for (i = 0; i < journaler->splay_width; i++) {
+ req_num = 0;
+ INIT_LIST_HEAD(&tmp);
+ obj_recorder = &journaler->obj_recorders[i];
+
+ spin_lock(&obj_recorder->lock);
+ list_splice_tail_init(&obj_recorder->overflow_list, &tmp);
+ list_splice_tail_init(&obj_recorder->append_list, &tmp);
+ spin_unlock(&obj_recorder->lock);
+
+ list_for_each_entry_safe(ctx, next_ctx, &tmp, node) {
+ ctx->object_num = get_object(journaler, obj_recorder->splay_offset);
+ ret = send_append_request(journaler, ctx->object_num, ctx);
+ if (ret) {
+ pr_err("failed to send append request: %d", ret);
+ list_del(&ctx->node);
+ future_finish(journaler, &ctx->future, ret);
+ continue;
+ }
+ req_num++;
+ }
+
+ spin_lock(&obj_recorder->lock);
+ obj_recorder->inflight_append += req_num;
+ spin_unlock(&obj_recorder->lock);
+ }
+}
+
+static int ceph_journaler_object_append(struct ceph_journaler *journaler,
+ struct journaler_append_ctx *append_ctx)
+{
+ void *buf = NULL;
+ void *end = NULL;
+ int ret = 0;
+ uint32_t crc = 0;
+ struct ceph_journaler_ctx *journaler_ctx = &append_ctx->journaler_ctx;
+ struct ceph_bio_iter *bio_iter = journaler_ctx->bio_iter;
+ struct object_recorder *obj_recorder;
+
+ // PEAMBLE(8) + version(1) + entry_tid(8) + tag_tid(8) + string_len(4) = 29
+ journaler_ctx->prefix_offset -= 29;
+ journaler_ctx->prefix_len += 29;
+ buf = page_address(journaler_ctx->prefix_page) + journaler_ctx->prefix_offset;
+ end = buf + 29;
+ journaler_entry_encode_prefix(&append_ctx->entry, &buf, end);
+
+ // size of crc is 4
+ journaler_ctx->suffix_offset += 0;
+ journaler_ctx->suffix_len += 4;
+ buf = page_address(journaler_ctx->suffix_page);
+ end = buf + 4;
+ crc = crc32c(crc, page_address(journaler_ctx->prefix_page) + journaler_ctx->prefix_offset,
+ journaler_ctx->prefix_len);
+ if (journaler_ctx->bio_len)
+ crc = crc_bio(crc, bio_iter->bio);
+ ceph_encode_32(&buf, crc);
+ obj_recorder = &journaler->obj_recorders[append_ctx->splay_offset];
+
+ spin_lock(&obj_recorder->lock);
+ list_add_tail(&append_ctx->node, &obj_recorder->append_list);
+ queue_work(journaler->task_wq, &journaler->flush_work);
+ spin_unlock(&obj_recorder->lock);
+
+ return ret;
+}
+
+struct journaler_append_ctx *journaler_append_ctx_alloc(void)
+{
+ struct journaler_append_ctx *append_ctx;
+ struct ceph_journaler_ctx *journaler_ctx;
+
+ append_ctx = kmem_cache_zalloc(journaler_append_ctx_cache, GFP_NOIO);
+ if (!append_ctx)
+ return NULL;
+
+ journaler_ctx = &append_ctx->journaler_ctx;
+ journaler_ctx->prefix_page = alloc_page(GFP_NOIO);
+ if (!journaler_ctx->prefix_page)
+ goto free_journaler_ctx;
+
+ journaler_ctx->suffix_page = alloc_page(GFP_NOIO);
+ if (!journaler_ctx->suffix_page)
+ goto free_prefix_page;
+
+ memset(page_address(journaler_ctx->prefix_page), 0, PAGE_SIZE);
+ memset(page_address(journaler_ctx->suffix_page), 0, PAGE_SIZE);
+ INIT_LIST_HEAD(&journaler_ctx->node);
+
+ kref_init(&append_ctx->kref);
+ INIT_LIST_HEAD(&append_ctx->node);
+ return append_ctx;
+
+free_prefix_page:
+ __free_page(journaler_ctx->prefix_page);
+free_journaler_ctx:
+ kmem_cache_free(journaler_append_ctx_cache, append_ctx);
+ return NULL;
+}
+
+struct ceph_journaler_ctx *ceph_journaler_ctx_alloc(void)
+{
+ struct journaler_append_ctx *append_ctx;
+
+ append_ctx = journaler_append_ctx_alloc();
+ if (!append_ctx)
+ return NULL;
+
+ return &append_ctx->journaler_ctx;
+}
+EXPORT_SYMBOL(ceph_journaler_ctx_alloc);
+
+static void journaler_append_ctx_release(struct kref *kref)
+{
+ struct journaler_append_ctx *append_ctx;
+ struct ceph_journaler_ctx *journaler_ctx;
+
+ append_ctx = container_of(kref, struct journaler_append_ctx, kref);
+ journaler_ctx = &append_ctx->journaler_ctx;
+
+ __free_page(journaler_ctx->prefix_page);
+ __free_page(journaler_ctx->suffix_page);
+ kmem_cache_free(journaler_append_ctx_cache, append_ctx);
+}
+
+static void journaler_append_ctx_put(struct journaler_append_ctx *append_ctx)
+{
+ if (append_ctx) {
+ kref_put(&append_ctx->kref, journaler_append_ctx_release);
+ }
+}
+
+void ceph_journaler_ctx_put(struct ceph_journaler_ctx *journaler_ctx)
+{
+ struct journaler_append_ctx *append_ctx;
+
+ if (journaler_ctx) {
+ append_ctx = container_of(journaler_ctx,
+ struct journaler_append_ctx,
+ journaler_ctx);
+ journaler_append_ctx_put(append_ctx);
+ }
+}
+EXPORT_SYMBOL(ceph_journaler_ctx_put);
+
+int ceph_journaler_append(struct ceph_journaler *journaler,
+ uint64_t tag_tid,
+ uint64_t *commit_tid,
+ struct ceph_journaler_ctx *journaler_ctx)
+{
+ uint8_t splay_width;
+ uint64_t entry_tid;
+ struct object_recorder *obj_recorder;
+ struct journaler_append_ctx *append_ctx;
+ int ret = 0;
+
+ append_ctx = container_of(journaler_ctx,
+ struct journaler_append_ctx,
+ journaler_ctx);
+
+ append_ctx->journaler = journaler;
+
+ mutex_lock(&journaler->meta_lock);
+ ret = get_new_entry_tid(journaler, tag_tid, &entry_tid);
+ if (ret) {
+ goto unlock;
+ }
+ splay_width = journaler->splay_width;
+ append_ctx->splay_offset = entry_tid % splay_width;
+
+ obj_recorder = &journaler->obj_recorders[splay_width];
+ append_ctx->object_num = get_object(journaler, append_ctx->splay_offset);
+
+ *commit_tid = allocate_commit_tid(journaler);
+ entry_init(&append_ctx->entry, tag_tid, entry_tid, journaler_ctx);
+ future_init(&append_ctx->future, tag_tid, entry_tid, *commit_tid, journaler_ctx);
+ set_prev_future(journaler, &append_ctx->future);
+ mutex_unlock(&journaler->meta_lock);
+
+ ret = ceph_journaler_object_append(journaler, append_ctx);
+ return ret;
+
+unlock:
+ mutex_unlock(&journaler->meta_lock);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_append);
+
+static void journaler_client_commit(struct work_struct *work)
+{
+ struct ceph_journaler *journaler = container_of(to_delayed_work(work),
+ struct ceph_journaler,
+ commit_work);
+ int ret = 0;
+
+ mutex_lock(&journaler->commit_lock);
+ copy_pos_list(&journaler->obj_pos_pending,
+ &journaler->obj_pos_committing);
+ mutex_unlock(&journaler->commit_lock);
+ ret = ceph_cls_journaler_client_committed(journaler->osdc,
+ &journaler->header_oid, &journaler->header_oloc,
+ journaler->client, &journaler->obj_pos_committing);
+
+ if (ret) {
+ pr_err("error in client committed: %d", ret);
+ }
+
+ queue_work(journaler->task_wq, &journaler->notify_update_work);
+
+ mutex_lock(&journaler->commit_lock);
+ journaler->commit_scheduled = false;
+ mutex_unlock(&journaler->commit_lock);
+}
+
+// hold journaler->commit_lock
+static void add_object_position(struct commit_entry *entry,
+ struct list_head *object_positions,
+ uint64_t splay_width)
+{
+ struct ceph_journaler_object_pos *position = NULL;
+ uint8_t splay_offset = entry->object_num % splay_width;
+ bool found = false;
+
+ list_for_each_entry(position, object_positions, node) {
+ if (position->in_using == false) {
+ found = true;
+ break;
+ }
+
+ if (splay_offset == position->object_num % splay_width) {
+ found = true;
+ break;
+ }
+ }
+
+ BUG_ON(!found);
+ if (position->in_using == false)
+ position->in_using = true;
+ position->object_num = entry->object_num;
+ position->tag_tid = entry->tag_tid;
+ position->entry_tid = entry->entry_tid;
+ list_move(&position->node, object_positions);
+}
+
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid)
+{
+ struct commit_entry *commit_entry = NULL;
+ bool update_client_commit = true;
+ struct rb_node *n;
+
+ mutex_lock(&journaler->commit_lock);
+ for (n = rb_first(&journaler->commit_entries); n; n = rb_next(n)) {
+ commit_entry = rb_entry(n, struct commit_entry, r_node);
+ if (commit_entry->commit_tid == commit_tid) {
+ commit_entry->committed = true;
+ break;
+ }
+ if (commit_entry->committed == false)
+ update_client_commit = false;
+ }
+
+ if (update_client_commit) {
+ for (n = rb_first(&journaler->commit_entries); n;) {
+ commit_entry = rb_entry(n, struct commit_entry, r_node);
+ n = rb_next(n);
+
+ if (commit_entry->commit_tid > commit_tid)
+ break;
+ add_object_position(commit_entry,
+ &journaler->obj_pos_pending,
+ journaler->splay_width);
+ erase_commit_entry(&journaler->commit_entries, commit_entry);
+ kmem_cache_free(journaler_commit_entry_cache, commit_entry);
+ }
+ }
+
+ if (update_client_commit && !journaler->commit_scheduled) {
+ queue_delayed_work(journaler->task_wq, &journaler->commit_work,
+ JOURNALER_COMMIT_INTERVAL);
+ journaler->commit_scheduled = true;
+ }
+ mutex_unlock(&journaler->commit_lock);
+
+}
+EXPORT_SYMBOL(ceph_journaler_client_committed);
+
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler,
+ uint64_t tag_class, void *buf,
+ uint32_t buf_len,
+ struct ceph_journaler_tag *tag)
+{
+ uint64_t tag_tid = 0;
+ int ret = 0;
+
+retry:
+ ret = ceph_cls_journaler_get_next_tag_tid(journaler->osdc,
+ &journaler->header_oid,
+ &journaler->header_oloc,
+ &tag_tid);
+ if (ret)
+ goto out;
+
+ ret = ceph_cls_journaler_tag_create(journaler->osdc,
+ &journaler->header_oid,
+ &journaler->header_oloc,
+ tag_tid, tag_class,
+ buf, buf_len);
+ if (ret < 0) {
+ if (ret == -ESTALE) {
+ goto retry;
+ } else {
+ goto out;
+ }
+ }
+
+ ret = ceph_cls_journaler_get_tag(journaler->osdc,
+ &journaler->header_oid,
+ &journaler->header_oloc,
+ tag_tid, tag);
+ if (ret)
+ goto out;
+
+out:
+ return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_allocate_tag);
This commit introduce 3 APIs for journal recording: (1) ceph_journaler_allocate_tag() This api allocate a new tag for user to get a unified tag_tid. Then each event appended by this user will be tagged by this tag_tid. (2) ceph_journaler_append() This api allow user to append event to journal objects. (3) ceph_journaler_client_committed() This api will notify journaling that a event is already committed, you can remove it from journal if there is no other client refre to it. Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> --- net/ceph/journaler.c | 726 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 726 insertions(+)