@@ -370,6 +370,24 @@ int ceph_journaler_open(struct ceph_journaler *journaler)
goto out;
}
+ // TODO: When the journal object size is smaller than 8M, it is
+ // possible to build a journal object larger than 2*object_size.
+ // e.g.:
+ // With a 4M journal object,
+ // (1) append a first entry of size (4M - 1);
+ // (2) append a second entry of size (4M + prefix_len + suffix_len);
+ // (3) the journal object size would then be > 8M, i.e. > 2*object_size.
+ //
+ // But on fetch we read a whole object into fetch_buf, which is sized
+ // 2*object_size. So if the object is larger than the fetch length,
+ // the fetch will fail.
+ //
+ // To solve this, we would need to split an appended entry whenever
+ // its size would exceed the object size. Until that is implemented,
+ // refuse journals with an object size below 8M (order < 23).
+ if (order < 23) {
+ return -EOPNOTSUPP;
+ }
+
mutex_lock(&journaler->meta_lock);
// set the immutable metas.
journaler->order = order;
@@ -608,3 +626,553 @@ int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *cli
return ret;
}
EXPORT_SYMBOL(ceph_journaler_get_cached_client);
+
+// replaying
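+// Synchronously read up to buf_len bytes from one journal object into
+// buf, starting at read_off. Returns the number of bytes read, or a
+// negative errno (e.g. -ENOENT if the object does not exist).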
+static int ceph_journaler_obj_read_sync(struct ceph_journaler *journaler,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ void *buf, uint32_t read_off, uint64_t buf_len)
+{
+ struct ceph_osd_client *osdc = journaler->osdc;
+ struct ceph_osd_request *req;
+ struct page **pages;
+ int num_pages = calc_pages_for(0, buf_len);
+ int ret;
+
+ req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+ if (!req)
+ return -ENOMEM;
+
+ ceph_oid_copy(&req->r_base_oid, oid);
+ ceph_oloc_copy(&req->r_base_oloc, oloc);
+ req->r_flags = CEPH_OSD_FLAG_READ;
+
+ pages = ceph_alloc_page_vector(num_pages, GFP_NOIO);
+ if (IS_ERR(pages)) {
+ ret = PTR_ERR(pages);
+ goto out_req;
+ }
+
+ osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, read_off, buf_len, 0, 0);
+ osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
+ true);
+
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+ if (ret)
+ goto out_req;
+
+ ceph_osdc_start_request(osdc, req, false);
+ ret = ceph_osdc_wait_request(osdc, req);
+ if (ret >= 0)
+ ceph_copy_from_page_vector(pages, buf, 0, ret);
+
+out_req:
+ ceph_osdc_put_request(req);
+ return ret;
+}
+
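+// Check whether buf..end holds one complete, valid entry: verify the
+// preamble, skip the rest of the fixed header (version, entry_tid,
+// tag_tid), skip the length-prefixed data, then verify the trailing
+// crc32c. If the entry is truncated, *bytes_needed is set to how many
+// more bytes are required; on preamble or crc mismatch it stays 0.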
+static bool entry_is_readable(struct ceph_journaler *journaler, void *buf,
+ void *end, uint32_t *bytes_needed)
+{
+ uint32_t remaining = end - buf;
+ uint64_t preamble;
+ uint32_t data_size;
+ void *origin_buf = buf;
+ uint32_t crc = 0, crc_encoded = 0;
+
+ if (remaining < HEADER_FIXED_SIZE) {
+ *bytes_needed = HEADER_FIXED_SIZE - remaining;
+ return false;
+ }
+
+ preamble = ceph_decode_64(&buf);
+ if (preamble != PREAMBLE) {
+ *bytes_needed = 0;
+ return false;
+ }
+
+ buf += (HEADER_FIXED_SIZE - sizeof(preamble));
+ remaining = end - buf;
+ if (remaining < sizeof(uint32_t)) {
+ *bytes_needed = sizeof(uint32_t) - remaining;
+ return false;
+ }
+
+ data_size = ceph_decode_32(&buf);
+ remaining = end - buf;
+ if (remaining < data_size) {
+ *bytes_needed = data_size - remaining;
+ return false;
+ }
+
+ buf += data_size;
+
+ remaining = end - buf;
+ if (remaining < sizeof(uint32_t)) {
+ *bytes_needed = sizeof(uint32_t) - remaining;
+ return false;
+ }
+
+ *bytes_needed = 0;
+ crc = crc32c(0, origin_buf, buf - origin_buf);
+ crc_encoded = ceph_decode_32(&buf);
+ if (crc != crc_encoded) {
+ pr_err("crc corrupted");
+ return false;
+ }
+ return true;
+}
+
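+// Replay one decoded entry by passing it to the handler registered by
+// the journal client, if any.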
+static int playback_entry(struct ceph_journaler *journaler,
+ struct ceph_journaler_entry *entry,
+ uint64_t commit_tid)
+{
+ int ret = 0;
+
+ if (journaler->handle_entry != NULL)
+ ret = journaler->handle_entry(journaler->entry_handler,
+ entry, commit_tid);
+
+ return ret;
+}
+
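+// Look up the last reserved entry_tid for the given tag_tid. Returns
+// false if no entry with this tag_tid has been recorded yet.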
+static bool get_last_entry_tid(struct ceph_journaler *journaler,
+ uint64_t tag_tid, uint64_t *entry_tid)
+{
+ struct entry_tid *pos = NULL;
+
+ spin_lock(&journaler->entry_tid_lock);
+ list_for_each_entry(pos, &journaler->entry_tids, node) {
+ if (pos->tag_tid == tag_tid) {
+ *entry_tid = pos->entry_tid;
+ spin_unlock(&journaler->entry_tid_lock);
+ return true;
+ }
+ }
+ spin_unlock(&journaler->entry_tid_lock);
+
+ return false;
+}
+
+// There should not be too many entry_tids here: we only need one
+// entry_tid node for all entries with the same tag_tid.
+static struct entry_tid *entry_tid_alloc(struct ceph_journaler *journaler,
+ uint64_t tag_tid)
+{
+ struct entry_tid *entry_tid;
+
+ entry_tid = kzalloc(sizeof(struct entry_tid), GFP_NOIO);
+ if (!entry_tid) {
+ pr_err("failed to allocate new entry.");
+ return NULL;
+ }
+
+ entry_tid->tag_tid = tag_tid;
+ entry_tid->entry_tid = 0;
+ INIT_LIST_HEAD(&entry_tid->node);
+
+ list_add_tail(&entry_tid->node, &journaler->entry_tids);
+ return entry_tid;
+}
+
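+// Record entry_tid as the highest tid seen so far for tag_tid,
+// allocating a new node if this is the first entry with this tag.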
+static int reserve_entry_tid(struct ceph_journaler *journaler,
+ uint64_t tag_tid, uint64_t entry_tid)
+{
+ struct entry_tid *pos;
+
+ spin_lock(&journaler->entry_tid_lock);
+ list_for_each_entry(pos, &journaler->entry_tids, node) {
+ if (pos->tag_tid == tag_tid) {
+ if (pos->entry_tid < entry_tid) {
+ pos->entry_tid = entry_tid;
+ }
+
+ spin_unlock(&journaler->entry_tid_lock);
+ return 0;
+ }
+ }
+
+ pos = entry_tid_alloc(journaler, tag_tid);
+ if (!pos) {
+ spin_unlock(&journaler->entry_tid_lock);
+ pr_err("failed to allocate new entry.");
+ return -ENOMEM;
+ }
+
+ pos->entry_tid = entry_tid;
+ spin_unlock(&journaler->entry_tid_lock);
+
+ return 0;
+}
+
+static void journaler_entry_free(struct ceph_journaler_entry *entry)
+{
+ if (entry->data)
+ kvfree(entry->data);
+ kfree(entry);
+}
+
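+// Decode one entry from the fetch buffer. The on-disk layout is:
+// preamble (8 bytes), version (1), entry_tid (8), tag_tid (8), the data
+// as a length-prefixed string, and a crc32c (4) over everything before
+// it. Returns NULL on corruption or allocation failure.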
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end)
+{
+ struct ceph_journaler_entry *entry = NULL;
+ uint64_t preamble = 0;
+ uint8_t version = 0;
+ uint32_t crc = 0, crc_encoded = 0;
+ void *start = *p;
+
+ preamble = ceph_decode_64(p);
+ if (preamble != PREAMBLE) {
+ return NULL;
+ }
+
+ version = ceph_decode_8(p);
+ if (version != 1)
+ return NULL;
+
+ entry = kzalloc(sizeof(struct ceph_journaler_entry), GFP_NOIO);
+ if (!entry) {
+ goto err;
+ }
+
+ INIT_LIST_HEAD(&entry->node);
+ entry->entry_tid = ceph_decode_64(p);
+ entry->tag_tid = ceph_decode_64(p);
+ entry->data = ceph_extract_encoded_string_kvmalloc(p, end,
+ &entry->data_len, GFP_NOIO);
+ if (IS_ERR(entry->data)) {
+ entry->data = NULL;
+ goto free_entry;
+ }
+
+ crc = crc32c(0, start, *p - start);
+ crc_encoded = ceph_decode_32(p);
+ if (crc != crc_encoded) {
+ goto free_entry;
+ }
+ return entry;
+
+free_entry:
+ journaler_entry_free(entry);
+err:
+ return NULL;
+}
+
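+// Read a whole journal object (up to 2*object_size bytes) into
+// fetch_buf and decode all of its entries onto the entry_list of the
+// object_replayer that owns this splay slot.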
+static int fetch(struct ceph_journaler *journaler, uint64_t object_num)
+{
+ struct ceph_object_id object_oid;
+ int ret = 0;
+ void *read_buf = NULL, *end = NULL;
+ uint64_t read_len = 2ULL << journaler->order;
+ struct ceph_journaler_object_pos *pos;
+ struct object_replayer *obj_replayer = NULL;
+
+ obj_replayer = &journaler->obj_replayers[object_num % journaler->splay_width];
+ obj_replayer->object_num = object_num;
+ list_for_each_entry(pos, &journaler->client->object_positions, node) {
+ if (pos->in_using && pos->object_num == object_num) {
+ obj_replayer->pos = pos;
+ break;
+ }
+ }
+
+ ceph_oid_init(&object_oid);
+ ret = ceph_oid_aprintf(&object_oid, GFP_NOIO, "%s%llu",
+ journaler->object_oid_prefix, object_num);
+ if (ret) {
+ pr_err("failed to initialize object_id : %d", ret);
+ return ret;
+ }
+
+ read_buf = journaler->fetch_buf;
+ ret = ceph_journaler_obj_read_sync(journaler, &object_oid,
+ &journaler->data_oloc, read_buf,
+ 0, read_len);
+ if (ret == -ENOENT) {
+ dout("no such object, %s: %d\n", object_oid.name, ret);
+ goto err_free_object_oid;
+ } else if (ret < 0) {
+ pr_err("failed to read object %s: %d\n", object_oid.name, ret);
+ goto err_free_object_oid;
+ } else if (ret == 0) {
+ pr_err("no data in object %s\n", object_oid.name);
+ goto err_free_object_oid;
+ }
+
+ end = read_buf + ret;
+ while (read_buf < end) {
+ uint32_t bytes_needed = 0;
+ struct ceph_journaler_entry *entry = NULL;
+
+ if (!entry_is_readable(journaler, read_buf, end, &bytes_needed)) {
+ ret = -EIO;
+ goto err_free_object_oid;
+ }
+
+ entry = journaler_entry_decode(&read_buf, end);
+ if (!entry) {
+ ret = -EIO;
+ goto err_free_object_oid;
+ }
+
+ list_add_tail(&entry->node, &obj_replayer->entry_list);
+ }
+ ret = 0;
+
+err_free_object_oid:
+ ceph_oid_destroy(&object_oid);
+ return ret;
+}
+
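+// Remember a replayed entry in the commit_entries tree so that a later
+// commit can record how far replay has progressed.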
+static int add_commit_entry(struct ceph_journaler *journaler, uint64_t commit_tid,
+ uint64_t object_num, uint64_t tag_tid, uint64_t entry_tid)
+{
+ struct commit_entry *commit_entry = NULL;
+
+ commit_entry = kmem_cache_zalloc(journaler_commit_entry_cache, GFP_NOIO);
+ if (!commit_entry)
+ return -ENOMEM;
+
+ RB_CLEAR_NODE(&commit_entry->r_node);
+
+ commit_entry->commit_tid = commit_tid;
+ commit_entry->object_num = object_num;
+ commit_entry->tag_tid = tag_tid;
+ commit_entry->entry_tid = entry_tid;
+
+ mutex_lock(&journaler->commit_lock);
+ insert_commit_entry(&journaler->commit_entries, commit_entry);
+ mutex_unlock(&journaler->commit_lock);
+
+ return 0;
+}
+
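+// Commit tids are monotonically increasing; each replayed entry is
+// assigned a fresh one.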
+static uint64_t allocate_commit_tid(struct ceph_journaler *journaler)
+{
+ return ++journaler->commit_tid;
+}
+
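+// Drop all fetched entries carrying tag_tid from every splay slot and
+// remember it in prune_tag_tid, so that stale entries with this tag (or
+// an older one) are skipped during the rest of the replay.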
+static void prune_tag(struct ceph_journaler *journaler, uint64_t tag_tid)
+{
+ struct ceph_journaler_entry *entry, *next;
+ struct object_replayer *obj_replayer = NULL;
+ int i = 0;
+
+ if (journaler->prune_tag_tid == UINT_MAX ||
+ journaler->prune_tag_tid < tag_tid) {
+ journaler->prune_tag_tid = tag_tid;
+ }
+
+ for (i = 0; i < journaler->splay_width; i++) {
+ obj_replayer = &journaler->obj_replayers[i];
+ list_for_each_entry_safe(entry, next,
+ &obj_replayer->entry_list, node) {
+ if (entry->tag_tid == tag_tid) {
+ list_del(&entry->node);
+ journaler_entry_free(entry);
+ }
+ }
+ }
+}
+
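+// Pop the next entry to replay, walking the object replayers in splay
+// order. Entries from stale tags are pruned, a jump to a newer tag
+// restarts the walk at splay offset 0, and when an object runs out of
+// entries the next object in its slot is fetched.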
+static int get_first_entry(struct ceph_journaler *journaler,
+ struct ceph_journaler_entry **entry,
+ uint64_t *commit_tid)
+{
+ struct object_replayer *obj_replayer = NULL;
+ struct ceph_journaler_entry *tmp_entry = NULL;
+ uint64_t last_entry_tid = 0;
+ int ret = 0;
+
+next:
+ obj_replayer = &journaler->obj_replayers[journaler->splay_offset];
+ if (list_empty(&obj_replayer->entry_list)) {
+ return -ENOENT;
+ }
+
+ tmp_entry = list_first_entry(&obj_replayer->entry_list,
+ struct ceph_journaler_entry, node);
+
+ journaler->splay_offset = (journaler->splay_offset + 1) % journaler->splay_width;
+ if (journaler->active_tag_tid == UINT_MAX) {
+ journaler->active_tag_tid = tmp_entry->tag_tid;
+ } else if (tmp_entry->tag_tid < journaler->active_tag_tid ||
+ (journaler->prune_tag_tid != UINT_MAX &&
+ tmp_entry->tag_tid <= journaler->prune_tag_tid)) {
+ list_del(&tmp_entry->node);
+ prune_tag(journaler, tmp_entry->tag_tid);
+ journaler_entry_free(tmp_entry);
+ goto next;
+ } else if (tmp_entry->tag_tid > journaler->active_tag_tid) {
+ prune_tag(journaler, journaler->active_tag_tid);
+ journaler->active_tag_tid = tmp_entry->tag_tid;
+
+ if (tmp_entry->entry_tid != 0) {
+ journaler->splay_offset = 0;
+ goto next;
+ }
+ }
+
+ list_del(&tmp_entry->node);
+ ret = get_last_entry_tid(journaler, tmp_entry->tag_tid, &last_entry_tid);
+ if (ret && tmp_entry->entry_tid != last_entry_tid + 1) {
+ pr_err("missing prior journal entry, last_entry_tid: %llu",
+ last_entry_tid);
+ ret = -ENOMSG;
+ goto free_entry;
+ }
+
+ if (list_empty(&obj_replayer->entry_list)) {
+ ret = fetch(journaler, obj_replayer->object_num + journaler->splay_width);
+ if (ret && ret != -ENOENT) {
+ goto free_entry;
+ }
+ }
+
+ ret = reserve_entry_tid(journaler, tmp_entry->tag_tid, tmp_entry->entry_tid);
+ if (ret)
+ goto free_entry;
+
+ *commit_tid = allocate_commit_tid(journaler);
+ ret = add_commit_entry(journaler, *commit_tid, obj_replayer->object_num,
+ tmp_entry->tag_tid, tmp_entry->entry_tid);
+ if (ret)
+ goto free_entry;
+
+ *entry = tmp_entry;
+ return 0;
+
+free_entry:
+ journaler_entry_free(tmp_entry);
+ return ret;
+}
+
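+// Replay entries one by one until the journal is exhausted; -ENOENT
+// from get_first_entry() means everything was consumed and is not an
+// error.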
+static int process_replay(struct ceph_journaler *journaler)
+{
+ int r = 0;
+ struct ceph_journaler_entry *entry = NULL;
+ uint64_t commit_tid = 0;
+
+next:
+ r = get_first_entry(journaler, &entry, &commit_tid);
+ if (r) {
+ if (r == -ENOENT) {
+ prune_tag(journaler, journaler->active_tag_tid);
+ r = 0;
+ }
+ return r;
+ }
+
+ r = playback_entry(journaler, entry, commit_tid);
+ journaler_entry_free(entry);
+ if (r) {
+ return r;
+ }
+
+ goto next;
+}
+
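+// Before replaying, drop the entries each object replayer holds up to
+// and including the client's committed position, still reserving their
+// entry_tids so the replay continuity checks keep working.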
+static int preprocess_replay(struct ceph_journaler *journaler)
+{
+ struct ceph_journaler_entry *entry, *next;
+ bool found_commit = false;
+ struct object_replayer *obj_replayer = NULL;
+ int i = 0;
+ int ret = 0;
+
+ for (i = 0; i < journaler->splay_width; i++) {
+ obj_replayer = &journaler->obj_replayers[i];
+
+ if (!obj_replayer->pos)
+ continue;
+
+ found_commit = false;
+ list_for_each_entry_safe(entry, next,
+ &obj_replayer->entry_list, node) {
+ if (entry->tag_tid == obj_replayer->pos->tag_tid &&
+ entry->entry_tid == obj_replayer->pos->entry_tid) {
+ found_commit = true;
+ } else if (found_commit) {
+ break;
+ }
+
+ ret = reserve_entry_tid(journaler, entry->tag_tid, entry->entry_tid);
+ if (ret)
+ return ret;
+ list_del(&entry->node);
+ journaler_entry_free(entry);
+ }
+ }
+ return 0;
+}
+
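+// Entry point for replay: derive from the client's recorded object
+// positions which object each splay slot should fetch first, fetch one
+// object per slot, strip already-committed entries, then replay the
+// rest in order.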
+int ceph_journaler_start_replay(struct ceph_journaler *journaler)
+{
+ struct ceph_journaler_object_pos *active_pos = NULL;
+ uint64_t *fetch_objects = NULL;
+ uint64_t buf_len = (2ULL << journaler->order);
+ uint64_t object_num;
+ int i = 0;
+ int ret = 0;
+
+ fetch_objects = kcalloc(journaler->splay_width, sizeof(uint64_t),
+ GFP_NOIO);
+ if (!fetch_objects) {
+ return -ENOMEM;
+ }
+
+ mutex_lock(&journaler->meta_lock);
+ active_pos = list_first_entry(&journaler->client->object_positions,
+ struct ceph_journaler_object_pos, node);
+ if (active_pos->in_using) {
+ journaler->splay_offset = (active_pos->object_num + 1) % journaler->splay_width;
+ journaler->active_tag_tid = active_pos->tag_tid;
+
+ list_for_each_entry(active_pos, &journaler->client->object_positions, node) {
+ if (active_pos->in_using) {
+ fetch_objects[active_pos->object_num %
+ journaler->splay_width] = active_pos->object_num;
+ }
+ }
+ }
+ mutex_unlock(&journaler->meta_lock);
+
+ journaler->fetch_buf = ceph_kvmalloc(buf_len, GFP_NOIO);
+ if (!journaler->fetch_buf) {
+ pr_err("failed to alloc fetch buf: %llu", buf_len);
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ for (i = 0; i < journaler->splay_width; i++) {
+ if (fetch_objects[i] == 0) {
+ object_num = i;
+ } else {
+ object_num = fetch_objects[i];
+ }
+ ret = fetch(journaler, object_num);
+ if (ret && ret != -ENOENT)
+ goto free_fetch_buf;
+ }
+
+ ret = preprocess_replay(journaler);
+ if (ret)
+ goto free_fetch_buf;
+
+ ret = process_replay(journaler);
+
+free_fetch_buf:
+ kvfree(journaler->fetch_buf);
+out:
+ for (i = 0; i < journaler->splay_width; i++) {
+ struct object_replayer *obj_replayer = &journaler->obj_replayers[i];
+ struct ceph_journaler_entry *entry = NULL, *next_entry = NULL;
+
+ spin_lock(&obj_replayer->lock);
+ list_for_each_entry_safe(entry, next_entry, &obj_replayer->entry_list, node) {
+ list_del(&entry->node);
+ journaler_entry_free(entry);
+ }
+ spin_unlock(&obj_replayer->lock);
+ }
+ kfree(fetch_objects);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_start_replay);
To make sure the data and journal are consistent when opening the journal,
we can call the ceph_journaler_start_replay() API to replay all events
that were recorded but not committed.

Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
---
 net/ceph/journaler.c | 568 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 568 insertions(+)