Message ID | 1564393377-28949-6-git-send-email-dongsheng.yang@easystack.cn (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | rbd journaling feature | expand |
On Mon, Jul 29, 2019 at 11:43 AM Dongsheng Yang <dongsheng.yang@easystack.cn> wrote: > > This is a cls client module for journaler. > > Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> > --- > include/linux/ceph/cls_journaler_client.h | 94 +++++ > net/ceph/cls_journaler_client.c | 558 ++++++++++++++++++++++++++++++ > 2 files changed, 652 insertions(+) > create mode 100644 include/linux/ceph/cls_journaler_client.h > create mode 100644 net/ceph/cls_journaler_client.c > > diff --git a/include/linux/ceph/cls_journaler_client.h b/include/linux/ceph/cls_journaler_client.h > new file mode 100644 > index 0000000..6245882 > --- /dev/null > +++ b/include/linux/ceph/cls_journaler_client.h > @@ -0,0 +1,94 @@ > +/* SPDX-License-Identifier: GPL-2.0 */ > +#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H > +#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H > + > +#include <linux/ceph/osd_client.h> > + > +struct ceph_journaler; > +struct ceph_journaler_client; > + > +struct ceph_journaler_object_pos { > + struct list_head node; > + u64 object_num; > + u64 tag_tid; > + u64 entry_tid; > + // ->in_using means this object_pos is initialized. > + // There would be some stub for it created in init step > + // to allocate memory as early as possible. > + bool in_using; > +}; > + > +struct ceph_journaler_client { > + struct list_head node; > + size_t id_len; > + char *id; > + size_t data_len; > + char *data; Is client->data ever used? If not, drop it and skip over this string when decoding. > + struct list_head object_positions; > + struct ceph_journaler_object_pos *object_positions_array; > +}; > + > +struct ceph_journaler_tag { > + uint64_t tid; > + uint64_t tag_class; > + size_t data_len; > + char *data; Same here, is tag->data ever used? 
> +}; > + > +void destroy_client(struct ceph_journaler_client *client); > + > +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint8_t *order, > + uint8_t *splay_width, > + int64_t *pool_id); > + > +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *minimum_set, uint64_t *active_set); > + > +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct list_head *clients, > + uint8_t splay_width); > + > +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *tag_tid); > + > +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, struct ceph_journaler_tag *tag); > + > +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, uint64_t tag_class, > + void *buf, uint32_t buf_len); > + > +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct ceph_journaler_client *client, > + struct list_head *object_positions); > + > +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t active_set); > + > +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t minimum_set); > + > +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t 
soft_limit); > +#endif > diff --git a/net/ceph/cls_journaler_client.c b/net/ceph/cls_journaler_client.c > new file mode 100644 > index 0000000..ac27589 > --- /dev/null > +++ b/net/ceph/cls_journaler_client.c > @@ -0,0 +1,558 @@ > +// SPDX-License-Identifier: GPL-2.0 > +#include <linux/ceph/ceph_debug.h> > +#include <linux/ceph/cls_journaler_client.h> > +#include <linux/ceph/journaler.h> > + > +#include <linux/types.h> > + > +//TODO get all metas in one single request > +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint8_t *order, > + uint8_t *splay_width, > + int64_t *pool_id) > +{ > + struct page *reply_page; > + size_t reply_len = sizeof(*order); > + int ret; > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(order, page_address(reply_page), reply_len); > + } else { > + goto out; > + } > + reply_len = sizeof(*splay_width); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(splay_width, page_address(reply_page), reply_len); > + } else { > + goto out; > + } > + reply_len = sizeof(*pool_id); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(pool_id, page_address(reply_page), reply_len); This memcpy() is not going to work on big endian machines. Use either ceph_decode_*() or le*_to_cpu() helpers. Also, check for errors instead of success: ret = ceph_osdc_call(...); if (ret) goto out; ... decode ... 
> + } else { > + goto out; > + } > +out: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_immutable_metas); > + > +//TODO get all metas in one single request > +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *minimum_set, uint64_t *active_set) > +{ > + struct page *reply_page; > + int ret; > + size_t reply_len = sizeof(*minimum_set); > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(minimum_set, page_address(reply_page), reply_len); Same here. > + } else { > + goto out; > + } > + reply_len = sizeof(active_set); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(active_set, page_address(reply_page), reply_len); Same here. > + } else { > + goto out; > + } > +out: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_mutable_metas); > + > +static int decode_object_position(void **p, void *end, struct ceph_journaler_object_pos *pos) > +{ > + u8 struct_v; > + u32 struct_len; > + int ret; > + u64 object_num; > + u64 tag_tid; > + u64 entry_tid; > + > + ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position", > + &struct_v, &struct_len); > + if (ret) > + return ret; > + > + object_num = ceph_decode_64(p); > + tag_tid = ceph_decode_64(p); > + entry_tid = ceph_decode_64(p); > + > + pos->object_num = object_num; > + pos->tag_tid = tag_tid; > + pos->entry_tid = entry_tid; Can decode directly into pos, without going through local variables. 
> + > + return ret; > +} > + > +void destroy_client(struct ceph_journaler_client *client) > +{ > + kfree(client->object_positions_array); > + kfree(client->id); > + kfree(client->data); > + > + kfree(client); > +} > + > +struct ceph_journaler_client *create_client(uint8_t splay_width) > +{ > + struct ceph_journaler_client *client; > + struct ceph_journaler_object_pos *pos; > + int i; > + > + client = kzalloc(sizeof(*client), GFP_NOIO); > + if (!client) > + return NULL; > + > + client->object_positions_array = kcalloc(splay_width, sizeof(*pos), GFP_NOIO); > + if (!client->object_positions_array) > + goto free_client; > + > + INIT_LIST_HEAD(&client->object_positions); > + for (i = 0; i < splay_width; i++) { > + pos = &client->object_positions_array[i]; > + INIT_LIST_HEAD(&pos->node); > + list_add_tail(&pos->node, &client->object_positions); > + } > + > + INIT_LIST_HEAD(&client->node); > + client->data = NULL; > + client->id = NULL; > + > + return client; > +free_client: > + kfree(client); > + return NULL; > +} > + > +static int decode_client(void **p, void *end, struct ceph_journaler_client *client) > +{ > + u8 struct_v; > + u8 state_raw; > + u32 struct_len; > + int ret, num, i; > + struct ceph_journaler_object_pos *pos; > + > + ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply", > + &struct_v, &struct_len); > + if (ret) > + return ret; > + > + client->id = ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO); > + if (IS_ERR(client->id)) { > + ret = PTR_ERR(client->id); > + client->id = NULL; > + goto err; > + } > + client->data = ceph_extract_encoded_string(p, end, &client->data_len, GFP_NOIO); > + if (IS_ERR(client->data)) { > + ret = PTR_ERR(client->data); > + client->data = NULL; > + goto free_id; > + } > + ret = ceph_start_decoding(p, end, 1, "cls_joural_client_object_set_position", > + &struct_v, &struct_len); > + if (ret) > + goto free_data; > + > + num = ceph_decode_32(p); > + i = 0; > + list_for_each_entry(pos, 
&client->object_positions, node) { > + if (i < num) { > + // we will use this position stub > + pos->in_using = true; > + ret = decode_object_position(p, end, pos); > + if (ret) { > + goto free_data; > + } > + } else { > + // This stub is not used anymore > + pos->in_using = false; > + } > + i++; > + } > + > + state_raw = ceph_decode_8(p); Just skip over it (i.e. increment p with a comment), don't add bogus local variables. > + return 0; > + > +free_data: > + kfree(client->data); > +free_id: > + kfree(client->id); > +err: > + return ret; > +} > + > +static int decode_clients(void **p, void *end, struct list_head *clients, uint8_t splay_width) > +{ > + int i = 0; > + int ret; > + uint32_t client_num; > + struct ceph_journaler_client *client, *next; > + > + client_num = ceph_decode_32(p); > + // Reuse the clients already exist in list. > + list_for_each_entry_safe(client, next, clients, node) { > + // Some clients unregistered. > + if (i < client_num) { > + kfree(client->id); > + kfree(client->data); > + ret = decode_client(p, end, client); > + if (ret) > + return ret; > + } else { > + list_del(&client->node); > + destroy_client(client); > + } > + i++; > + } > + > + // Some more clients registered. 
> + for (; i < client_num; i++) { > + client = create_client(splay_width); > + if (!client) > + return -ENOMEM; > + ret = decode_client(p, end, client); > + if (ret) { > + destroy_client(client); > + return ret; > + } > + list_add_tail(&client->node, clients); > + } > + return 0; > +} > + > +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct list_head *clients, > + uint8_t splay_width) > +{ > + struct page *reply_page; > + struct page *req_page; > + int ret; > + size_t reply_len = PAGE_SIZE; > + int buf_size; > + void *p, *end; > + char name[] = ""; > + > + buf_size = strlen(name) + sizeof(__le32) + sizeof(uint64_t); > + > + if (buf_size > PAGE_SIZE) > + return -E2BIG; If name is always an empty string, this can't ever be true. > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) { > + ret = -ENOMEM; > + goto free_reply_page; > + } > + > + p = page_address(req_page); > + end = p + buf_size; > + > + ceph_encode_string(&p, end, name, strlen(name)); I would drop name and encode 0 directly. > + ceph_encode_64(&p, (uint64_t)256); This constant needs a define or a comment. Unnecessary cast. 
> + > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_list", > + CEPH_OSD_FLAG_READ, req_page, > + buf_size, &reply_page, &reply_len); > + > + if (!ret) { > + p = page_address(reply_page); > + end = p + reply_len; > + > + ret = decode_clients(&p, end, clients, splay_width); > + } > + > + __free_page(req_page); > +free_reply_page: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_client_list); > + > +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *tag_tid) > +{ > + struct page *reply_page; > + int ret; > + size_t reply_len = PAGE_SIZE; > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + > + if (!ret) { > + memcpy(tag_tid, page_address(reply_page), reply_len); Same here. 
> + } > + > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_next_tag_tid); > + > +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, uint64_t tag_class, > + void *buf, uint32_t buf_len) > +{ > + struct page *req_page; > + int ret; > + int buf_size; > + void *p, *end; > + > + buf_size = buf_len + sizeof(__le32) + sizeof(uint64_t) + sizeof(uint64_t); > + > + if (buf_size > PAGE_SIZE) > + return -E2BIG; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + end = p + buf_size; > + > + ceph_encode_64(&p, tag_tid); > + ceph_encode_64(&p, tag_class); > + ceph_encode_string(&p, end, buf, buf_len); > + > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create", > + CEPH_OSD_FLAG_WRITE, req_page, > + buf_size, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_tag_create); > + > +int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag) > +{ > + int ret; > + u8 struct_v; > + u32 struct_len; > + > + ret = ceph_start_decoding(p, end, 1, "cls_journaler_tag", > + &struct_v, &struct_len); > + if (ret) > + return ret; > + > + tag->tid = ceph_decode_64(p); > + tag->tag_class = ceph_decode_64(p); > + tag->data = ceph_extract_encoded_string(p, end, &tag->data_len, GFP_NOIO); > + if (IS_ERR(tag->data)) { > + ret = PTR_ERR(tag->data); > + tag->data = NULL; > + return ret; > + } > + > + return 0; > +} > + > +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, struct ceph_journaler_tag *tag) > +{ > + struct page *reply_page; > + struct page *req_page; > + int ret; > + size_t reply_len = PAGE_SIZE; > + int buf_size; > + void *p, *end; > + > + buf_size = sizeof(tag_tid); > + > + reply_page = alloc_page(GFP_NOIO); > + 
if (!reply_page) > + return -ENOMEM; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) { > + ret = -ENOMEM; > + goto free_reply_page; > + } > + > + p = page_address(req_page); > + end = p + buf_size; > + ceph_encode_64(&p, tag_tid); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag", > + CEPH_OSD_FLAG_READ, req_page, > + buf_size, &reply_page, &reply_len); > + > + if (!ret) { > + p = page_address(reply_page); > + end = p + reply_len; > + > + ret = decode_tag(&p, end, tag); > + } > + > + __free_page(req_page); > +free_reply_page: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_tag); > + > +static int version_len = 6; Use CEPH_ENCODING_START_BLK_LEN. > + > +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct ceph_journaler_client *client, > + struct list_head *object_positions) > +{ > + struct page *req_page; > + int ret; > + int buf_size; > + void *p, *end; > + struct ceph_journaler_object_pos *position = NULL; > + int object_position_len = version_len + 8 + 8 + 8; > + int pos_num = 0; > + > + buf_size = 4 + client->id_len + version_len + 4; > + list_for_each_entry(position, object_positions, node) { > + buf_size += object_position_len; > + pos_num++; > + } > + > + if (buf_size > PAGE_SIZE) > + return -E2BIG; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + end = p + buf_size; > + ceph_encode_string(&p, end, client->id, client->id_len); > + ceph_start_encoding(&p, 1, 1, buf_size - client->id_len - version_len - 4); > + ceph_encode_32(&p, pos_num); > + > + list_for_each_entry(position, object_positions, node) { > + ceph_start_encoding(&p, 1, 1, 24); > + ceph_encode_64(&p, position->object_num); > + ceph_encode_64(&p, position->tag_tid); > + ceph_encode_64(&p, position->entry_tid); > + } > + > + ret = ceph_osdc_call(osdc, oid, oloc, 
"journal", "client_commit", > + CEPH_OSD_FLAG_WRITE, req_page, > + buf_size, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_client_committed); > + > + > +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t minimum_set) > +{ > + struct page *req_page; > + int ret; > + void *p; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + ceph_encode_64(&p, minimum_set); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set", > + CEPH_OSD_FLAG_WRITE, req_page, > + 8, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_set_minimum_set); > + > +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t active_set) > +{ > + struct page *req_page; > + int ret; > + void *p; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + ceph_encode_64(&p, active_set); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set", > + CEPH_OSD_FLAG_WRITE, req_page, > + 8, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_set_active_set); > + > +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t soft_limit) > +{ > + struct page *req_page; > + int ret; > + void *p; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + ceph_encode_64(&p, soft_limit); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "guard_append", > + CEPH_OSD_FLAG_READ, req_page, > + 8, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > 
+EXPORT_SYMBOL(ceph_cls_journaler_guard_append); Looks like this function is unused. Thanks, Ilya
On 08/19/2019 04:27 PM, Ilya Dryomov wrote: > On Mon, Jul 29, 2019 at 11:43 AM Dongsheng Yang > <dongsheng.yang@easystack.cn> wrote: >> This is a cls client module for journaler. >> >> Signed-off-by: Dongsheng Yang<dongsheng.yang@easystack.cn> >> --- >> include/linux/ceph/cls_journaler_client.h | 94 +++++ >> net/ceph/cls_journaler_client.c | 558 ++++++++++++++++++++++++++++++ >> 2 files changed, 652 insertions(+) >> create mode 100644 include/linux/ceph/cls_journaler_client.h >> create mode 100644 net/ceph/cls_journaler_client.c >> >> diff --git a/include/linux/ceph/cls_journaler_client.h b/include/linux/ceph/cls_journaler_client.h >> new file mode 100644 >> index 0000000..6245882 >> --- /dev/null >> +++ b/include/linux/ceph/cls_journaler_client.h >> @@ -0,0 +1,94 @@ >> +/* SPDX-License-Identifier: GPL-2.0 */ >> +#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H >> +#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H >> + >> +#include <linux/ceph/osd_client.h> >> + >> +struct ceph_journaler; >> +struct ceph_journaler_client; >> + >> +struct ceph_journaler_object_pos { >> + struct list_head node; >> + u64 object_num; >> + u64 tag_tid; >> + u64 entry_tid; >> + // ->in_using means this object_pos is initialized. >> + // There would be some stub for it created in init step >> + // to allocate memory as early as possible. >> + bool in_using; >> +}; >> + >> +struct ceph_journaler_client { >> + struct list_head node; >> + size_t id_len; >> + char *id; >> + size_t data_len; >> + char *data; > Is client->data ever used? If not, drop it and skip over this string > when decoding. agreed >> + struct list_head object_positions; >> + struct ceph_journaler_object_pos *object_positions_array; >> +}; >> + >> +struct ceph_journaler_tag { >> + uint64_t tid; >> + uint64_t tag_class; >> + size_t data_len; >> + char *data; > Same here, is tag->data ever used? 
agreed >> +}; >> + >> +void destroy_client(struct ceph_journaler_client *client); >> + >> +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint8_t *order, >> + uint8_t *splay_width, >> + int64_t *pool_id); >> + >> +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t *minimum_set, uint64_t *active_set); >> + >> +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + struct list_head *clients, >> + uint8_t splay_width); >> + >> +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t *tag_tid); >> + >> +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t tag_tid, struct ceph_journaler_tag *tag); >> + >> +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t tag_tid, uint64_t tag_class, >> + void *buf, uint32_t buf_len); >> + >> +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + struct ceph_journaler_client *client, >> + struct list_head *object_positions); >> + >> +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t active_set); >> + >> +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t minimum_set); >> + >> +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, >> + struct 
ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t soft_limit); >> +#endif >> diff --git a/net/ceph/cls_journaler_client.c b/net/ceph/cls_journaler_client.c >> new file mode 100644 >> index 0000000..ac27589 >> --- /dev/null >> +++ b/net/ceph/cls_journaler_client.c >> @@ -0,0 +1,558 @@ >> +// SPDX-License-Identifier: GPL-2.0 >> +#include <linux/ceph/ceph_debug.h> >> +#include <linux/ceph/cls_journaler_client.h> >> +#include <linux/ceph/journaler.h> >> + >> +#include <linux/types.h> >> + >> +//TODO get all metas in one single request >> +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint8_t *order, >> + uint8_t *splay_width, >> + int64_t *pool_id) >> +{ >> + struct page *reply_page; >> + size_t reply_len = sizeof(*order); >> + int ret; >> + >> + reply_page = alloc_page(GFP_NOIO); >> + if (!reply_page) >> + return -ENOMEM; >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order", >> + CEPH_OSD_FLAG_READ, NULL, >> + 0, &reply_page, &reply_len); >> + if (!ret) { >> + memcpy(order, page_address(reply_page), reply_len); >> + } else { >> + goto out; >> + } >> + reply_len = sizeof(*splay_width); >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width", >> + CEPH_OSD_FLAG_READ, NULL, >> + 0, &reply_page, &reply_len); >> + if (!ret) { >> + memcpy(splay_width, page_address(reply_page), reply_len); >> + } else { >> + goto out; >> + } >> + reply_len = sizeof(*pool_id); >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id", >> + CEPH_OSD_FLAG_READ, NULL, >> + 0, &reply_page, &reply_len); >> + if (!ret) { >> + memcpy(pool_id, page_address(reply_page), reply_len); > This memcpy() is not going to work on big endian machines. Use either > ceph_decode_*() or le*_to_cpu() helpers. > > Also, check for errors instead of success: > > ret = ceph_osdc_call(...); > if (ret) > goto out; > > ... decode ... 
will use decode function >> + } else { >> + goto out; >> + } >> +out: >> + __free_page(reply_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_get_immutable_metas); >> + >> +//TODO get all metas in one single request >> +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t *minimum_set, uint64_t *active_set) >> +{ >> + struct page *reply_page; >> + int ret; >> + size_t reply_len = sizeof(*minimum_set); >> + >> + reply_page = alloc_page(GFP_NOIO); >> + if (!reply_page) >> + return -ENOMEM; >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set", >> + CEPH_OSD_FLAG_READ, NULL, >> + 0, &reply_page, &reply_len); >> + if (!ret) { >> + memcpy(minimum_set, page_address(reply_page), reply_len); > Same here. thanx >> + } else { >> + goto out; >> + } >> + reply_len = sizeof(active_set); >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set", >> + CEPH_OSD_FLAG_READ, NULL, >> + 0, &reply_page, &reply_len); >> + if (!ret) { >> + memcpy(active_set, page_address(reply_page), reply_len); > Same here. thanx >> + } else { >> + goto out; >> + } >> +out: >> + __free_page(reply_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_get_mutable_metas); >> + >> +static int decode_object_position(void **p, void *end, struct ceph_journaler_object_pos *pos) >> +{ >> + u8 struct_v; >> + u32 struct_len; >> + int ret; >> + u64 object_num; >> + u64 tag_tid; >> + u64 entry_tid; >> + >> + ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position", >> + &struct_v, &struct_len); >> + if (ret) >> + return ret; >> + >> + object_num = ceph_decode_64(p); >> + tag_tid = ceph_decode_64(p); >> + entry_tid = ceph_decode_64(p); >> + >> + pos->object_num = object_num; >> + pos->tag_tid = tag_tid; >> + pos->entry_tid = entry_tid; > Can decode directly into pos, without going through local variables. 
I used the local variables only to make the code more readable. I will remove them and add a comment instead. >> + >> + return ret; >> +} >> + >> +void destroy_client(struct ceph_journaler_client *client) >> +{ >> + kfree(client->object_positions_array); >> + kfree(client->id); >> + kfree(client->data); >> + >> + kfree(client); >> +} >> + >> +struct ceph_journaler_client *create_client(uint8_t splay_width) >> +{ >> + struct ceph_journaler_client *client; >> + struct ceph_journaler_object_pos *pos; >> + int i; >> + >> + client = kzalloc(sizeof(*client), GFP_NOIO); >> + if (!client) >> + return NULL; >> + >> + client->object_positions_array = kcalloc(splay_width, sizeof(*pos), GFP_NOIO); >> + if (!client->object_positions_array) >> + goto free_client; >> + >> + INIT_LIST_HEAD(&client->object_positions); >> + for (i = 0; i < splay_width; i++) { >> + pos = &client->object_positions_array[i]; >> + INIT_LIST_HEAD(&pos->node); >> + list_add_tail(&pos->node, &client->object_positions); >> + } >> + >> + INIT_LIST_HEAD(&client->node); >> + client->data = NULL; >> + client->id = NULL; >> + >> + return client; >> +free_client: >> + kfree(client); >> + return NULL; >> +} >> + >> +static int decode_client(void **p, void *end, struct ceph_journaler_client *client) >> +{ >> + u8 struct_v; >> + u8 state_raw; >> + u32 struct_len; >> + int ret, num, i; >> + struct ceph_journaler_object_pos *pos; >> + >> + ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply", >> + &struct_v, &struct_len); >> + if (ret) >> + return ret; >> + >> + client->id = ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO); >> + if (IS_ERR(client->id)) { >> + ret = PTR_ERR(client->id); >> + client->id = NULL; >> + goto err; >> + } >> + client->data = ceph_extract_encoded_string(p, end, &client->data_len, GFP_NOIO); >> + if (IS_ERR(client->data)) { >> + ret = PTR_ERR(client->data); >> + client->data = NULL; >> + goto free_id; >> + } >> + ret = 
ceph_start_decoding(p, end, 1, "cls_joural_client_object_set_position", >> + &struct_v, &struct_len); >> + if (ret) >> + goto free_data; >> + >> + num = ceph_decode_32(p); >> + i = 0; >> + list_for_each_entry(pos, &client->object_positions, node) { >> + if (i < num) { >> + // we will use this position stub >> + pos->in_using = true; >> + ret = decode_object_position(p, end, pos); >> + if (ret) { >> + goto free_data; >> + } >> + } else { >> + // This stub is not used anymore >> + pos->in_using = false; >> + } >> + i++; >> + } >> + >> + state_raw = ceph_decode_8(p); > Just skip over it (i.e. increment p with a comment), don't add bogus local > variables. thanx >> + return 0; >> + >> +free_data: >> + kfree(client->data); >> +free_id: >> + kfree(client->id); >> +err: >> + return ret; >> +} >> + >> +static int decode_clients(void **p, void *end, struct list_head *clients, uint8_t splay_width) >> +{ >> + int i = 0; >> + int ret; >> + uint32_t client_num; >> + struct ceph_journaler_client *client, *next; >> + >> + client_num = ceph_decode_32(p); >> + // Reuse the clients already exist in list. >> + list_for_each_entry_safe(client, next, clients, node) { >> + // Some clients unregistered. >> + if (i < client_num) { >> + kfree(client->id); >> + kfree(client->data); >> + ret = decode_client(p, end, client); >> + if (ret) >> + return ret; >> + } else { >> + list_del(&client->node); >> + destroy_client(client); >> + } >> + i++; >> + } >> + >> + // Some more clients registered. 
>> + for (; i < client_num; i++) { >> + client = create_client(splay_width); >> + if (!client) >> + return -ENOMEM; >> + ret = decode_client(p, end, client); >> + if (ret) { >> + destroy_client(client); >> + return ret; >> + } >> + list_add_tail(&client->node, clients); >> + } >> + return 0; >> +} >> + >> +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + struct list_head *clients, >> + uint8_t splay_width) >> +{ >> + struct page *reply_page; >> + struct page *req_page; >> + int ret; >> + size_t reply_len = PAGE_SIZE; >> + int buf_size; >> + void *p, *end; >> + char name[] = ""; >> + >> + buf_size = strlen(name) + sizeof(__le32) + sizeof(uint64_t); >> + >> + if (buf_size > PAGE_SIZE) >> + return -E2BIG; > If name is always an empty string, this can't ever be true. correct. will remove this check. >> + >> + reply_page = alloc_page(GFP_NOIO); >> + if (!reply_page) >> + return -ENOMEM; >> + >> + req_page = alloc_page(GFP_NOIO); >> + if (!req_page) { >> + ret = -ENOMEM; >> + goto free_reply_page; >> + } >> + >> + p = page_address(req_page); >> + end = p + buf_size; >> + >> + ceph_encode_string(&p, end, name, strlen(name)); > I would drop name and encode 0 directly. okey, will drop name and add a comment here. >> + ceph_encode_64(&p, (uint64_t)256); > This constant needs a define or a comment. Unnecessary cast. 
agreed >> + >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_list", >> + CEPH_OSD_FLAG_READ, req_page, >> + buf_size, &reply_page, &reply_len); >> + >> + if (!ret) { >> + p = page_address(reply_page); >> + end = p + reply_len; >> + >> + ret = decode_clients(&p, end, clients, splay_width); >> + } >> + >> + __free_page(req_page); >> +free_reply_page: >> + __free_page(reply_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_client_list); >> + >> +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t *tag_tid) >> +{ >> + struct page *reply_page; >> + int ret; >> + size_t reply_len = PAGE_SIZE; >> + >> + reply_page = alloc_page(GFP_NOIO); >> + if (!reply_page) >> + return -ENOMEM; >> + >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid", >> + CEPH_OSD_FLAG_READ, NULL, >> + 0, &reply_page, &reply_len); >> + >> + if (!ret) { >> + memcpy(tag_tid, page_address(reply_page), reply_len); > Same here. 
thanx >> + } >> + >> + __free_page(reply_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_get_next_tag_tid); >> + >> +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t tag_tid, uint64_t tag_class, >> + void *buf, uint32_t buf_len) >> +{ >> + struct page *req_page; >> + int ret; >> + int buf_size; >> + void *p, *end; >> + >> + buf_size = buf_len + sizeof(__le32) + sizeof(uint64_t) + sizeof(uint64_t); >> + >> + if (buf_size > PAGE_SIZE) >> + return -E2BIG; >> + >> + req_page = alloc_page(GFP_NOIO); >> + if (!req_page) >> + return -ENOMEM; >> + >> + p = page_address(req_page); >> + end = p + buf_size; >> + >> + ceph_encode_64(&p, tag_tid); >> + ceph_encode_64(&p, tag_class); >> + ceph_encode_string(&p, end, buf, buf_len); >> + >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create", >> + CEPH_OSD_FLAG_WRITE, req_page, >> + buf_size, NULL, NULL); >> + >> + __free_page(req_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_tag_create); >> + >> +int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag) >> +{ >> + int ret; >> + u8 struct_v; >> + u32 struct_len; >> + >> + ret = ceph_start_decoding(p, end, 1, "cls_journaler_tag", >> + &struct_v, &struct_len); >> + if (ret) >> + return ret; >> + >> + tag->tid = ceph_decode_64(p); >> + tag->tag_class = ceph_decode_64(p); >> + tag->data = ceph_extract_encoded_string(p, end, &tag->data_len, GFP_NOIO); >> + if (IS_ERR(tag->data)) { >> + ret = PTR_ERR(tag->data); >> + tag->data = NULL; >> + return ret; >> + } >> + >> + return 0; >> +} >> + >> +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t tag_tid, struct ceph_journaler_tag *tag) >> +{ >> + struct page *reply_page; >> + struct page *req_page; >> + int ret; >> + size_t reply_len = PAGE_SIZE; >> + int buf_size; >> + void *p, *end; 
>> + >> + buf_size = sizeof(tag_tid); >> + >> + reply_page = alloc_page(GFP_NOIO); >> + if (!reply_page) >> + return -ENOMEM; >> + >> + req_page = alloc_page(GFP_NOIO); >> + if (!req_page) { >> + ret = -ENOMEM; >> + goto free_reply_page; >> + } >> + >> + p = page_address(req_page); >> + end = p + buf_size; >> + ceph_encode_64(&p, tag_tid); >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag", >> + CEPH_OSD_FLAG_READ, req_page, >> + buf_size, &reply_page, &reply_len); >> + >> + if (!ret) { >> + p = page_address(reply_page); >> + end = p + reply_len; >> + >> + ret = decode_tag(&p, end, tag); >> + } >> + >> + __free_page(req_page); >> +free_reply_page: >> + __free_page(reply_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_get_tag); >> + >> +static int version_len = 6; > Use CEPH_ENCODING_START_BLK_LEN. okey >> + >> +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + struct ceph_journaler_client *client, >> + struct list_head *object_positions) >> +{ >> + struct page *req_page; >> + int ret; >> + int buf_size; >> + void *p, *end; >> + struct ceph_journaler_object_pos *position = NULL; >> + int object_position_len = version_len + 8 + 8 + 8; >> + int pos_num = 0; >> + >> + buf_size = 4 + client->id_len + version_len + 4; >> + list_for_each_entry(position, object_positions, node) { >> + buf_size += object_position_len; >> + pos_num++; >> + } >> + >> + if (buf_size > PAGE_SIZE) >> + return -E2BIG; >> + >> + req_page = alloc_page(GFP_NOIO); >> + if (!req_page) >> + return -ENOMEM; >> + >> + p = page_address(req_page); >> + end = p + buf_size; >> + ceph_encode_string(&p, end, client->id, client->id_len); >> + ceph_start_encoding(&p, 1, 1, buf_size - client->id_len - version_len - 4); >> + ceph_encode_32(&p, pos_num); >> + >> + list_for_each_entry(position, object_positions, node) { >> + ceph_start_encoding(&p, 1, 1, 24); >> + ceph_encode_64(&p, 
position->object_num); >> + ceph_encode_64(&p, position->tag_tid); >> + ceph_encode_64(&p, position->entry_tid); >> + } >> + >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_commit", >> + CEPH_OSD_FLAG_WRITE, req_page, >> + buf_size, NULL, NULL); >> + >> + __free_page(req_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_client_committed); >> + >> + >> +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t minimum_set) >> +{ >> + struct page *req_page; >> + int ret; >> + void *p; >> + >> + req_page = alloc_page(GFP_NOIO); >> + if (!req_page) >> + return -ENOMEM; >> + >> + p = page_address(req_page); >> + ceph_encode_64(&p, minimum_set); >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set", >> + CEPH_OSD_FLAG_WRITE, req_page, >> + 8, NULL, NULL); >> + >> + __free_page(req_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_set_minimum_set); >> + >> +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t active_set) >> +{ >> + struct page *req_page; >> + int ret; >> + void *p; >> + >> + req_page = alloc_page(GFP_NOIO); >> + if (!req_page) >> + return -ENOMEM; >> + >> + p = page_address(req_page); >> + ceph_encode_64(&p, active_set); >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set", >> + CEPH_OSD_FLAG_WRITE, req_page, >> + 8, NULL, NULL); >> + >> + __free_page(req_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_set_active_set); >> + >> +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, >> + struct ceph_object_id *oid, >> + struct ceph_object_locator *oloc, >> + uint64_t soft_limit) >> +{ >> + struct page *req_page; >> + int ret; >> + void *p; >> + >> + req_page = alloc_page(GFP_NOIO); >> + if (!req_page) >> + return -ENOMEM; >> + >> + p = 
page_address(req_page); >> + ceph_encode_64(&p, soft_limit); >> + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "guard_append", >> + CEPH_OSD_FLAG_READ, req_page, >> + 8, NULL, NULL); >> + >> + __free_page(req_page); >> + return ret; >> +} >> +EXPORT_SYMBOL(ceph_cls_journaler_guard_append); > Looks like this function is unused. yes, will drop it. > Thanks, > > Ilya >
diff --git a/include/linux/ceph/cls_journaler_client.h b/include/linux/ceph/cls_journaler_client.h new file mode 100644 index 0000000..6245882 --- /dev/null +++ b/include/linux/ceph/cls_journaler_client.h @@ -0,0 +1,94 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H +#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H + +#include <linux/ceph/osd_client.h> + +struct ceph_journaler; +struct ceph_journaler_client; + +struct ceph_journaler_object_pos { + struct list_head node; + u64 object_num; + u64 tag_tid; + u64 entry_tid; + // ->in_using means this object_pos is initialized. + // There would be some stub for it created in init step + // to allocate memory as early as possible. + bool in_using; +}; + +struct ceph_journaler_client { + struct list_head node; + size_t id_len; + char *id; + size_t data_len; + char *data; + struct list_head object_positions; + struct ceph_journaler_object_pos *object_positions_array; +}; + +struct ceph_journaler_tag { + uint64_t tid; + uint64_t tag_class; + size_t data_len; + char *data; +}; + +void destroy_client(struct ceph_journaler_client *client); + +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint8_t *order, + uint8_t *splay_width, + int64_t *pool_id); + +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t *minimum_set, uint64_t *active_set); + +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct list_head *clients, + uint8_t splay_width); + +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t *tag_tid); + +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + 
uint64_t tag_tid, struct ceph_journaler_tag *tag); + +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t tag_tid, uint64_t tag_class, + void *buf, uint32_t buf_len); + +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct ceph_journaler_client *client, + struct list_head *object_positions); + +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t active_set); + +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t minimum_set); + +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t soft_limit); +#endif diff --git a/net/ceph/cls_journaler_client.c b/net/ceph/cls_journaler_client.c new file mode 100644 index 0000000..ac27589 --- /dev/null +++ b/net/ceph/cls_journaler_client.c @@ -0,0 +1,558 @@ +// SPDX-License-Identifier: GPL-2.0 +#include <linux/ceph/ceph_debug.h> +#include <linux/ceph/cls_journaler_client.h> +#include <linux/ceph/journaler.h> + +#include <linux/types.h> + +//TODO get all metas in one single request +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint8_t *order, + uint8_t *splay_width, + int64_t *pool_id) +{ + struct page *reply_page; + size_t reply_len = sizeof(*order); + int ret; + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order", + CEPH_OSD_FLAG_READ, NULL, + 0, &reply_page, &reply_len); + if (!ret) { + memcpy(order, page_address(reply_page), reply_len); + } else { + goto out; + } + reply_len = sizeof(*splay_width); + 
ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width", + CEPH_OSD_FLAG_READ, NULL, + 0, &reply_page, &reply_len); + if (!ret) { + memcpy(splay_width, page_address(reply_page), reply_len); + } else { + goto out; + } + reply_len = sizeof(*pool_id); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id", + CEPH_OSD_FLAG_READ, NULL, + 0, &reply_page, &reply_len); + if (!ret) { + memcpy(pool_id, page_address(reply_page), reply_len); + } else { + goto out; + } +out: + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_get_immutable_metas); + +//TODO get all metas in one single request +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t *minimum_set, uint64_t *active_set) +{ + struct page *reply_page; + int ret; + size_t reply_len = sizeof(*minimum_set); + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set", + CEPH_OSD_FLAG_READ, NULL, + 0, &reply_page, &reply_len); + if (!ret) { + memcpy(minimum_set, page_address(reply_page), reply_len); + } else { + goto out; + } + reply_len = sizeof(active_set); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set", + CEPH_OSD_FLAG_READ, NULL, + 0, &reply_page, &reply_len); + if (!ret) { + memcpy(active_set, page_address(reply_page), reply_len); + } else { + goto out; + } +out: + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_get_mutable_metas); + +static int decode_object_position(void **p, void *end, struct ceph_journaler_object_pos *pos) +{ + u8 struct_v; + u32 struct_len; + int ret; + u64 object_num; + u64 tag_tid; + u64 entry_tid; + + ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position", + &struct_v, &struct_len); + if (ret) + return ret; + + object_num = ceph_decode_64(p); + tag_tid = ceph_decode_64(p); + entry_tid = ceph_decode_64(p); + + 
pos->object_num = object_num; + pos->tag_tid = tag_tid; + pos->entry_tid = entry_tid; + + return ret; +} + +void destroy_client(struct ceph_journaler_client *client) +{ + kfree(client->object_positions_array); + kfree(client->id); + kfree(client->data); + + kfree(client); +} + +struct ceph_journaler_client *create_client(uint8_t splay_width) +{ + struct ceph_journaler_client *client; + struct ceph_journaler_object_pos *pos; + int i; + + client = kzalloc(sizeof(*client), GFP_NOIO); + if (!client) + return NULL; + + client->object_positions_array = kcalloc(splay_width, sizeof(*pos), GFP_NOIO); + if (!client->object_positions_array) + goto free_client; + + INIT_LIST_HEAD(&client->object_positions); + for (i = 0; i < splay_width; i++) { + pos = &client->object_positions_array[i]; + INIT_LIST_HEAD(&pos->node); + list_add_tail(&pos->node, &client->object_positions); + } + + INIT_LIST_HEAD(&client->node); + client->data = NULL; + client->id = NULL; + + return client; +free_client: + kfree(client); + return NULL; +} + +static int decode_client(void **p, void *end, struct ceph_journaler_client *client) +{ + u8 struct_v; + u8 state_raw; + u32 struct_len; + int ret, num, i; + struct ceph_journaler_object_pos *pos; + + ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply", + &struct_v, &struct_len); + if (ret) + return ret; + + client->id = ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO); + if (IS_ERR(client->id)) { + ret = PTR_ERR(client->id); + client->id = NULL; + goto err; + } + client->data = ceph_extract_encoded_string(p, end, &client->data_len, GFP_NOIO); + if (IS_ERR(client->data)) { + ret = PTR_ERR(client->data); + client->data = NULL; + goto free_id; + } + ret = ceph_start_decoding(p, end, 1, "cls_joural_client_object_set_position", + &struct_v, &struct_len); + if (ret) + goto free_data; + + num = ceph_decode_32(p); + i = 0; + list_for_each_entry(pos, &client->object_positions, node) { + if (i < num) { + // we will use this position stub 
+ pos->in_using = true; + ret = decode_object_position(p, end, pos); + if (ret) { + goto free_data; + } + } else { + // This stub is not used anymore + pos->in_using = false; + } + i++; + } + + state_raw = ceph_decode_8(p); + return 0; + +free_data: + kfree(client->data); +free_id: + kfree(client->id); +err: + return ret; +} + +static int decode_clients(void **p, void *end, struct list_head *clients, uint8_t splay_width) +{ + int i = 0; + int ret; + uint32_t client_num; + struct ceph_journaler_client *client, *next; + + client_num = ceph_decode_32(p); + // Reuse the clients already exist in list. + list_for_each_entry_safe(client, next, clients, node) { + // Some clients unregistered. + if (i < client_num) { + kfree(client->id); + kfree(client->data); + ret = decode_client(p, end, client); + if (ret) + return ret; + } else { + list_del(&client->node); + destroy_client(client); + } + i++; + } + + // Some more clients registered. + for (; i < client_num; i++) { + client = create_client(splay_width); + if (!client) + return -ENOMEM; + ret = decode_client(p, end, client); + if (ret) { + destroy_client(client); + return ret; + } + list_add_tail(&client->node, clients); + } + return 0; +} + +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct list_head *clients, + uint8_t splay_width) +{ + struct page *reply_page; + struct page *req_page; + int ret; + size_t reply_len = PAGE_SIZE; + int buf_size; + void *p, *end; + char name[] = ""; + + buf_size = strlen(name) + sizeof(__le32) + sizeof(uint64_t); + + if (buf_size > PAGE_SIZE) + return -E2BIG; + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) { + ret = -ENOMEM; + goto free_reply_page; + } + + p = page_address(req_page); + end = p + buf_size; + + ceph_encode_string(&p, end, name, strlen(name)); + ceph_encode_64(&p, (uint64_t)256); + + ret = 
ceph_osdc_call(osdc, oid, oloc, "journal", "client_list", + CEPH_OSD_FLAG_READ, req_page, + buf_size, &reply_page, &reply_len); + + if (!ret) { + p = page_address(reply_page); + end = p + reply_len; + + ret = decode_clients(&p, end, clients, splay_width); + } + + __free_page(req_page); +free_reply_page: + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_client_list); + +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t *tag_tid) +{ + struct page *reply_page; + int ret; + size_t reply_len = PAGE_SIZE; + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid", + CEPH_OSD_FLAG_READ, NULL, + 0, &reply_page, &reply_len); + + if (!ret) { + memcpy(tag_tid, page_address(reply_page), reply_len); + } + + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_get_next_tag_tid); + +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t tag_tid, uint64_t tag_class, + void *buf, uint32_t buf_len) +{ + struct page *req_page; + int ret; + int buf_size; + void *p, *end; + + buf_size = buf_len + sizeof(__le32) + sizeof(uint64_t) + sizeof(uint64_t); + + if (buf_size > PAGE_SIZE) + return -E2BIG; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + end = p + buf_size; + + ceph_encode_64(&p, tag_tid); + ceph_encode_64(&p, tag_class); + ceph_encode_string(&p, end, buf, buf_len); + + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create", + CEPH_OSD_FLAG_WRITE, req_page, + buf_size, NULL, NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_tag_create); + +int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag) +{ + int ret; + u8 struct_v; + u32 struct_len; + + ret = 
ceph_start_decoding(p, end, 1, "cls_journaler_tag", + &struct_v, &struct_len); + if (ret) + return ret; + + tag->tid = ceph_decode_64(p); + tag->tag_class = ceph_decode_64(p); + tag->data = ceph_extract_encoded_string(p, end, &tag->data_len, GFP_NOIO); + if (IS_ERR(tag->data)) { + ret = PTR_ERR(tag->data); + tag->data = NULL; + return ret; + } + + return 0; +} + +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t tag_tid, struct ceph_journaler_tag *tag) +{ + struct page *reply_page; + struct page *req_page; + int ret; + size_t reply_len = PAGE_SIZE; + int buf_size; + void *p, *end; + + buf_size = sizeof(tag_tid); + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) { + ret = -ENOMEM; + goto free_reply_page; + } + + p = page_address(req_page); + end = p + buf_size; + ceph_encode_64(&p, tag_tid); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag", + CEPH_OSD_FLAG_READ, req_page, + buf_size, &reply_page, &reply_len); + + if (!ret) { + p = page_address(reply_page); + end = p + reply_len; + + ret = decode_tag(&p, end, tag); + } + + __free_page(req_page); +free_reply_page: + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_get_tag); + +static int version_len = 6; + +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct ceph_journaler_client *client, + struct list_head *object_positions) +{ + struct page *req_page; + int ret; + int buf_size; + void *p, *end; + struct ceph_journaler_object_pos *position = NULL; + int object_position_len = version_len + 8 + 8 + 8; + int pos_num = 0; + + buf_size = 4 + client->id_len + version_len + 4; + list_for_each_entry(position, object_positions, node) { + buf_size += object_position_len; + pos_num++; + } + + if (buf_size > PAGE_SIZE) + return 
-E2BIG; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + end = p + buf_size; + ceph_encode_string(&p, end, client->id, client->id_len); + ceph_start_encoding(&p, 1, 1, buf_size - client->id_len - version_len - 4); + ceph_encode_32(&p, pos_num); + + list_for_each_entry(position, object_positions, node) { + ceph_start_encoding(&p, 1, 1, 24); + ceph_encode_64(&p, position->object_num); + ceph_encode_64(&p, position->tag_tid); + ceph_encode_64(&p, position->entry_tid); + } + + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_commit", + CEPH_OSD_FLAG_WRITE, req_page, + buf_size, NULL, NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_client_committed); + + +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t minimum_set) +{ + struct page *req_page; + int ret; + void *p; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + ceph_encode_64(&p, minimum_set); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set", + CEPH_OSD_FLAG_WRITE, req_page, + 8, NULL, NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_set_minimum_set); + +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + uint64_t active_set) +{ + struct page *req_page; + int ret; + void *p; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + ceph_encode_64(&p, active_set); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set", + CEPH_OSD_FLAG_WRITE, req_page, + 8, NULL, NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_set_active_set); + +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct 
ceph_object_locator *oloc, + uint64_t soft_limit) +{ + struct page *req_page; + int ret; + void *p; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + ceph_encode_64(&p, soft_limit); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "guard_append", + CEPH_OSD_FLAG_READ, req_page, + 8, NULL, NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journaler_guard_append);
This adds a cls client module for the journaler. Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> --- include/linux/ceph/cls_journaler_client.h | 94 +++++ net/ceph/cls_journaler_client.c | 558 ++++++++++++++++++++++++++++++ 2 files changed, 652 insertions(+) create mode 100644 include/linux/ceph/cls_journaler_client.h create mode 100644 net/ceph/cls_journaler_client.c