new file mode 100644
@@ -0,0 +1,89 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
+#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
+
+#include <linux/ceph/osd_client.h>
+
+struct ceph_journaler;
+struct ceph_journaler_client;
+
+struct ceph_journaler_object_pos {
+ struct list_head node;
+ u64 object_num;
+ u64 tag_tid;
+ u64 entry_tid;
+};
+
+struct ceph_journaler_client {
+ struct list_head node;
+ size_t id_len;
+ char *id;
+ size_t data_len;
+ char *data;
+ struct list_head object_positions;
+};
+
+struct ceph_journaler_tag {
+ uint64_t tid;
+ uint64_t tag_class;
+ size_t data_len;
+ char *data;
+};
+
+void destroy_client(struct ceph_journaler_client *client);
+
+int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint8_t *order,
+ uint8_t *splay_width,
+ int64_t *pool_id);
+
+int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t *minimum_set, uint64_t *active_set);
+
+int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ struct ceph_journaler_client **clients,
+ uint32_t *client_num);
+
+int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t *tag_tid);
+
+int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t tag_tid, struct ceph_journaler_tag *tag);
+
+int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t tag_tid, uint64_t tag_class,
+ void *buf, uint32_t buf_len);
+
+int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ struct ceph_journaler_client *client,
+ struct list_head *object_positions);
+
+int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t active_set);
+
+int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t minimum_set);
+
+int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t soft_limit);
+#endif
new file mode 100644
@@ -0,0 +1,535 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/cls_journaler_client.h>
+#include <linux/ceph/journaler.h>
+
+#include <linux/types.h>
+
+//TODO get all metas in one single request
+int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint8_t *order,
+ uint8_t *splay_width,
+ int64_t *pool_id)
+{
+ struct page *reply_page;
+ size_t reply_len = sizeof(*order);
+ int ret = 0;
+
+ reply_page = alloc_page(GFP_NOIO);
+ if (!reply_page)
+ return -ENOMEM;
+
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order",
+ CEPH_OSD_FLAG_READ, NULL,
+ 0, reply_page, &reply_len);
+
+ if (!ret) {
+ memcpy(order, page_address(reply_page), reply_len);
+ } else {
+ goto out;
+ }
+
+ reply_len = sizeof(*splay_width);
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width",
+ CEPH_OSD_FLAG_READ, NULL,
+ 0, reply_page, &reply_len);
+
+ if (!ret) {
+ memcpy(splay_width, page_address(reply_page), reply_len);
+ } else {
+ goto out;
+ }
+
+ reply_len = sizeof(*pool_id);
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id",
+ CEPH_OSD_FLAG_READ, NULL,
+ 0, reply_page, &reply_len);
+
+ if (!ret) {
+ memcpy(pool_id, page_address(reply_page), reply_len);
+ } else {
+ goto out;
+ }
+out:
+ __free_page(reply_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_immutable_metas);
+
+//TODO get all metas in one single request
+int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t *minimum_set, uint64_t *active_set)
+{
+ struct page *reply_page;
+ int ret = 0;
+ size_t reply_len = sizeof(*minimum_set);
+
+ reply_page = alloc_page(GFP_NOIO);
+ if (!reply_page)
+ return -ENOMEM;
+
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set",
+ CEPH_OSD_FLAG_READ, NULL,
+ 0, reply_page, &reply_len);
+
+ if (!ret) {
+ memcpy(minimum_set, page_address(reply_page), reply_len);
+ } else {
+ goto out;
+ }
+
+ reply_len = sizeof(active_set);
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set",
+ CEPH_OSD_FLAG_READ, NULL,
+ 0, reply_page, &reply_len);
+
+ if (!ret) {
+ memcpy(active_set, page_address(reply_page), reply_len);
+ } else {
+ goto out;
+ }
+out:
+ __free_page(reply_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_mutable_metas);
+
+static int decode_object_position(void **p, void *end, struct ceph_journaler_object_pos *pos)
+{
+ u8 struct_v;
+ u32 struct_len;
+ int ret = 0;
+
+ u64 object_num = 0;
+ u64 tag_tid = 0;
+ u64 entry_tid = 0;
+
+ ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position",
+ &struct_v, &struct_len);
+ if (ret)
+ return ret;
+
+ object_num = ceph_decode_64(p);
+
+ tag_tid = ceph_decode_64(p);
+
+ entry_tid = ceph_decode_64(p);
+
+ pos->object_num = object_num;
+ pos->tag_tid = tag_tid;
+ pos->entry_tid = entry_tid;
+
+ return ret;
+}
+
+void destroy_client(struct ceph_journaler_client *client)
+{
+ struct ceph_journaler_object_pos *pos, *next;
+
+ list_for_each_entry_safe(pos, next, &client->object_positions, node) {
+ list_del(&pos->node);
+ kfree(pos);
+ }
+ kfree(client->id);
+ kfree(client->data);
+}
+
+static int decode_client(void **p, void *end, struct ceph_journaler_client *client)
+{
+ u8 struct_v;
+ u8 state_raw;
+ u32 struct_len;
+ int ret = 0, num = 0, i = 0;
+ struct ceph_journaler_object_pos *pos, *next_pos;
+
+ INIT_LIST_HEAD(&client->node);
+ INIT_LIST_HEAD(&client->object_positions);
+ ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply",
+ &struct_v, &struct_len);
+ if (ret)
+ return ret;
+
+ client->id = ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO);
+ if (IS_ERR(client->id)) {
+ ret = PTR_ERR(client->id);
+ goto err;
+ }
+ client->data = ceph_extract_encoded_string(p, end, &client->data_len, GFP_NOIO);
+ if (IS_ERR(client->data)) {
+ ret = PTR_ERR(client->data);
+ goto free_id;
+ }
+ ret = ceph_start_decoding(p, end, 1, "cls_joural_client_object_set_position",
+ &struct_v, &struct_len);
+ if (ret)
+ goto free_data;
+
+ num = ceph_decode_32(p);
+ for (i = 0; i < num; i++) {
+ pos = kzalloc(sizeof(*pos), GFP_KERNEL);
+ if (!pos)
+ goto free_positions;
+
+ ret = decode_object_position(p, end, pos);
+ if (ret) {
+ kfree(pos);
+ goto free_positions;
+ }
+ list_add_tail(&pos->node, &client->object_positions);
+ }
+
+ state_raw = ceph_decode_8(p);
+ return 0;
+
+free_positions:
+ list_for_each_entry_safe(pos, next_pos, &client->object_positions, node) {
+ list_del(&pos->node);
+ kfree(pos);
+ }
+free_data:
+ kfree(client->data);
+free_id:
+ kfree(client->id);
+err:
+ return ret;
+}
+
+static int decode_clients(void **p, void *end, struct ceph_journaler_client **clients, uint32_t *client_num)
+{
+ int i;
+ int ret = 0;
+
+ *client_num = ceph_decode_32(p);
+ if (ret)
+ return ret;
+
+ *clients = kcalloc(*client_num, sizeof(**clients), GFP_NOIO);
+ if (!*clients)
+ return -ENOMEM;
+
+ for (i = 0; i < *client_num; i++) {
+ ret = decode_client(p, end, *clients + i);
+ if (ret)
+ goto err;
+ }
+ return 0;
+
+err:
+ for (--i; i >= 0; i--)
+ destroy_client(*clients + i);
+ kfree(*clients);
+
+ return ret;
+}
+
+int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ struct ceph_journaler_client **clients,
+ uint32_t *client_num)
+{
+ struct page *reply_page;
+ struct page *req_page;
+ int ret = 0;
+ size_t reply_len = PAGE_SIZE;
+ int buf_size;
+ void *p, *end;
+ char name[] = "";
+
+ buf_size = strlen(name) + sizeof(__le32) + sizeof(uint64_t);
+
+ if (buf_size > PAGE_SIZE)
+ return -E2BIG;
+
+ reply_page = alloc_page(GFP_NOIO);
+ if (!reply_page)
+ return -ENOMEM;
+
+ req_page = alloc_page(GFP_NOIO);
+ if (!req_page) {
+ ret = -ENOMEM;
+ goto free_reply_page;
+ }
+
+ p = page_address(req_page);
+ end = p + buf_size;
+
+ ceph_encode_string(&p, end, name, strlen(name));
+ ceph_encode_64(&p, (uint64_t)256);
+
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_list",
+ CEPH_OSD_FLAG_READ, req_page,
+ buf_size, reply_page, &reply_len);
+
+ if (!ret) {
+ p = page_address(reply_page);
+ end = p + reply_len;
+
+ ret = decode_clients(&p, end, clients, client_num);
+ }
+
+ __free_page(req_page);
+free_reply_page:
+ __free_page(reply_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_client_list);
+
+int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t *tag_tid)
+{
+ struct page *reply_page;
+ int ret = 0;
+ size_t reply_len = PAGE_SIZE;
+
+ reply_page = alloc_page(GFP_NOIO);
+ if (!reply_page)
+ return -ENOMEM;
+
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid",
+ CEPH_OSD_FLAG_READ, NULL,
+ 0, reply_page, &reply_len);
+
+ if (!ret) {
+ memcpy(tag_tid, page_address(reply_page), reply_len);
+ }
+
+ __free_page(reply_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_next_tag_tid);
+
+int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t tag_tid, uint64_t tag_class,
+ void *buf, uint32_t buf_len)
+{
+ struct page *req_page;
+ int ret = 0;
+ int buf_size;
+ void *p, *end;
+
+ buf_size = buf_len + sizeof(__le32) + sizeof(uint64_t) + sizeof(uint64_t);
+
+ if (buf_size > PAGE_SIZE)
+ return -E2BIG;
+
+ req_page = alloc_page(GFP_NOIO);
+ if (!req_page)
+ return -ENOMEM;
+
+ p = page_address(req_page);
+ end = p + buf_size;
+
+ ceph_encode_64(&p, tag_tid);
+ ceph_encode_64(&p, tag_class);
+ ceph_encode_string(&p, end, buf, buf_len);
+
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create",
+ CEPH_OSD_FLAG_WRITE, req_page,
+ buf_size, NULL, NULL);
+
+ __free_page(req_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_tag_create);
+
+int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag)
+{
+ int ret = 0;
+ u8 struct_v;
+ u32 struct_len;
+
+ ret = ceph_start_decoding(p, end, 1, "cls_journaler_tag",
+ &struct_v, &struct_len);
+ if (ret)
+ return ret;
+
+ tag->tid = ceph_decode_64(p);
+ tag->tag_class = ceph_decode_64(p);
+ tag->data = ceph_extract_encoded_string(p, end, &tag->data_len, GFP_NOIO);
+
+ return 0;
+}
+
+int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t tag_tid, struct ceph_journaler_tag *tag)
+{
+ struct page *reply_page;
+ struct page *req_page;
+ int ret = 0;
+ size_t reply_len = PAGE_SIZE;
+ int buf_size;
+ void *p, *end;
+
+ buf_size = sizeof(tag_tid);
+
+ reply_page = alloc_page(GFP_NOIO);
+ if (!reply_page)
+ return -ENOMEM;
+
+ req_page = alloc_page(GFP_NOIO);
+ if (!req_page)
+ goto free_reply_page;
+
+ p = page_address(req_page);
+ end = p + buf_size;
+
+ ceph_encode_64(&p, tag_tid);
+
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag",
+ CEPH_OSD_FLAG_READ, req_page,
+ buf_size, reply_page, &reply_len);
+
+ if (!ret) {
+ p = page_address(reply_page);
+ end = p + reply_len;
+
+ ret = decode_tag(&p, end, tag);
+ }
+
+ __free_page(req_page);
+free_reply_page:
+ __free_page(reply_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_tag);
+
+static int version_len = 6;
+
+int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ struct ceph_journaler_client *client,
+ struct list_head *object_positions)
+{
+ struct page *req_page;
+ int ret = 0;
+ int buf_size;
+ void *p, *end;
+ struct ceph_journaler_object_pos *position = NULL;
+
+ int object_position_len = version_len + 8 + 8 + 8;
+
+ int pos_num = 0;
+
+ buf_size = 4 + client->id_len + version_len + 4;
+
+ list_for_each_entry(position, object_positions, node) {
+ buf_size += object_position_len;
+ pos_num++;
+ }
+
+ if (buf_size > PAGE_SIZE)
+ return -E2BIG;
+
+ req_page = alloc_page(GFP_NOIO);
+ if (!req_page)
+ return -ENOMEM;
+
+ p = page_address(req_page);
+ end = p + buf_size;
+
+ ceph_encode_string(&p, end, client->id, client->id_len);
+
+ ceph_start_encoding(&p, 1, 1, buf_size - client->id_len - version_len - 4);
+
+ ceph_encode_32(&p, pos_num);
+
+ list_for_each_entry(position, object_positions, node) {
+ ceph_start_encoding(&p, 1, 1, 24);
+ ceph_encode_64(&p, position->object_num);
+ ceph_encode_64(&p, position->tag_tid);
+ ceph_encode_64(&p, position->entry_tid);
+ }
+
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_commit",
+ CEPH_OSD_FLAG_WRITE, req_page,
+ buf_size, NULL, NULL);
+
+ __free_page(req_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_client_committed);
+
+
+int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t minimum_set)
+{
+ struct page *req_page;
+ int ret = 0;
+ void *p;
+
+ req_page = alloc_page(GFP_NOIO);
+ if (!req_page)
+ return -ENOMEM;
+
+ p = page_address(req_page);
+ ceph_encode_64(&p, minimum_set);
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set",
+ CEPH_OSD_FLAG_WRITE, req_page,
+ 8, NULL, NULL);
+
+ __free_page(req_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_set_minimum_set);
+
+int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t active_set)
+{
+ struct page *req_page;
+ int ret = 0;
+ void *p;
+
+ req_page = alloc_page(GFP_NOIO);
+ if (!req_page)
+ return -ENOMEM;
+
+ p = page_address(req_page);
+ ceph_encode_64(&p, active_set);
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set",
+ CEPH_OSD_FLAG_WRITE, req_page,
+ 8, NULL, NULL);
+
+ __free_page(req_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_set_active_set);
+
+int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc,
+ struct ceph_object_id *oid,
+ struct ceph_object_locator *oloc,
+ uint64_t soft_limit)
+{
+ struct page *req_page;
+ int ret = 0;
+ void *p;
+
+ req_page = alloc_page(GFP_NOIO);
+ if (!req_page)
+ return -ENOMEM;
+
+ p = page_address(req_page);
+ ceph_encode_64(&p, soft_limit);
+ ret = ceph_osdc_call(osdc, oid, oloc, "journal", "guard_append",
+ CEPH_OSD_FLAG_READ, req_page,
+ 8, NULL, NULL);
+
+ __free_page(req_page);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_guard_append);
This is a cls client module for journaler. Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> --- include/linux/ceph/cls_journaler_client.h | 89 +++++ net/ceph/cls_journaler_client.c | 535 ++++++++++++++++++++++++++++++ 2 files changed, 624 insertions(+) create mode 100644 include/linux/ceph/cls_journaler_client.h create mode 100644 net/ceph/cls_journaler_client.c