From patchwork Thu Aug 16 05:59:29 2018
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 10566987
From: Dongsheng Yang <dongsheng.yang@easystack.cn>
To: idryomov@gmail.com, sage@redhat.com, elder@kernel.org, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, dongsheng.yang@easystack.cn
Subject: [PATCH 1/4] libceph: support op append
Date: Thu, 16 Aug 2018 01:59:29 -0400
Message-Id: <1534399172-27610-2-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>
References: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>
List-ID: ceph-devel@vger.kernel.org

We need to send an append operation when we want to support journaling in the
kernel client, so handle CEPH_OSD_OP_APPEND in the same paths that already
handle CEPH_OSD_OP_WRITE and CEPH_OSD_OP_WRITEFULL.
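For illustration, a minimal sketch (not part of the patch) of how a caller
could issue an append with the extended API, modeled on the
ceph_journaler_obj_write_sync() helper added in patch 3 of this series;
osdc, oid, oloc, pages and buf_len are assumed to exist, and error handling
is trimmed:

	struct ceph_osd_request *req;
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	req->r_flags = CEPH_OSD_FLAG_WRITE;

	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
	if (ret)
		goto out_req;

	/* for append the OSD picks the write offset, so pass offset 0 */
	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_APPEND, 0, buf_len, 0, 0);
	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, true);

	ceph_osdc_start_request(osdc, req, false);
	ret = ceph_osdc_wait_request(osdc, req);
out_req:
	ceph_osdc_put_request(req);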
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
---
 net/ceph/osd_client.c | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 34b5334..851ff9c 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -378,6 +378,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
+	case CEPH_OSD_OP_APPEND:
 		ceph_osd_data_release(&op->extent.osd_data);
 		break;
 	case CEPH_OSD_OP_CALL:
@@ -712,13 +713,13 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
-	       opcode != CEPH_OSD_OP_TRUNCATE);
+	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_APPEND);
 
 	op->extent.offset = offset;
 	op->extent.length = length;
 	op->extent.truncate_size = truncate_size;
 	op->extent.truncate_seq = truncate_seq;
-	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
+	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL || opcode == CEPH_OSD_OP_APPEND)
 		payload_len += length;
 
 	op->indata_len = payload_len;
@@ -740,7 +741,7 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 	BUG_ON(length > previous);
 
 	op->extent.length = length;
-	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL || op->op == CEPH_OSD_OP_APPEND)
 		op->indata_len -= previous - length;
 }
 EXPORT_SYMBOL(osd_req_op_extent_update);
@@ -762,7 +763,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 
 	op->extent.offset += offset_inc;
 	op->extent.length -= offset_inc;
-	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL || op->op == CEPH_OSD_OP_APPEND)
 		op->indata_len -= offset_inc;
 }
 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
@@ -913,6 +914,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
+	case CEPH_OSD_OP_APPEND:
 	case CEPH_OSD_OP_ZERO:
 	case CEPH_OSD_OP_TRUNCATE:
 		dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -998,7 +1000,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
-	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
+	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_APPEND);
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 				      GFP_NOFS);
@@ -1872,6 +1874,7 @@ static void setup_request_data(struct ceph_osd_request *req,
 	/* request */
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
+	case CEPH_OSD_OP_APPEND:
 		WARN_ON(op->indata_len != op->extent.length);
 		ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
 		break;

From patchwork Thu Aug 16 05:59:30 2018
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 10566993
From: Dongsheng Yang <dongsheng.yang@easystack.cn>
To: idryomov@gmail.com, sage@redhat.com, elder@kernel.org, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, dongsheng.yang@easystack.cn
Subject: [PATCH 2/4] libceph: introduce cls_journaler_client
Date: Thu, 16 Aug 2018 01:59:30 -0400
Message-Id: <1534399172-27610-3-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>
References: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>

This is a cls client module for the journaler: a set of thin wrappers that
call the "journal" object class methods on the OSDs (get_order, get_splay_width,
client_list, tag_create, client_commit, guard_append and friends) to read and
update journal metadata.
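For illustration, a hedged sketch (not part of the patch) of how a caller,
such as the journaler introduced in patch 3, might use these wrappers to
fetch the journal geometry; osdc, oid and oloc are assumed to refer to an
existing journal header object:

	uint8_t order, splay_width;
	int64_t pool_id;
	uint64_t minimum_set, active_set;
	int ret;

	/* immutable geometry: object size (2^order), stripe width, data pool */
	ret = ceph_cls_journaler_get_immutable_metas(osdc, oid, oloc,
						     &order, &splay_width,
						     &pool_id);
	if (ret)
		return ret;

	/* mutable state: the range of object sets currently in use */
	ret = ceph_cls_journaler_get_mutable_metas(osdc, oid, oloc,
						   &minimum_set, &active_set);
	if (ret)
		return ret;

	pr_debug("journal: order %d, splay_width %d, sets [%llu, %llu]\n",
		 order, splay_width, minimum_set, active_set);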
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
---
 include/linux/ceph/cls_journaler_client.h |  87 ++++++
 net/ceph/cls_journaler_client.c           | 501 ++++++++++++++++++++++++++++++
 2 files changed, 588 insertions(+)
 create mode 100644 include/linux/ceph/cls_journaler_client.h
 create mode 100644 net/ceph/cls_journaler_client.c

diff --git a/include/linux/ceph/cls_journaler_client.h b/include/linux/ceph/cls_journaler_client.h
new file mode 100644
index 0000000..cc9be96
--- /dev/null
+++ b/include/linux/ceph/cls_journaler_client.h
@@ -0,0 +1,87 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
+#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
+
+#include <linux/ceph/osd_client.h>
+
+struct ceph_journaler;
+struct ceph_journaler_client;
+
+struct ceph_journaler_object_pos {
+	struct list_head node;
+	u64 object_num;
+	u64 tag_tid;
+	u64 entry_tid;
+};
+
+struct ceph_journaler_client {
+	struct list_head node;
+	size_t id_len;
+	char *id;
+	size_t data_len;
+	char *data;
+	struct list_head object_positions;
+};
+
+struct ceph_journaler_tag {
+	uint64_t tid;
+	uint64_t tag_class;
+	size_t data_len;
+	char *data;
+};
+
+int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc,
+					   struct ceph_object_id *oid,
+					   struct ceph_object_locator *oloc,
+					   uint8_t *order,
+					   uint8_t *splay_width,
+					   int64_t *pool_id);
+
+int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc,
+					 struct ceph_object_id *oid,
+					 struct ceph_object_locator *oloc,
+					 uint64_t *minimum_set, uint64_t *active_set);
+
+int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc,
+				   struct ceph_object_id *oid,
+				   struct ceph_object_locator *oloc,
+				   struct ceph_journaler_client **clients,
+				   uint32_t *client_num);
+
+int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc,
+					struct ceph_object_id *oid,
+					struct ceph_object_locator *oloc,
+					uint64_t *tag_tid);
+
+int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc,
+			       struct ceph_object_id *oid,
+			       struct ceph_object_locator *oloc,
+			       uint64_t tag_tid, struct ceph_journaler_tag *tag);
+
+int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc,
+				  struct ceph_object_id *oid,
+				  struct ceph_object_locator *oloc,
+				  uint64_t tag_tid, uint64_t tag_class,
+				  void *buf, uint32_t buf_len);
+
+int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc,
+					struct ceph_object_id *oid,
+					struct ceph_object_locator *oloc,
+					struct ceph_journaler_client *client,
+					struct list_head *object_positions);
+
+int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc,
+				      struct ceph_object_id *oid,
+				      struct ceph_object_locator *oloc,
+				      uint64_t active_set);
+
+int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc,
+				       struct ceph_object_id *oid,
+				       struct ceph_object_locator *oloc,
+				       uint64_t minimum_set);
+
+int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc,
+				    struct ceph_object_id *oid,
+				    struct ceph_object_locator *oloc,
+				    uint64_t soft_limit);
+#endif
diff --git a/net/ceph/cls_journaler_client.c b/net/ceph/cls_journaler_client.c
new file mode 100644
index 0000000..971fc5d
--- /dev/null
+++ b/net/ceph/cls_journaler_client.c
@@ -0,0 +1,501 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/slab.h>
+
+#include <linux/ceph/cls_journaler_client.h>
+#include <linux/ceph/decode.h>
+#include <linux/ceph/libceph.h>
+
+//TODO: get all metas in one single request
+int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc,
+					   struct ceph_object_id *oid,
+					   struct ceph_object_locator *oloc,
+					   uint8_t *order,
+					   uint8_t *splay_width,
+					   int64_t *pool_id)
+{
+	struct page *reply_page;
+	size_t reply_len = sizeof(*order);
+	int ret;
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page)
+		return -ENOMEM;
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order",
+			     CEPH_OSD_FLAG_READ, NULL,
+			     0, reply_page, &reply_len);
+	if (!ret) {
+		memcpy(order, page_address(reply_page), reply_len);
+	}
+
+	reply_len = sizeof(*splay_width);
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width",
+			     CEPH_OSD_FLAG_READ, NULL,
+			     0, reply_page, &reply_len);
+	if (!ret) {
+		memcpy(splay_width, page_address(reply_page), reply_len);
+	}
+
+	reply_len = sizeof(*pool_id);
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id",
+			     CEPH_OSD_FLAG_READ, NULL,
+			     0, reply_page, &reply_len);
+	if (!ret) {
+		memcpy(pool_id, page_address(reply_page), reply_len);
+	}
+
+	dout("%s: status %d, order: %d\n", __func__, ret, *order);
+
+	__free_page(reply_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_immutable_metas);
+
+//TODO: get all metas in one single request
+int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc,
+					 struct ceph_object_id *oid,
+					 struct ceph_object_locator *oloc,
+					 uint64_t *minimum_set, uint64_t *active_set)
+{
+	struct page *reply_page;
+	int ret;
+	size_t reply_len = sizeof(*minimum_set);
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page)
+		return -ENOMEM;
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set",
+			     CEPH_OSD_FLAG_READ, NULL,
+			     0, reply_page, &reply_len);
+	if (!ret) {
+		memcpy(minimum_set, page_address(reply_page), reply_len);
+	}
+
+	reply_len = sizeof(*active_set);
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set",
+			     CEPH_OSD_FLAG_READ, NULL,
+			     0, reply_page, &reply_len);
+	if (!ret) {
+		memcpy(active_set, page_address(reply_page), reply_len);
+	}
+
+	dout("%s: status %d, minimum_set: %llu, active_set: %llu\n",
+	     __func__, ret, *minimum_set, *active_set);
+
+	__free_page(reply_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_mutable_metas);
+
+static int decode_object_position(void **p, void *end,
+				  struct ceph_journaler_object_pos *pos)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int ret = 0;
+	u64 object_num = 0;
+	u64 tag_tid = 0;
+	u64 entry_tid = 0;
+
+	ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	object_num = ceph_decode_64(p);
+	tag_tid = ceph_decode_64(p);
+	entry_tid = ceph_decode_64(p);
+
+	dout("object_num: %llu, tag_tid: %llu, entry_tid: %llu",
+	     object_num, tag_tid, entry_tid);
+
+	pos->object_num = object_num;
+	pos->tag_tid = tag_tid;
+	pos->entry_tid = entry_tid;
+
+	return ret;
+}
+
+static int decode_client(void **p, void *end, struct ceph_journaler_client *client)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int ret = 0;
+	int num, i;
+	u8 state_raw;
+
+	INIT_LIST_HEAD(&client->node);
+	INIT_LIST_HEAD(&client->object_positions);
+	ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply",
+				  &struct_v, &struct_len);
+	dout("%s, ret from ceph_start_decoding: %d", __func__, ret);
+	if (ret)
+		return ret;
+
+	client->id = ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO);
+	client->data = ceph_extract_encoded_string(p, end, &client->data_len, GFP_NOIO);
+
+	ret = ceph_start_decoding(p, end, 1, "cls_journal_client_object_set_position",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	num = ceph_decode_32(p);
+	for (i = 0; i < num; i++) {
+		struct ceph_journaler_object_pos *pos = kzalloc(sizeof(*pos), GFP_KERNEL);
+
+		if (!pos)
+			return -ENOMEM;
+
+		ret = decode_object_position(p, end, pos);
+		if (ret)
+			return ret;
+
+		list_add_tail(&pos->node, &client->object_positions);
+	}
+
+	state_raw = ceph_decode_8(p);
+
+	return ret;
+}
+
+static int decode_clients(void **p, void *end,
+			  struct ceph_journaler_client **clients,
+			  uint32_t *client_num)
+{
+	int i;
+	int ret = 0;
+
+	*client_num = ceph_decode_32(p);
+	if (ret)
+		return ret;
+
+	*clients = kcalloc(*client_num, sizeof(**clients), GFP_NOIO);
+	if (!*clients)
+		return -ENOMEM;
+
+	for (i = 0; i < *client_num; i++) {
+		ret = decode_client(p, end, *clients + i);
+		if (ret)
+			return ret;
+	}
+
+	return ret;
+}
+
+int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc,
+				   struct ceph_object_id *oid,
+				   struct ceph_object_locator *oloc,
+				   struct ceph_journaler_client **clients,
+				   uint32_t *client_num)
+{
+	struct page *reply_page;
+	struct page *req_page;
+	int ret;
+	size_t reply_len = PAGE_SIZE;
+	int buf_size;
+	void *p, *end;
+	char name[] = "";
+
+	buf_size = strlen(name) + sizeof(__le32) + sizeof(uint64_t);
+
+	if (buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page)
+		return -ENOMEM;
+
+	req_page = alloc_page(GFP_NOIO);
+	if (!req_page)
+		return -ENOMEM;
+
+	p = page_address(req_page);
+	end = p + buf_size;
+
+	ceph_encode_string(&p, end, name, strlen(name));
+	ceph_encode_64(&p, (uint64_t)256);
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_list",
+			     CEPH_OSD_FLAG_READ, req_page,
+			     buf_size, reply_page, &reply_len);
+	if (!ret) {
+		p = page_address(reply_page);
+		end = p + reply_len;
+
+		ret = decode_clients(&p, end, clients, client_num);
+	}
+
+	__free_page(reply_page);
+	__free_page(req_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_client_list);
+
+int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc,
+					struct ceph_object_id *oid,
+					struct ceph_object_locator *oloc,
+					uint64_t *tag_tid)
+{
+	struct page *reply_page;
+	int ret;
+	size_t reply_len = PAGE_SIZE;
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page)
+		return -ENOMEM;
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid",
+			     CEPH_OSD_FLAG_READ, NULL,
+			     0, reply_page, &reply_len);
+	if (!ret) {
+		memcpy(tag_tid, page_address(reply_page), reply_len);
+	}
+
+	__free_page(reply_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_next_tag_tid);
+
+int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc,
+				  struct ceph_object_id *oid,
+				  struct ceph_object_locator *oloc,
+				  uint64_t tag_tid, uint64_t tag_class,
+				  void *buf, uint32_t buf_len)
+{
+	struct page *req_page;
+	int ret;
+	int buf_size;
+	void *p, *end;
+
+	buf_size = buf_len + sizeof(__le32) + sizeof(uint64_t) + sizeof(uint64_t);
+
+	if (buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	req_page = alloc_page(GFP_NOIO);
+	if (!req_page)
+		return -ENOMEM;
+
+	p = page_address(req_page);
+	end = p + buf_size;
+
+	ceph_encode_64(&p, tag_tid);
+	ceph_encode_64(&p, tag_class);
+	ceph_encode_string(&p, end, buf, buf_len);
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create",
+			     CEPH_OSD_FLAG_WRITE, req_page,
+			     buf_size, NULL, NULL);
+
+	__free_page(req_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_tag_create);
+
+int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag)
+{
+	int ret = 0;
+	u8 struct_v;
+	u32 struct_len;
+
+	ret = ceph_start_decoding(p, end, 1, "cls_journaler_tag",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	tag->tid = ceph_decode_64(p);
+	tag->tag_class = ceph_decode_64(p);
+	tag->data = ceph_extract_encoded_string(p, end, &tag->data_len, GFP_NOIO);
+
+	return 0;
+}
+
+int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc,
+			       struct ceph_object_id *oid,
+			       struct ceph_object_locator *oloc,
+			       uint64_t tag_tid, struct ceph_journaler_tag *tag)
+{
+	struct page *reply_page;
+	struct page *req_page;
+	int ret;
+	size_t reply_len = PAGE_SIZE;
+	int buf_size;
+	void *p, *end;
+
+	buf_size = sizeof(tag_tid);
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page)
+		return -ENOMEM;
+
+	req_page = alloc_page(GFP_NOIO);
+	if (!req_page)
+		return -ENOMEM;
+
+	p = page_address(req_page);
+	end = p + buf_size;
+
+	ceph_encode_64(&p, tag_tid);
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag",
+			     CEPH_OSD_FLAG_READ, req_page,
+			     buf_size, reply_page, &reply_len);
+	if (!ret) {
+		p = page_address(reply_page);
+		end = p + reply_len;
+
+		ret = decode_tag(&p, end, tag);
+	}
+
+	__free_page(reply_page);
+	__free_page(req_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_get_tag);
+
+static int version_len = 6;
+
+int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc,
+					struct ceph_object_id *oid,
+					struct ceph_object_locator *oloc,
+					struct ceph_journaler_client *client,
+					struct list_head *object_positions)
+{
+	struct page *req_page;
+	int ret;
+	int buf_size;
+	void *p, *end;
+	struct ceph_journaler_object_pos *position = NULL;
+	int object_position_len = version_len + 8 + 8 + 8;
+	int pos_num = 0;
+
+	buf_size = 4 + client->id_len + version_len + 4;
+
+	list_for_each_entry(position, object_positions, node) {
+		buf_size += object_position_len;
+		pos_num++;
+	}
+
+	if (buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	req_page = alloc_page(GFP_NOIO);
+	if (!req_page)
+		return -ENOMEM;
+
+	p = page_address(req_page);
+	end = p + buf_size;
+
+	ceph_encode_string(&p, end, client->id, client->id_len);
+	ceph_start_encoding(&p, 1, 1, buf_size - client->id_len - version_len - 4);
+	ceph_encode_32(&p, pos_num);
+
+	list_for_each_entry(position, object_positions, node) {
+		ceph_start_encoding(&p, 1, 1, 24);
+		ceph_encode_64(&p, position->object_num);
+		ceph_encode_64(&p, position->tag_tid);
+		ceph_encode_64(&p, position->entry_tid);
+	}
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_commit",
+			     CEPH_OSD_FLAG_WRITE, req_page,
+			     buf_size, NULL, NULL);
+
+	__free_page(req_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_client_committed);
+
+int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc,
+				       struct ceph_object_id *oid,
+				       struct ceph_object_locator *oloc,
+				       uint64_t minimum_set)
+{
+	struct page *req_page;
+	int ret;
+	void *p;
+
+	req_page = alloc_page(GFP_NOIO);
+	if (!req_page)
+		return -ENOMEM;
+
+	p = page_address(req_page);
+	ceph_encode_64(&p, minimum_set);
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set",
+			     CEPH_OSD_FLAG_WRITE, req_page,
+			     8, NULL, NULL);
+
+	__free_page(req_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_set_minimum_set);
+
+int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc,
+				      struct ceph_object_id *oid,
+				      struct ceph_object_locator *oloc,
+				      uint64_t active_set)
+{
+	struct page *req_page;
+	int ret;
+	void *p;
+
+	req_page = alloc_page(GFP_NOIO);
+	if (!req_page)
+		return -ENOMEM;
+
+	p = page_address(req_page);
+	ceph_encode_64(&p, active_set);
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set",
+			     CEPH_OSD_FLAG_WRITE, req_page,
+			     8, NULL, NULL);
+
+	__free_page(req_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_set_active_set);
+
+int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc,
+				    struct ceph_object_id *oid,
+				    struct ceph_object_locator *oloc,
+				    uint64_t soft_limit)
+{
+	struct page *req_page;
+	int ret;
+	void *p;
+
+	req_page = alloc_page(GFP_NOIO);
+	if (!req_page)
+		return -ENOMEM;
+
+	p = page_address(req_page);
+	ceph_encode_64(&p, soft_limit);
+
+	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "guard_append",
+			     CEPH_OSD_FLAG_READ, req_page,
+			     8, NULL, NULL);
+
+	__free_page(req_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_journaler_guard_append);

From patchwork Thu Aug 16 05:59:31 2018
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 10566997
From: Dongsheng Yang <dongsheng.yang@easystack.cn>
To: idryomov@gmail.com, sage@redhat.com, elder@kernel.org, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, dongsheng.yang@easystack.cn
Subject: [PATCH 3/4] libceph: introduce generic journaling
Date: Thu, 16 Aug 2018 01:59:31 -0400
Message-Id: <1534399172-27610-4-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>
References: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>

This adds generic journaling to the ceph kernel client. The module has three
jobs:

(1) Journal recording: the journaling module provides an API named
ceph_journaler_append(), which appends event entries to the journal.

(2) Journal replaying: to make sure the data and the journal are consistent
when opening the journal, start_replay() replays all events that were
recorded but not yet committed.

(3) Journal trimming: this is transparent to the upper layer; when some old
journal entries are no longer needed, the related objects are trimmed.
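For illustration, a rough sketch (not part of the patch) of the calling
sequence implied by the three jobs above; osdc, oloc, journal_id, tag_tid,
data/data_len and my_state are assumptions, and error handling is trimmed:

	static int my_handle_entry(void *entry_handler,
				   struct ceph_journaler_entry *entry)
	{
		/* (2) apply one replayed, uncommitted event to local state */
		return 0;
	}

	struct ceph_journaler *journaler;
	struct ceph_journaler_future *future = NULL;
	int ret;

	journaler = ceph_journaler_create(osdc, journal_id, oloc);
	if (!journaler)
		return -ENOMEM;

	/* handle_entry/entry_handler are fields of struct ceph_journaler */
	journaler->handle_entry = my_handle_entry;
	journaler->entry_handler = my_state;

	ret = ceph_journaler_open(journaler);
	if (ret)
		goto out_destroy;

	start_replay(journaler);	/* (2) replay recorded-but-uncommitted events */

	/* (1) record one event; (3) trimming happens internally on commit */
	ret = ceph_journaler_append(journaler, tag_tid, data, data_len, &future);
	if (!ret)
		ceph_journaler_client_committed(journaler, future->commit_tid);

	ceph_journaler_close(journaler);
out_destroy:
	ceph_journaler_destroy(journaler);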
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
---
 include/linux/ceph/journaler.h |  131 +++++
 net/ceph/Makefile              |    3 +-
 net/ceph/journaler.c           | 1208 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1341 insertions(+), 1 deletion(-)
 create mode 100644 include/linux/ceph/journaler.h
 create mode 100644 net/ceph/journaler.c

diff --git a/include/linux/ceph/journaler.h b/include/linux/ceph/journaler.h
new file mode 100644
index 0000000..28dfda4
--- /dev/null
+++ b/include/linux/ceph/journaler.h
@@ -0,0 +1,131 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_JOURNAL_H
+#define _FS_CEPH_JOURNAL_H
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+struct ceph_msg;
+struct ceph_snap_context;
+struct ceph_osd_request;
+struct ceph_osd_client;
+
+#define JOURNAL_HEADER_PREFIX	"journal."
+#define JOURNAL_OBJECT_PREFIX	"journal_data."
+
+#define LOCAL_MIRROR_UUID ""
+
+static const uint32_t HEADER_FIXED_SIZE = 25;	/// preamble, version, entry tid, tag tid
+static const uint32_t REMAINDER_FIXED_SIZE = 8;	/// data size, crc
+
+static const uint64_t PREAMBLE = 0x3141592653589793;
+
+struct ceph_journaler_future {
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	uint64_t commit_tid;
+};
+
+struct ceph_journaler_entry {
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	ssize_t data_len;
+	char *data;
+
+	struct list_head node;
+};
+
+struct ceph_journaler_entry *ceph_journaler_entry_decode(void **p, void *end);
+void ceph_journaler_entry_encode(struct ceph_journaler_entry *entry, void **p, void *end);
+
+struct entry_tid {
+	struct list_head node;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+};
+
+struct commit_entry {
+	struct rb_node r_node;
+	uint64_t commit_tid;
+	uint64_t object_num;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	bool committed;
+};
+
+struct ceph_journaler {
+	struct ceph_osd_client *osdc;
+	struct ceph_object_locator data_oloc;
+
+	struct ceph_object_id header_oid;
+	struct ceph_object_locator header_oloc;
+
+	char *object_oid_prefix;
+
+	struct mutex mutex;
+	spinlock_t meta_lock;
+	spinlock_t entry_tid_lock;
+	spinlock_t commit_lock;
+	uint8_t order;
+	uint8_t splay_width;
+	int64_t pool_id;
+
+	uint64_t commit_tid;
+
+	uint64_t minimum_set;
+	uint64_t active_set;
+
+	struct list_head clients;
+	struct ceph_journaler_client *client;
+	struct ceph_journaler_client *clients_array;
+	struct list_head object_positions_pending;
+
+	struct rb_root commit_entries;
+
+	struct list_head entry_tids;
+
+	struct workqueue_struct *task_wq;
+	struct work_struct notify_update_work;
+	struct work_struct commit_work;
+
+	uint64_t active_tag_tid;
+	bool commit_pos_valid;
+	struct ceph_journaler_object_pos *commit_pos;
+	uint64_t splay_offset;
+
+	int (*handle_entry)(void *entry_handler, struct ceph_journaler_entry *entry);
+	void *entry_handler;
+
+	struct ceph_osd_linger_request *watch_handle;
+};
+
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     const char *journal_id,
+					     struct ceph_object_locator *oloc);
+
+void ceph_journaler_destroy(struct ceph_journaler *journal);
+
+int ceph_journaler_open(struct ceph_journaler *journal);
+void ceph_journaler_close(struct ceph_journaler *journal);
+
+void start_replay(struct ceph_journaler *journaler);
+
+int ceph_journaler_append(struct ceph_journaler *journaler, uint64_t tag_tid,
+			  char *data, ssize_t data_len,
+			  struct ceph_journaler_future **future);
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid);
+
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler, uint64_t tag_class,
+				void *buf, uint32_t buf_len,
+				struct ceph_journaler_tag *tag);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+				     uint32_t id_len,
+				     struct ceph_journaler_client *client_result);
+#endif
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 12bf497..0572f20 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -14,5 +14,6 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	crypto.o armor.o \
 	auth_x.o \
 	ceph_fs.o ceph_strings.o ceph_hash.o \
-	pagevec.o snapshot.o string_table.o
+	pagevec.o snapshot.o string_table.o \
+	journaler.o cls_journaler_client.o
diff --git a/net/ceph/journaler.c b/net/ceph/journaler.c
new file mode 100644
index 0000000..3876ed2
--- /dev/null
+++ b/net/ceph/journaler.c
@@ -0,0 +1,1208 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#ifdef CONFIG_BLOCK
+#include
+#endif
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define LOCAL_MIRROR_UUID ""
+
+static char *object_oid_prefix(int pool_id, const char *journal_id)
+{
+	char *prefix = NULL;
+	ssize_t len = snprintf(NULL, 0, "%s%d.%s.", JOURNAL_OBJECT_PREFIX,
+			       pool_id, journal_id);
+
+	prefix = kzalloc(len + 1, GFP_KERNEL);
+	if (!prefix)
+		return prefix;
+
+	WARN_ON(snprintf(prefix, len + 1, "%s%d.%s.", JOURNAL_OBJECT_PREFIX,
+			 pool_id, journal_id) != len);
+
+	return prefix;
+}
+
+static void notify_update(struct ceph_journaler *journaler);
+static void journaler_notify_update(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+							notify_update_work);
+
+	notify_update(journaler);
+}
+
+static int copy_object_pos(struct ceph_journaler_object_pos *pos,
+			   struct ceph_journaler_object_pos **new_pos)
+{
+	struct ceph_journaler_object_pos *temp_pos;
+
+	temp_pos = kzalloc(sizeof(*temp_pos), GFP_KERNEL);
+	if (temp_pos == NULL) {
+		return -ENOMEM;
+	}
+	INIT_LIST_HEAD(&temp_pos->node);
+	temp_pos->object_num = pos->object_num;
+	temp_pos->tag_tid = pos->tag_tid;
+	temp_pos->entry_tid = pos->entry_tid;
+
+	*new_pos = temp_pos;
+
+	return 0;
+}
+
+static void journaler_client_commit(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+							commit_work);
+	struct list_head object_positions;
+	struct ceph_journaler_object_pos *pos = NULL, *next = NULL;
+	int ret = 0;
+
+	INIT_LIST_HEAD(&object_positions);
+	spin_lock(&journaler->commit_lock);
+	list_for_each_entry_safe(pos, next, &journaler->object_positions_pending, node) {
+		struct ceph_journaler_object_pos *new_pos = NULL;
+
+		ret = copy_object_pos(pos, &new_pos);
+		list_add_tail(&new_pos->node, &object_positions);
+	}
+	spin_unlock(&journaler->commit_lock);
+
+	ret = ceph_cls_journaler_client_committed(journaler->osdc,
+						  &journaler->header_oid,
+						  &journaler->header_oloc,
+						  journaler->client,
+						  &object_positions);
+
+	list_for_each_entry_safe(pos, next, &object_positions, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+
+	queue_work(journaler->task_wq, &journaler->notify_update_work);
+	return;
+}
+
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     const char *journal_id,
+					     struct ceph_object_locator *oloc)
+{
+	struct ceph_journaler *journaler = NULL;
+	int ret = 0;
+
+	journaler = kzalloc(sizeof(struct ceph_journaler), GFP_KERNEL);
+	if (!journaler)
+		return NULL;
+
+	ceph_oid_init(&journaler->header_oid);
+	ret = ceph_oid_aprintf(&journaler->header_oid, GFP_KERNEL, "%s%s",
+			       JOURNAL_HEADER_PREFIX, journal_id);
+	if (ret) {
+		pr_err("aprintf error : %d", ret);
+		goto err_free_journaler;
+	}
+
+	journaler->osdc = osdc;
+	ceph_oloc_init(&journaler->header_oloc);
+	ceph_oloc_copy(&journaler->header_oloc, oloc);
+	ceph_oloc_init(&journaler->data_oloc);
+	INIT_LIST_HEAD(&journaler->clients);
+	INIT_LIST_HEAD(&journaler->entry_tids);
+	INIT_LIST_HEAD(&journaler->object_positions_pending);
+	journaler->commit_entries = RB_ROOT;
+	journaler->object_oid_prefix = object_oid_prefix(journaler->header_oloc.pool,
+							 journal_id);
+	journaler->commit_tid = 0;
+	journaler->client = NULL;
+	journaler->clients_array = NULL;
+
+	spin_lock_init(&journaler->meta_lock);
+	spin_lock_init(&journaler->entry_tid_lock);
+	spin_lock_init(&journaler->commit_lock);
+	mutex_init(&journaler->mutex);
+	INIT_WORK(&journaler->notify_update_work, journaler_notify_update);
+	INIT_WORK(&journaler->commit_work, journaler_client_commit);
+
+	journaler->task_wq = alloc_ordered_workqueue("journaler-tasks", WQ_MEM_RECLAIM);
+	if (!journaler->task_wq)
+		goto err_free_oid_prefix;
+
+	return journaler;
+
+err_free_oid_prefix:
+	kfree(journaler->object_oid_prefix);
+	ceph_oid_destroy(&journaler->header_oid);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	ceph_oloc_destroy(&journaler->data_oloc);
+err_free_journaler:
+	kfree(journaler);
+	return NULL;
+}
+EXPORT_SYMBOL(ceph_journaler_create);
+
+void ceph_journaler_destroy(struct ceph_journaler *journaler)
+{
+	destroy_workqueue(journaler->task_wq);
+	kfree(journaler->object_oid_prefix);
+	ceph_oid_destroy(&journaler->header_oid);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	ceph_oloc_destroy(&journaler->data_oloc);
+	kfree(journaler);
+}
+EXPORT_SYMBOL(ceph_journaler_destroy);
+
+static void notify_update(struct ceph_journaler *journaler)
+{
+	int ret;
+
+	ret = ceph_osdc_notify(journaler->osdc, &journaler->header_oid,
+			       &journaler->header_oloc, NULL, 0,
+			       5000, NULL, NULL);
+	if (ret)
+		pr_err("notify_update failed: %d", ret);
+}
+
+static int set_minimum_set(struct ceph_journaler *journaler, uint64_t minimum_set)
+{
+	int ret = 0;
+
+	ret = ceph_cls_journaler_set_minimum_set(journaler->osdc,
+						 &journaler->header_oid,
+						 &journaler->header_oloc,
+						 minimum_set);
+	if (ret < 0) {
+		pr_err("%s: failed to set_minimum_set: %d", __func__, ret);
+		return ret;
+	}
+
+	queue_work(journaler->task_wq, &journaler->notify_update_work);
+
+	return ret;
+}
+
+static int ceph_journaler_obj_remove_sync(struct ceph_journaler *journaler,
+					  struct ceph_object_id *oid,
+					  struct ceph_object_locator *oloc);
+static int remove_set(struct ceph_journaler *journaler, uint64_t object_set)
+{
+	uint64_t object_num = 0;
+	int splay_offset = 0;
+	struct ceph_object_id object_oid;
+	int ret = 0;
+
+	ceph_oid_init(&object_oid);
+	for (splay_offset = 0; splay_offset < journaler->splay_width; splay_offset++) {
+		object_num = splay_offset + (object_set * journaler->splay_width);
+		if (!ceph_oid_empty(&object_oid)) {
+			ceph_oid_destroy(&object_oid);
+			ceph_oid_init(&object_oid);
+		}
+		ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+				       journaler->object_oid_prefix, object_num);
+		if (ret) {
+			pr_err("aprintf error : %d", ret);
+			return ret;
+		}
+		ret = ceph_journaler_obj_remove_sync(journaler, &object_oid,
+						     &journaler->data_oloc);
+		if (ret < 0) {
+			pr_err("%s: failed to remove object: %llu", __func__, object_num);
+			return ret;
+		}
+	}
+
+	return ret;
+}
+
+static void destroy_client(struct ceph_journaler_client *client);
+static int refresh(struct ceph_journaler *journaler)
+{
+	int ret = 0;
+	int i = 0;
+	uint32_t client_num = 0;
+	struct ceph_journaler_client *clients = NULL;
+	struct ceph_journaler_client *client, *next;
+	uint64_t minimum_commit_set;
+	uint64_t minimum_set;
+
+	ret = ceph_cls_journaler_get_mutable_metas(journaler->osdc,
+						   &journaler->header_oid,
+						   &journaler->header_oloc,
+						   &journaler->minimum_set,
+						   &journaler->active_set);
+	if (ret)
+		return ret;
+
+	ret = ceph_cls_journaler_client_list(journaler->osdc, &journaler->header_oid,
+					     &journaler->header_oloc, &clients,
+					     &client_num);
+	if (ret)
+		return ret;
+
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	if (journaler->clients_array != NULL)
+		kfree(journaler->clients_array);
+
+	journaler->clients_array = clients;
+
+	for (i = 0; i < client_num; i++) {
+		struct ceph_journaler_client *client = &clients[i];
+
+		list_add_tail(&client->node, &journaler->clients);
+		if (!memcmp(client->id, LOCAL_MIRROR_UUID, sizeof(LOCAL_MIRROR_UUID))) {
+			journaler->client = client;
+		}
+	}
+
+	minimum_commit_set = journaler->active_set;
+	list_for_each_entry(client, &journaler->clients, node) {
+		//TODO: check the state of client
+		struct ceph_journaler_object_pos *pos;
+
+		list_for_each_entry(pos, &client->object_positions, node) {
+			uint64_t object_set = pos->object_num / journaler->splay_width;
+
+			if (object_set < journaler->minimum_set) {
+				minimum_commit_set = object_set;
+			}
+		}
+	}
+
+	minimum_set = journaler->minimum_set;
+	pr_debug("minimum_commit_set: %llu, minimum_set: %llu",
+		 minimum_commit_set, minimum_set);
+	if (minimum_commit_set > minimum_set) {
+		uint64_t trim_set = minimum_set;
+
+		while (trim_set < minimum_commit_set) {
+			ret = remove_set(journaler, trim_set);
+			if (ret < 0) {
+				pr_err("failed to trim object_set: %llu", trim_set);
+				return ret;
+			}
+			trim_set++;
+		}
+		ret = set_minimum_set(journaler, minimum_commit_set);
+		if (ret < 0) {
+			pr_err("failed to set minimum set to %llu", minimum_commit_set);
+			return ret;
+		}
+	}
+
+	return 0;
+}
+
+static void journaler_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			       u64 notifier_id, void *data, size_t data_len)
+{
+	struct ceph_journaler *journaler = arg;
+	int ret;
+
+	ret = ceph_osdc_notify_ack(journaler->osdc, &journaler->header_oid,
+				   &journaler->header_oloc, notify_id, cookie,
+				   NULL, 0);
+	if (ret)
+		pr_err("acknowledge_notify failed: %d", ret);
+
+	mutex_lock(&journaler->mutex);
+	ret = refresh(journaler);
+	mutex_unlock(&journaler->mutex);
+
+	if (ret < 0) {
+		pr_err("%s: failed to refresh journaler: %d", __func__, ret);
+	}
+}
+
+static void journaler_watch_errcb(void *arg, u64 cookie, int err)
+{
+	pr_err("journaler watch error: %d", err);
+}
+
+static int journaler_watch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_linger_request *handle;
+
+	handle = ceph_osdc_watch(osdc, &journaler->header_oid,
+				 &journaler->header_oloc, journaler_watch_cb,
+				 journaler_watch_errcb, journaler);
+	if (IS_ERR(handle))
+		return PTR_ERR(handle);
+
+	journaler->watch_handle = handle;
+	return 0;
+}
+
+static void journaler_unwatch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	int ret = 0;
+
+	ret = ceph_osdc_unwatch(osdc, journaler->watch_handle);
+	if (ret)
+		pr_err("%s: failed to unwatch: %d", __func__, ret);
+
+	journaler->watch_handle = NULL;
+}
+
+static int ceph_journaler_obj_remove_sync(struct ceph_journaler *journaler,
+					  struct ceph_object_id *oid,
+					  struct ceph_object_locator *oloc)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
+	osd_req_op_init(req, 0, CEPH_OSD_OP_DELETE, 0);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+//TODO: make it async
+static int ceph_journaler_obj_write_sync(struct ceph_journaler *journaler,
+					 struct ceph_object_id *oid,
+					 struct ceph_object_locator *oloc,
+					 void *buf, int buf_len)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int num_pages = calc_pages_for(0, buf_len);
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_req;
+	}
+
+	ceph_copy_to_page_vector(pages, buf, 0, buf_len);
+
+	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_APPEND, 0, buf_len, 0, 0);
+	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+static int ceph_journaler_obj_read_sync(struct ceph_journaler *journaler,
+					struct ceph_object_id *oid,
+					struct ceph_object_locator *oloc,
+					void *buf, uint32_t read_off, uint64_t buf_len)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int num_pages = calc_pages_for(0, buf_len);
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_req;
+	}
+
+	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, read_off, buf_len, 0, 0);
+	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0)
+		ceph_copy_from_page_vector(pages, buf, 0, ret);
+
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+static bool entry_is_readable(struct ceph_journaler *journaler, void *buf,
+			      void *end, uint32_t *bytes_needed)
+{
+	uint32_t remaining = end - buf;
+	uint64_t preamble;
+	uint32_t data_size;
+	void *origin_buf = buf;
+	uint32_t crc = 0, crc_encoded = 0;
+
+	if (remaining < HEADER_FIXED_SIZE) {
+		*bytes_needed = HEADER_FIXED_SIZE - remaining;
+		return false;
+	}
+
+	preamble = ceph_decode_64(&buf);
+	if (PREAMBLE != preamble) {
+		pr_err("preamble is not correct");
+		*bytes_needed = 0;
+		return false;
+	}
+
+	buf += (HEADER_FIXED_SIZE - sizeof(preamble));
+	remaining = end - buf;
+	if (remaining < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - remaining;
+		return false;
+	}
+
+	data_size = ceph_decode_32(&buf);
+	remaining = end - buf;
+	if (remaining < data_size) {
+		*bytes_needed = data_size - remaining;
+		return false;
+	}
+
+	buf += data_size;
+
+	remaining = end - buf;
+	if (remaining < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - remaining;
+		return false;
+	}
+
+	*bytes_needed = 0;
+	crc = crc32c(0, origin_buf, buf - origin_buf);
+	crc_encoded = ceph_decode_32(&buf);
+	if (crc != crc_encoded) {
+		return false;
+	}
+	return true;
+}
+
+static void playback_entry(struct ceph_journaler *journaler,
+			   struct ceph_journaler_entry *entry)
+{
+	//TODO: verify the entry, skip stale tag_tid or others.
+	if (journaler->handle_entry != NULL) {
+		journaler->handle_entry(journaler->entry_handler, entry);
+	}
+}
+
+static void reserve_entry_tid(struct ceph_journaler *journaler,
+			      uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct entry_tid *pos;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			if (pos->entry_tid < entry_tid) {
+				pos->entry_tid = entry_tid;
+			}
+
+			spin_unlock(&journaler->entry_tid_lock);
+			return;
+		}
+	}
+
+	pos = kzalloc(sizeof(struct entry_tid), GFP_KERNEL);
+	WARN_ON(pos == NULL);
+
+	pos->tag_tid = tag_tid;
+	pos->entry_tid = entry_tid;
+	INIT_LIST_HEAD(&pos->node);
+
+	list_add_tail(&pos->node, &journaler->entry_tids);
+	spin_unlock(&journaler->entry_tid_lock);
+
+	return;
+}
+
+DEFINE_RB_INSDEL_FUNCS(commit_entry, struct commit_entry, commit_tid, r_node)
+
+static int add_commit_entry(struct ceph_journaler *journaler, uint64_t commit_tid,
+			    uint64_t object_num, uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct commit_entry *entry = NULL;
+	int ret = 0;
+
+	entry = kzalloc(sizeof(*entry), GFP_KERNEL);
+	if (entry == NULL) {
+		ret = -ENOMEM;
+		goto out;
+	}
+	RB_CLEAR_NODE(&entry->r_node);
+
+	entry->commit_tid = commit_tid;
+	entry->object_num = object_num;
+	entry->tag_tid = tag_tid;
+	entry->entry_tid = entry_tid;
+
+	spin_lock(&journaler->commit_lock);
+	insert_commit_entry(&journaler->commit_entries, entry);
+	spin_unlock(&journaler->commit_lock);
+
+out:
+	return ret;
+}
+
+static uint64_t allocate_commit_tid(struct ceph_journaler *journaler,
+				    uint64_t object_num, uint64_t tag_tid,
+				    uint64_t entry_tid)
+{
+	return ++journaler->commit_tid;
+}
+
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid);
+static void process_entries(struct ceph_journaler *journaler,
+			    struct list_head *entry_list,
+			    struct ceph_journaler_object_pos *position)
+{
+	struct ceph_journaler_entry *entry, *next;
+	bool found_commit = false;
+	uint64_t commit_tid;
+
+	list_for_each_entry_safe(entry, next, entry_list, node) {
+		if (entry->tag_tid == position->tag_tid &&
+		    entry->entry_tid == position->entry_tid) {
+			found_commit = true;
+			continue;
+		} else if (found_commit) {
+			reserve_entry_tid(journaler, entry->tag_tid, entry->entry_tid);
+			commit_tid = allocate_commit_tid(journaler, position->object_num,
+							 entry->tag_tid, entry->entry_tid);
+			playback_entry(journaler, entry);
+			add_commit_entry(journaler, commit_tid, position->object_num,
+					 entry->tag_tid, entry->entry_tid);
+			ceph_journaler_client_committed(journaler, commit_tid);
+		} else {
+			reserve_entry_tid(journaler, entry->tag_tid, entry->entry_tid);
+		}
+		list_del(&entry->node);
+		kfree(entry);
+	}
+	return;
+}
+
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end);
+static void fetch(struct ceph_journaler *journaler, uint64_t object_num)
+{
+	struct ceph_object_id object_oid;
+	int ret = 0;
+	void *buf = NULL, *read_buf = NULL, *buf_p = NULL;
+	void *end = NULL;
+	uint64_t read_len = 2 << journaler->order;
+	uint32_t read_off = 0;
+	uint64_t buf_len = read_len;
+	struct list_head entry_list;
+	bool position_found = false;
+	struct ceph_journaler_object_pos *pos;
+
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		if (pos->object_num == object_num) {
+			position_found = true;
+			break;
+		}
+	}
+
+	if (!position_found) {
+		return;
+	}
+
+	INIT_LIST_HEAD(&entry_list);
+	ceph_oid_init(&object_oid);
+	ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+			       journaler->object_oid_prefix, object_num);
+	if (ret) {
+		pr_err("aprintf error : %d", ret);
+		return;
+	}
+
+	buf = vmalloc(buf_len);
+	if (!buf) {
+		pr_err("failed to vmalloc buf: %llu", buf_len);
+		goto err_free_object_oid;
+	}
+	read_buf = buf;
+	buf_p = buf;
+
+refetch:
+	ret = ceph_journaler_obj_read_sync(journaler, &object_oid,
+					   &journaler->data_oloc, read_buf,
+					   read_off, read_len);
+	if (ret == -ENOENT) {
+		pr_err("no such object: %d", ret);
+		goto err_free_buf;
+	} else if (ret < 0) {
+		pr_err("failed to read: %d", ret);
+		goto err_free_buf;
+	} else if (ret == 0) {
+		pr_err("no data: %d", ret);
+		goto err_free_buf;
+	}
+	read_off = read_off + ret;
+
+	end = read_buf + ret;
+	while (buf < end) {
+		uint32_t bytes_needed = 0;
+		struct ceph_journaler_entry *entry = NULL;
+
+		if (!entry_is_readable(journaler, buf, end, &bytes_needed)) {
+			uint64_t remain = end - buf;
+
+			if (bytes_needed != 0) {
+				void *new_buf = vmalloc(read_len + remain);
+
+				if (!new_buf) {
+					pr_err("failed to alloc new buf");
+					goto err_free_buf;
+				}
+				memcpy(new_buf, buf, remain);
+				vfree(buf_p);
+				buf_p = new_buf;
+				buf = new_buf;
+				read_buf = buf + remain;
+				goto refetch;
+			} else {
+				pr_err("entry corruption");
+				goto err_free_buf;
+			}
+		}
+		entry = journaler_entry_decode(&buf, end);
+		if (!entry)
+			goto err_free_buf;
+
+		list_add_tail(&entry->node, &entry_list);
+	}
+
+	process_entries(journaler, &entry_list, pos);
+
+err_free_buf:
+	vfree(buf_p);
+err_free_object_oid:
+	ceph_oid_destroy(&object_oid);
+	return;
+}
+
+void start_replay(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_object_pos *active_pos = NULL;
+	struct ceph_journaler_client *client = NULL;
+	uint64_t *fetch_objects = kzalloc(sizeof(uint64_t) * journaler->splay_width,
+					  GFP_KERNEL);
+	int index = 0;
+	int i = 0;
+
+	mutex_lock(&journaler->mutex);
+	client = journaler->client;
+	active_pos = list_first_entry(&journaler->client->object_positions,
+				      struct ceph_journaler_object_pos, node);
+
+	journaler->active_tag_tid = active_pos->tag_tid;
+	journaler->commit_pos_valid = true;
+	journaler->commit_pos = active_pos;
+	journaler->splay_offset = active_pos->object_num % journaler->splay_width;
+
+	list_for_each_entry(active_pos, &client->object_positions, node) {
+		fetch_objects[index++] = active_pos->object_num;
+	}
+
+	for (i = 0; i < index; i++) {
+		fetch(journaler, fetch_objects[i]);
+	}
+	mutex_unlock(&journaler->mutex);
+}
+EXPORT_SYMBOL(start_replay);
+
+int ceph_journaler_open(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_object_pos *pos = NULL;
+	int ret = 0;
+
+	ret = journaler_watch(journaler);
+	if (ret) {
+		pr_err("journaler_watch error: %d", ret);
+		return ret;
+	}
+
+	ret = ceph_cls_journaler_get_immutable_metas(journaler->osdc,
+						     &journaler->header_oid,
+						     &journaler->header_oloc,
+						     &journaler->order,
+						     &journaler->splay_width,
+						     &journaler->pool_id);
+	if (ret) {
+		pr_err("failed to get immutable metas.");
+		goto err_unwatch;
+	}
+
+	if (journaler->pool_id == -1) {
+		ceph_oloc_copy(&journaler->data_oloc, &journaler->header_oloc);
+		journaler->pool_id = journaler->data_oloc.pool;
+	} else {
+		journaler->data_oloc.pool = journaler->pool_id;
+	}
+
+	mutex_lock(&journaler->mutex);
+	ret = refresh(journaler);
+	mutex_unlock(&journaler->mutex);
+	if (ret)
+		goto err_unwatch;
+
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		struct ceph_journaler_object_pos *new_pos = NULL;
+
+		ret = copy_object_pos(pos, &new_pos);
+		if (ret) {
+			goto err_unwatch;
+		}
+
+		list_add_tail(&new_pos->node, &journaler->object_positions_pending);
+	}
+
+	return 0;
+
+err_unwatch:
+	journaler_unwatch(journaler);
+
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_open);
+
+static void destroy_client(struct ceph_journaler_client *client)
+{
+	struct ceph_journaler_object_pos *pos, *next;
+
+	list_for_each_entry_safe(pos, next, &client->object_positions, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+	kfree(client->id);
+	kfree(client->data);
+}
+
+void ceph_journaler_close(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_client *client = NULL, *next = NULL;
+	struct commit_entry *entry = NULL;
+	struct entry_tid *entry_tid = NULL, *entry_tid_next = NULL;
+	struct rb_node *n;
+
+	journaler_unwatch(journaler);
+
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	if (journaler->clients_array != NULL)
+		kfree(journaler->clients_array);
+
+	for (n = rb_first(&journaler->commit_entries); n;) {
+		entry = rb_entry(n, struct commit_entry, r_node);
+
+		n = rb_next(n);
+		erase_commit_entry(&journaler->commit_entries, entry);
+		kfree(entry);
+	}
+
+	list_for_each_entry_safe(entry_tid, entry_tid_next, &journaler->entry_tids, node) {
+		list_del(&entry_tid->node);
+		kfree(entry_tid);
+	}
+
+	return;
+}
+EXPORT_SYMBOL(ceph_journaler_close);
+
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end)
+{
+	struct ceph_journaler_entry *entry = NULL;
+	uint64_t preamble = 0;
+	uint8_t version = 0;
+	uint32_t crc = 0, crc_encoded = 0;
+	void *start = *p;
+
+	preamble = ceph_decode_64(p);
+	if (PREAMBLE != preamble) {
+		return NULL;
+	}
+
+	version = ceph_decode_8(p);
+	if (version != 1)
+		return NULL;
+
+	entry = kzalloc(sizeof(struct ceph_journaler_entry), GFP_KERNEL);
+
+	INIT_LIST_HEAD(&entry->node);
+	entry->entry_tid = ceph_decode_64(p);
+	entry->tag_tid = ceph_decode_64(p);
+	entry->data = ceph_extract_encoded_string(p, end, &entry->data_len, GFP_NOIO);
+	if (!entry->data)
+		goto error;
+
+	crc = crc32c(0, start, *p - start);
+	crc_encoded = ceph_decode_32(p);
+	if (crc != crc_encoded)
+		goto free_data;
+
+	return entry;
+
+free_data:
+	kfree(entry->data);
+error:
+	kfree(entry);
+	return NULL;
+}
+
+static void journaler_entry_encode(struct ceph_journaler_entry *entry,
+				   void **p, void *end)
+{
+	void *start = *p;
+	uint32_t crc = 0;
+
+	ceph_encode_64(p, PREAMBLE);
+	ceph_encode_8(p, (uint8_t)1);
+	ceph_encode_64(p, entry->entry_tid);
+	ceph_encode_64(p, entry->tag_tid);
+	ceph_encode_string(p, end, entry->data, entry->data_len);
+
+	crc = crc32c(0, start, *p - start);
+	ceph_encode_32(p, crc);
+
+	return;
+}
+
+// record
+static ssize_t ceph_entry_buf_size(struct ceph_journaler_entry *entry)
+{
+	// PREAMBLE(8) + version(1) + entry_tid(8) + tag_tid(8) + string_len(4) + crc(4) = 33
+	return entry->data_len + 33;
+}
+
+static uint64_t get_object(struct ceph_journaler *journaler, uint64_t splay_offset)
+{
+	return splay_offset + (journaler->splay_width * journaler->active_set);
+}
+
+static void advance_object_set(struct ceph_journaler *journaler)
+{
+	int ret = 0;
+
+	journaler->active_set++;
+	ret = ceph_cls_journaler_set_active_set(journaler->osdc,
+						&journaler->header_oid,
+						&journaler->header_oloc,
+						journaler->active_set);
+}
+
+static int ceph_journaler_object_append(struct ceph_journaler *journaler,
+					uint64_t object_num,
+					struct ceph_journaler_future *future,
+					struct ceph_journaler_entry *entry)
+{
+	void *buf = NULL;
+	void *start_buf = NULL;
+	void *end = NULL;
+	ssize_t buf_len;
+	struct ceph_object_id object_oid;
+	int ret = 0;
+
+	buf_len = ceph_entry_buf_size(entry);
+	buf = vmalloc(buf_len);
+	end = buf + buf_len;
+	start_buf = buf;
+
+	journaler_entry_encode(entry, &buf, end);
+
+	ceph_oid_init(&object_oid);
+
+retry:
+	if (!ceph_oid_empty(&object_oid)) {
+		ceph_oid_destroy(&object_oid);
+		ceph_oid_init(&object_oid);
+	}
+	ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+			       journaler->object_oid_prefix, object_num);
+
+	//TODO: send "guard append" and "write" in a single request
+	ret = ceph_cls_journaler_guard_append(journaler->osdc, &object_oid,
+					      &journaler->header_oloc,
+					      1 << journaler->order);
+	if (ret == -EOVERFLOW) {
+		pr_debug("overflow: %llu", journaler->active_set);
+		advance_object_set(journaler);
+		object_num = get_object(journaler, entry->entry_tid % journaler->splay_width);
+		goto retry;
+	}
+
+	ret = ceph_journaler_obj_write_sync(journaler, &object_oid,
+					    &journaler->data_oloc, start_buf, buf_len);
+	if (ret) {
+		pr_err("error in write entry: %d", ret);
+	}
+
+	pr_debug("write event: %d, tagtid: %llu", ret, entry->tag_tid);
+
+	return ret;
+}
+
+static uint64_t allocate_entry_tid(struct ceph_journaler *journaler, uint64_t tag_tid)
+{
+	struct entry_tid *pos = NULL;
+	uint64_t entry_tid = 0;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			entry_tid = pos->entry_tid++;
+			spin_unlock(&journaler->entry_tid_lock);
+			return entry_tid;
+		}
+	}
+
+	pos = kzalloc(sizeof(struct entry_tid), GFP_KERNEL);
+	WARN_ON(pos == NULL);
+
+	pos->tag_tid = tag_tid;
+	pos->entry_tid = 0;
+	INIT_LIST_HEAD(&pos->node);
+
+	list_add_tail(&pos->node, &journaler->entry_tids);
+	entry_tid = pos->entry_tid++;
+	spin_unlock(&journaler->entry_tid_lock);
+
+	return entry_tid;
+}
+
+static struct ceph_journaler_future *create_future(uint64_t tag_tid, uint64_t entry_tid,
+						   uint64_t commit_tid)
+{
+	struct ceph_journaler_future *future = NULL;
+
+	future = kzalloc(sizeof(struct ceph_journaler_future), GFP_KERNEL);
+	if (!future)
+		return NULL;
+
+	future->tag_tid = tag_tid;
+	future->entry_tid = entry_tid;
+	future->commit_tid = commit_tid;
+
+	return future;
+}
+
+static struct ceph_journaler_entry *create_entry(uint64_t tag_tid, uint64_t entry_tid,
+						 char *data, ssize_t data_len)
+{
+static struct ceph_journaler_entry *create_entry(uint64_t tag_tid, uint64_t entry_tid,
+						 char *data, ssize_t data_len)
+{
+	struct ceph_journaler_entry *entry = NULL;
+
+	/* called under journaler->meta_lock (a spinlock), so no sleeping allocation */
+	entry = kzalloc(sizeof(struct ceph_journaler_entry), GFP_ATOMIC);
+	if (!entry)
+		return NULL;
+	entry->tag_tid = tag_tid;
+	entry->entry_tid = entry_tid;
+	entry->data = data;
+	entry->data_len = data_len;
+
+	return entry;
+}
+
+int ceph_journaler_append(struct ceph_journaler *journaler, uint64_t tag_tid, char *data,
+			  ssize_t data_len, struct ceph_journaler_future **journal_future)
+{
+	uint64_t entry_tid;
+	uint8_t splay_width;
+	uint8_t splay_offset;
+	uint64_t object_num;
+	uint64_t commit_tid;
+	struct ceph_journaler_future *future;
+	struct ceph_journaler_entry *entry;
+	int ret = 0;
+
+	spin_lock(&journaler->meta_lock);
+	entry_tid = allocate_entry_tid(journaler, tag_tid);
+	splay_width = journaler->splay_width;
+	splay_offset = entry_tid % splay_width;
+
+	object_num = get_object(journaler, splay_offset);
+	commit_tid = allocate_commit_tid(journaler, object_num, tag_tid, entry_tid);
+
+	future = create_future(tag_tid, entry_tid, commit_tid);
+	if (!future) {
+		spin_unlock(&journaler->meta_lock);
+		return -ENOMEM;
+	}
+
+	entry = create_entry(tag_tid, entry_tid, data, data_len);
+	if (!entry) {
+		spin_unlock(&journaler->meta_lock);
+		kfree(future);
+		return -ENOMEM;
+	}
+	spin_unlock(&journaler->meta_lock);
+
+	ret = ceph_journaler_object_append(journaler, object_num, future, entry);
+	if (ret)
+		goto out;
+
+	ret = add_commit_entry(journaler, commit_tid, object_num, tag_tid, entry_tid);
+	if (ret)
+		goto out;
+
+	*journal_future = future;
+out:
+	kfree(entry);
+	if (ret)
+		kfree(future);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_append);
+
+static int add_object_position(struct commit_entry *entry, struct list_head *object_positions,
+			       uint64_t splay_width)
+{
+	struct ceph_journaler_object_pos *position = NULL;
+	uint8_t splay_offset = entry->object_num % splay_width;
+	bool found = false;
+	int ret = 0;
+
+	list_for_each_entry(position, object_positions, node) {
+		if (splay_offset == position->object_num % splay_width) {
+			found = true;
+			break;
+		}
+	}
+
+	if (!found) {
+		/* called under journaler->commit_lock (a spinlock) */
+		position = kzalloc(sizeof(*position), GFP_ATOMIC);
+		if (!position) {
+			pr_err("failed to allocate position");
+			return -ENOMEM;
+		}
+		list_add(&position->node, object_positions);
+	} else {
+		list_move(&position->node, object_positions);
+	}
+
+	position->object_num = entry->object_num;
+	position->tag_tid = entry->tag_tid;
+	position->entry_tid = entry->entry_tid;
+
+	return ret;
+}
+
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid)
+{
+	struct commit_entry *entry = NULL;
+	bool update_client_commit = true;
+	struct rb_node *n;
+
+	spin_lock(&journaler->commit_lock);
+	for (n = rb_first(&journaler->commit_entries); n; n = rb_next(n)) {
+		entry = rb_entry(n, struct commit_entry, r_node);
+
+		if (entry->commit_tid == commit_tid) {
+			entry->committed = true;
+			break;
+		}
+
+		if (!entry->committed)
+			update_client_commit = false;
+	}
+
+	if (update_client_commit) {
+		for (n = rb_first(&journaler->commit_entries); n;) {
+			entry = rb_entry(n, struct commit_entry, r_node);
+			n = rb_next(n);
+
+			if (entry->commit_tid > commit_tid)
+				break;
+			add_object_position(entry, &journaler->object_positions_pending,
+					    journaler->splay_width);
+			erase_commit_entry(&journaler->commit_entries, entry);
+			kfree(entry);
+		}
+	}
+	spin_unlock(&journaler->commit_lock);
+
+	if (update_client_commit)
+		queue_work(journaler->task_wq, &journaler->commit_work);
+}
+EXPORT_SYMBOL(ceph_journaler_client_committed);
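To illustrate the in-order commit tracking above with hypothetical tids: the client commit position only advances past a contiguous prefix of committed entries.

        /*
         * Hypothetical completion order (commit_tids 1..3 outstanding):
         *
         *   committed(2) -> entry 2 marked, but entry 1 still pending
         *                   => update_client_commit stays false,
         *                      client position unchanged
         *   committed(1) -> entries 1 and 2 now form a committed prefix
         *                   => both folded into object_positions_pending,
         *                      commit_work queued to persist the position
         */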
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler, uint64_t tag_class,
+				void *buf, uint32_t buf_len, struct ceph_journaler_tag *tag)
+{
+	uint64_t tag_tid = 0;
+	int ret = 0;
+
+	ret = ceph_cls_journaler_get_next_tag_tid(journaler->osdc,
+						  &journaler->header_oid,
+						  &journaler->header_oloc,
+						  &tag_tid);
+	if (ret)
+		goto out;
+
+	ret = ceph_cls_journaler_tag_create(journaler->osdc,
+					    &journaler->header_oid,
+					    &journaler->header_oloc,
+					    tag_tid, tag_class,
+					    buf, buf_len);
+	if (ret)
+		goto out;
+
+	ret = ceph_cls_journaler_get_tag(journaler->osdc,
+					 &journaler->header_oid,
+					 &journaler->header_oloc,
+					 tag_tid, tag);
+out:
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_allocate_tag);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+				     uint32_t id_len, struct ceph_journaler_client **client_result)
+{
+	struct ceph_journaler_client *client = NULL;
+	int ret = -ENOENT;
+
+	list_for_each_entry(client, &journaler->clients, node) {
+		if (!memcmp(client->id, client_id, id_len)) {
+			*client_result = client;
+			ret = 0;
+			break;
+		}
+	}
+
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_get_cached_client);
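Taken together, the exported journaler API is intended to be used roughly as follows. This is a minimal sketch based only on the functions in this series (osdc, image_id, oloc and my_replay_handler are placeholders; error handling elided; the create/destroy pair appears in an earlier patch of the series):

        struct ceph_journaler *j;
        struct ceph_journaler_future *future;

        j = ceph_journaler_create(osdc, image_id, oloc);
        j->entry_handler = my_cookie;        /* opaque pointer passed to replay */
        j->handle_entry = my_replay_handler; /* called for each decoded entry */

        ceph_journaler_open(j);              /* watch, fetch metas, client positions */
        start_replay(j);                     /* apply committed-but-unreplayed entries */

        ceph_journaler_append(j, tag_tid, buf, buf_len, &future);
        /* ...once the corresponding image request completes: */
        ceph_journaler_client_committed(j, future->commit_tid);

        ceph_journaler_close(j);
        ceph_journaler_destroy(j);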
From patchwork Thu Aug 16 05:59:32 2018
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 10566991
From: Dongsheng Yang
To: idryomov@gmail.com, sage@redhat.com, elder@kernel.org, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, dongsheng.yang@easystack.cn
Subject: [PATCH 4/4] rbd: enable journaling
Date: Thu, 16 Aug 2018 01:59:32 -0400
Message-Id: <1534399172-27610-5-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>
References: <1534399172-27610-1-git-send-email-dongsheng.yang@easystack.cn>

Signed-off-by: Dongsheng Yang
---
 drivers/block/rbd.c | 478 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 472 insertions(+), 6 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 2b4e90d..90ed6f4 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -34,10 +34,12 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -115,12 +117,14 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURE_LAYERING		(1ULL<<0)
 #define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
+#define RBD_FEATURE_JOURNALING		(1ULL<<6)
 #define RBD_FEATURE_DATA_POOL		(1ULL<<7)
 #define RBD_FEATURE_OPERATIONS		(1ULL<<8)
 
 #define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
 				 RBD_FEATURE_STRIPINGV2 |	\
 				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
+				 RBD_FEATURE_JOURNALING |	\
 				 RBD_FEATURE_DATA_POOL |	\
 				 RBD_FEATURE_OPERATIONS)
 
@@ -293,6 +297,8 @@ struct rbd_img_request {
 	u32			obj_request_count;
 	u32			pending_count;
 
+	struct ceph_journaler_future *journal_future;
+
 	struct kref		kref;
 };
 
@@ -375,6 +381,8 @@ struct rbd_device {
 	atomic_t		parent_ref;
 	struct rbd_device	*parent;
 
+	struct rbd_journal	*journal;
+
 	/* Block layer tags. */
 	struct blk_mq_tag_set	tag_set;
 
@@ -402,6 +410,14 @@ enum rbd_dev_flags {
 	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
 };
 
+struct rbd_journal {
+	struct ceph_journaler *journaler;
+
+	struct ceph_journaler_client *client;
+
+	uint64_t tag_tid;
+};
+
 static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
 
 static LIST_HEAD(rbd_dev_list);    /* devices */
@@ -2562,12 +2578,19 @@ static void rbd_img_end_child_request(struct rbd_img_request *img_req)
 static void rbd_img_end_request(struct rbd_img_request *img_req)
 {
 	rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
-	rbd_assert((!img_req->result &&
-		    img_req->xferred == blk_rq_bytes(img_req->rq)) ||
-		   (img_req->result < 0 && !img_req->xferred));
 
-	blk_mq_end_request(img_req->rq,
-			   errno_to_blk_status(img_req->result));
+	if (img_req->rq) {
+		rbd_assert((!img_req->result &&
+			    img_req->xferred == blk_rq_bytes(img_req->rq)) ||
+			   (img_req->result < 0 && !img_req->xferred));
+
+		blk_mq_end_request(img_req->rq,
+				   errno_to_blk_status(img_req->result));
+	}
+
+	if (img_req->journal_future)
+		ceph_journaler_client_committed(img_req->rbd_dev->journal->journaler,
+						img_req->journal_future->commit_tid);
+
 	rbd_img_request_put(img_req);
 }
 
@@ -2576,8 +2599,9 @@ static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
 	struct rbd_img_request *img_req;
 
 again:
-	if (!__rbd_obj_handle_request(obj_req))
+	if (!__rbd_obj_handle_request(obj_req)) {
 		return;
+	}
 
 	img_req = obj_req->img_request;
 	spin_lock(&img_req->completion_lock);
@@ -3602,6 +3626,10 @@ static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
 	return ret;
 }
 
+static int rbd_journal_append(struct rbd_device *rbd_dev, struct bio *bio,
+			      u64 offset, u64 length, enum obj_operation_type op_type,
+			      struct ceph_journaler_future **journal_future);
+
 static void rbd_queue_workfn(struct work_struct *work)
 {
 	struct request *rq = blk_mq_rq_from_pdu(work);
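The ordering this patch establishes for a journaled request is: persist the event to the journal first, then perform the actual I/O, and retire the journal entry when the image request completes. A condensed sketch of that flow (names as used in this patch):

        /*
         * Journaled write I/O path introduced by this patch (condensed):
         *
         *   rbd_queue_workfn()
         *     -> rbd_journal_append()        append event, obtain a future
         *     -> rbd_img_request_submit()    then perform the actual I/O
         *
         *   rbd_img_end_request()
         *     -> blk_mq_end_request()        complete the block request
         *     -> ceph_journaler_client_committed(future->commit_tid)
         *                                    retire the journal entry
         */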
@@ -3707,6 +3735,13 @@ static void rbd_queue_workfn(struct work_struct *work)
 	if (result)
 		goto err_img_request;
 
+	if (rbd_dev->header.features & RBD_FEATURE_JOURNALING) {
+		result = rbd_journal_append(rbd_dev, rq->bio, offset, length,
+					    op_type, &img_request->journal_future);
+		if (result)
+			goto err_unlock;
+	}
+
 	rbd_img_request_submit(img_request);
 	if (must_be_locked)
 		up_read(&rbd_dev->lock_rwsem);
@@ -5507,6 +5542,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	if (ret)
 		goto err_out_disk;
 
+	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
 
@@ -5548,8 +5584,320 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 	return ret;
 }
 
+enum rbd_journal_event_type {
+	EVENT_TYPE_AIO_DISCARD = 0,
+	EVENT_TYPE_AIO_WRITE = 1,
+	EVENT_TYPE_AIO_FLUSH = 2,
+	EVENT_TYPE_OP_FINISH = 3,
+	EVENT_TYPE_SNAP_CREATE = 4,
+	EVENT_TYPE_SNAP_REMOVE = 5,
+	EVENT_TYPE_SNAP_RENAME = 6,
+	EVENT_TYPE_SNAP_PROTECT = 7,
+	EVENT_TYPE_SNAP_UNPROTECT = 8,
+	EVENT_TYPE_SNAP_ROLLBACK = 9,
+	EVENT_TYPE_RENAME = 10,
+	EVENT_TYPE_RESIZE = 11,
+	EVENT_TYPE_FLATTEN = 12,
+	EVENT_TYPE_DEMOTE_PROMOTE = 13,
+	EVENT_TYPE_SNAP_LIMIT = 14,
+	EVENT_TYPE_UPDATE_FEATURES = 15,
+	EVENT_TYPE_METADATA_SET = 16,
+	EVENT_TYPE_METADATA_REMOVE = 17,
+	EVENT_TYPE_AIO_WRITESAME = 18,
+	EVENT_TYPE_AIO_COMPARE_AND_WRITE = 19,
+};
+
+static struct bio_vec *setup_write_bvecs(void *buf, u64 offset, u64 length)
+{
+	u32 i;
+	struct bio_vec *bvecs = NULL;
+	u32 bvec_count = 0;
+
+	bvec_count = calc_pages_for(offset, length);
+	bvecs = kcalloc(bvec_count, sizeof(*bvecs), GFP_NOIO);
+	if (!bvecs)
+		return NULL;
+
+	offset = offset % PAGE_SIZE;
+	for (i = 0; i < bvec_count; i++) {
+		unsigned int len = min(length, (u64)PAGE_SIZE - offset);
+
+		bvecs[i].bv_page = alloc_page(GFP_NOIO);
+		if (!bvecs[i].bv_page)
+			goto free_bvecs;
+
+		bvecs[i].bv_offset = offset;
+		bvecs[i].bv_len = len;
+		memcpy(page_address(bvecs[i].bv_page) + bvecs[i].bv_offset,
+		       buf, bvecs[i].bv_len);
+		length -= len;
+		buf += len;
+		offset = 0;
+	}
+
+	rbd_assert(!length);
+
+	return bvecs;
+
+free_bvecs:
+	while (i > 0)
+		__free_page(bvecs[--i].bv_page);
+	kfree(bvecs);
+	return NULL;
+}
+
+static int rbd_journal_handle_aio_discard(struct rbd_device *rbd_dev, void **p, void *end,
+					  u8 struct_v)
+{
+	uint64_t offset;
+	uint64_t length;
+	int result = 0;
+	enum obj_operation_type op_type;
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc = NULL;
+
+	offset = ceph_decode_64(p);
+	length = ceph_decode_64(p);
+
+	snapc = rbd_dev->header.snapc;
+	ceph_get_snap_context(snapc);
+	op_type = OBJ_OP_DISCARD;
+
+	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
+	if (!img_request) {
+		ceph_put_snap_context(snapc);
+		return -ENOMEM;
+	}
+
+	result = rbd_img_fill_nodata(img_request, offset, length);
+	if (result)
+		goto err_put;
+
+	rbd_img_request_submit(img_request);
+	return 0;
+
+err_put:
+	rbd_img_request_put(img_request);
+	return result;
+}
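The replay side decodes each journal entry payload with the envelope sketched below; this layout is inferred from the encode/decode pairs in this patch:

        /*
         * Event payload layout consumed by rbd_journal_replay() and the
         * handlers above (sketch):
         *
         *   struct_v   u8   \
         *   compat_v   u8    } ceph_start_decoding() header (6 bytes)
         *   struct_len u32  /
         *   event_type u32  EVENT_TYPE_AIO_WRITE / EVENT_TYPE_AIO_DISCARD / ...
         *   offset     u64  image offset
         *   length     u64  byte count
         *   data       u32 length + bytes   (AIO_WRITE only)
         */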
+static int rbd_journal_handle_aio_write(struct rbd_device *rbd_dev, void **p, void *end,
+					u8 struct_v)
+{
+	uint64_t offset;
+	uint64_t length;
+	char *data;
+	ssize_t data_len;
+	int result = 0;
+	enum obj_operation_type op_type;
+	struct ceph_snap_context *snapc = NULL;
+	struct rbd_img_request *img_request;
+	struct ceph_file_extent ex;
+	struct bio_vec *bvecs;
+
+	offset = ceph_decode_64(p);
+	length = ceph_decode_64(p);
+
+	data = ceph_extract_encoded_string(p, end, &data_len, GFP_NOIO);
+	if (!data)
+		return -ENOMEM;
+
+	snapc = rbd_dev->header.snapc;
+	ceph_get_snap_context(snapc);
+	op_type = OBJ_OP_WRITE;
+
+	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
+	if (!img_request) {
+		ceph_put_snap_context(snapc);
+		result = -ENOMEM;
+		goto err_free_data;
+	}
+	snapc = NULL; /* img_request consumes a ref */
+
+	ex.fe_off = offset;
+	ex.fe_len = length;
+
+	bvecs = setup_write_bvecs(data, offset, length);
+	if (!bvecs) {
+		pr_err("failed to alloc bvecs");
+		result = -ENOMEM;
+		goto err_put;
+	}
+
+	result = rbd_img_fill_from_bvecs(img_request, &ex, 1, bvecs);
+	if (result)
+		goto err_put;
+
+	rbd_img_request_submit(img_request);
+	kfree(data);
+	return 0;
+
+err_put:
+	rbd_img_request_put(img_request);
+err_free_data:
+	kfree(data);
+	return result;
+}
+
+static int rbd_journal_replay(void *entry_handler, struct ceph_journaler_entry *entry)
+{
+	struct rbd_device *rbd_dev = entry_handler;
+	void **p = (void **)(&entry->data);
+	void *end = *p + entry->data_len;
+	uint32_t event_type;
+	u8 struct_v;
+	u32 struct_len;
+	int ret = 0;
+
+	ret = ceph_start_decoding(p, end, 1, "rbd_decode_entry",
+				  &struct_v, &struct_len);
+	if (ret)
+		return -EINVAL;
+
+	event_type = ceph_decode_32(p);
+
+	switch (event_type) {
+	case EVENT_TYPE_AIO_WRITE:
+		ret = rbd_journal_handle_aio_write(rbd_dev, p, end, struct_v);
+		break;
+	case EVENT_TYPE_AIO_DISCARD:
+		ret = rbd_journal_handle_aio_discard(rbd_dev, p, end, struct_v);
+		break;
+	default:
+		pr_err("unknown event_type: %u", event_type);
+		ret = -EOPNOTSUPP;
+	}
+
+	return ret;
+}
+
+static int rbd_dev_open_journal(struct rbd_device *rbd_dev)
+{
+	int ret = 0;
+	struct ceph_journaler *journaler = NULL;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+
+	rbd_dev->journal = kzalloc(sizeof(struct rbd_journal), GFP_KERNEL);
+	if (!rbd_dev->journal)
+		return -ENOMEM;
+
+	journaler = ceph_journaler_create(osdc, rbd_dev->spec->image_id,
+					  &rbd_dev->header_oloc);
+	if (!journaler) {
+		ret = -ENOMEM;
+		goto err_free_journal;
+	}
+
+	journaler->entry_handler = rbd_dev;
+	journaler->handle_entry = rbd_journal_replay;
+
+	ret = ceph_journaler_open(journaler);
+	if (ret)
+		goto err_destroy_journaler;
+
+	rbd_dev->journal->journaler = journaler;
+
+	start_replay(journaler);
+
+	return ret;
+
+err_destroy_journaler:
+	ceph_journaler_destroy(journaler);
+err_free_journal:
+	kfree(rbd_dev->journal);
+	rbd_dev->journal = NULL;
+	return ret;
+}
+
+static void rbd_dev_close_journal(struct rbd_device *rbd_dev)
+{
+	struct ceph_journaler *journaler = NULL;
+
+	if (!rbd_dev->journal)
+		return;
+
+	journaler = rbd_dev->journal->journaler;
+	ceph_journaler_close(journaler);
+	ceph_journaler_destroy(journaler);
+	kfree(rbd_dev->journal);
+	rbd_dev->journal = NULL;
+}
+
+#define LOCAL_MIRROR_UUID ""
+
+typedef struct rbd_journal_tag_predecessor {
+	bool commit_valid;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	uint32_t uuid_len;
+	char *mirror_uuid;
+} rbd_journal_tag_predecessor;
+
+typedef struct rbd_journal_tag_data {
+	struct rbd_journal_tag_predecessor predecessor;
+	uint32_t uuid_len;
+	char *mirror_uuid;
+} rbd_journal_tag_data;
+
+static uint32_t tag_data_encoding_size(struct rbd_journal_tag_data *tag_data)
+{
+	/*
+	 * 4 (uuid_len) + uuid + 1 (commit_valid) + 8 (tag_tid) +
+	 * 8 (entry_tid) + 4 (predecessor uuid_len) + predecessor uuid
+	 */
+	return (4 + tag_data->uuid_len + 1 + 8 + 8 + 4 +
+		tag_data->predecessor.uuid_len);
+}
+
+static void predecessor_encode(void **p, void *end,
+			       struct rbd_journal_tag_predecessor *predecessor)
+{
+	ceph_encode_string(p, end, predecessor->mirror_uuid, predecessor->uuid_len);
+	ceph_encode_8(p, predecessor->commit_valid);
+	ceph_encode_64(p, predecessor->tag_tid);
+	ceph_encode_64(p, predecessor->entry_tid);
+}
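As a worked check of tag_data_encoding_size() with the values actually used in this patch (both mirror uuids are LOCAL_MIRROR_UUID, i.e. empty):

        /*
         *   4 (uuid_len) + 0 (uuid)
         * + 1 (commit_valid) + 8 (tag_tid) + 8 (entry_tid)
         * + 4 (predecessor uuid_len) + 0 (predecessor uuid)
         * = 25 bytes
         */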
+static int rbd_journal_encode_tag_data(void **p, void *end,
+				       struct rbd_journal_tag_data *tag_data)
+{
+	struct rbd_journal_tag_predecessor *predecessor = &tag_data->predecessor;
+
+	ceph_encode_string(p, end, tag_data->mirror_uuid, tag_data->uuid_len);
+	predecessor_encode(p, end, predecessor);
+
+	return 0;
+}
+
+static int rbd_dev_allocate_journal_tag(struct rbd_device *rbd_dev)
+{
+	struct ceph_journaler_tag tag;
+	struct ceph_journaler *journaler = rbd_dev->journal->journaler;
+	struct rbd_journal_tag_predecessor *predecessor;
+	struct ceph_journaler_object_pos *position;
+	struct rbd_journal_tag_data tag_data;
+	void *buf = NULL, *p = NULL, *end = NULL;
+	uint32_t buf_len;
+	int ret = 0;
+
+	predecessor = &tag_data.predecessor;
+
+	position = list_first_entry(&journaler->client->object_positions,
+				    struct ceph_journaler_object_pos, node);
+
+	predecessor->commit_valid = true;
+	predecessor->tag_tid = position->tag_tid;
+	predecessor->entry_tid = position->entry_tid;
+	predecessor->uuid_len = 0;
+	predecessor->mirror_uuid = LOCAL_MIRROR_UUID;
+
+	tag_data.uuid_len = 0;
+	tag_data.mirror_uuid = LOCAL_MIRROR_UUID;
+
+	buf_len = tag_data_encoding_size(&tag_data);
+
+	p = kmalloc(buf_len, GFP_KERNEL);
+	if (!p) {
+		pr_err("failed to allocate tag data");
+		return -ENOMEM;
+	}
+
+	end = p + buf_len;
+	buf = p;
+	ret = rbd_journal_encode_tag_data(&p, end, &tag_data);
+	if (ret) {
+		pr_err("failed to encode tag data: %d", ret);
+		goto out_free;
+	}
+
+	ret = ceph_journaler_allocate_tag(journaler, 0, buf, buf_len, &tag);
+	if (!ret)
+		rbd_dev->journal->tag_tid = tag.tid;
+
+out_free:
+	kfree(buf);
+	return ret;
+}
+
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
+	rbd_dev_close_journal(rbd_dev);
 	rbd_dev_unprobe(rbd_dev);
 	if (rbd_dev->opts)
 		rbd_unregister_watch(rbd_dev);
@@ -5558,6 +5906,117 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 	rbd_dev->spec->image_id = NULL;
 }
 
+struct AioWriteEvent {
+	enum rbd_journal_event_type type;
+	uint64_t offset;
+	uint64_t length;
+	char data[];
+};
+
+static uint32_t EVENT_FIXED_SIZE = CEPH_ENCODING_START_BLK_LEN + 4; /* version encoding, type */
+/* static uint32_t METADATA_FIXED_SIZE = CEPH_ENCODING_START_BLK_LEN + 8; */ /* version encoding, timestamp */
+
+static int copy_data_from_bio(char *data, struct bio *bio, u64 length)
+{
+	struct bio_vec bv;
+	struct bvec_iter iter;
+	char *buf = NULL;
+	u64 offset = 0;
+
+	for (; bio; bio = bio->bi_next) {
+		bio_for_each_segment(bv, bio, iter) {
+			buf = page_address(bv.bv_page) + bv.bv_offset;
+			memcpy(data + offset, buf, bv.bv_len);
+			offset += bv.bv_len;
+		}
+	}
+
+	return 0;
+}
+
+static int rbd_journal_append_write_event(struct rbd_device *rbd_dev, struct bio *bio,
+					  u64 offset, u64 length,
+					  struct ceph_journaler_future **journal_future)
+{
+	char *data = NULL;
+	void *p = NULL;
+	void *end = NULL;
+	void *buf = NULL;
+	uint32_t buf_len = 0;
+	int ret = 0;
+
+	data = kmalloc(length, GFP_KERNEL);
+	if (!data) {
+		pr_err("failed to alloc memory for data");
+		return -ENOMEM;
+	}
+
+	copy_data_from_bio(data, bio, length);
+
+	buf_len = EVENT_FIXED_SIZE + 8 + 8 + 4 + length;
+
+	p = kmalloc(buf_len, GFP_KERNEL);
+	if (!p) {
+		kfree(data);
+		return -ENOMEM;
+	}
+	end = p + buf_len;
+	buf = p;
+
+	/* payload length excludes the 6-byte (v, compat, len) encoding header */
+	ceph_start_encoding(&p, 1, 1, buf_len - CEPH_ENCODING_START_BLK_LEN);
+
+	ceph_encode_32(&p, EVENT_TYPE_AIO_WRITE);
+	ceph_encode_64(&p, offset);
+	ceph_encode_64(&p, length);
+	ceph_encode_string(&p, end, data, length);
+
+	ret = ceph_journaler_append(rbd_dev->journal->journaler, rbd_dev->journal->tag_tid,
+				    buf, buf_len, journal_future);
+	kfree(data);
+	kfree(buf);
+
+	return ret;
+}
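For a concrete size check of the buffer arithmetic above (illustrative numbers): a hypothetical 4096-byte write produces

        /*
         *   buf_len = EVENT_FIXED_SIZE (6 + 4)
         *           + 8 (offset) + 8 (length)
         *           + 4 (string length) + 4096 (data)
         *           = 4126 bytes
         *
         * ceph_start_encoding() then records 4126 - 6 = 4120 as the
         * payload length.
         */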
+static int rbd_journal_append_discard_event(struct rbd_device *rbd_dev, struct bio *bio,
+					    u64 offset, u64 length,
+					    struct ceph_journaler_future **journal_future)
+{
+	void *p = NULL;
+	void *buf = NULL;
+	uint32_t buf_len = 0;
+	int ret = 0;
+
+	buf_len = EVENT_FIXED_SIZE + 8 + 8;
+
+	p = kmalloc(buf_len, GFP_KERNEL);
+	if (!p)
+		return -ENOMEM;
+	buf = p;
+
+	/* payload length excludes the 6-byte (v, compat, len) encoding header */
+	ceph_start_encoding(&p, 1, 1, buf_len - CEPH_ENCODING_START_BLK_LEN);
+
+	ceph_encode_32(&p, EVENT_TYPE_AIO_DISCARD);
+	ceph_encode_64(&p, offset);
+	ceph_encode_64(&p, length);
+
+	ret = ceph_journaler_append(rbd_dev->journal->journaler, rbd_dev->journal->tag_tid,
+				    buf, buf_len, journal_future);
+	kfree(buf);
+
+	return ret;
+}
+
+static int rbd_journal_append(struct rbd_device *rbd_dev, struct bio *bio,
+			      u64 offset, u64 length, enum obj_operation_type op_type,
+			      struct ceph_journaler_future **journal_future)
+{
+	switch (op_type) {
+	case OBJ_OP_WRITE:
+		return rbd_journal_append_write_event(rbd_dev, bio, offset, length,
+						      journal_future);
+	case OBJ_OP_DISCARD:
+		return rbd_journal_append_discard_event(rbd_dev, bio, offset, length,
+							journal_future);
+	default:
+		return 0;
+	}
+}
+
 /*
  * Probe for the existence of the header object for the given rbd
  * device.  If this image is the one being mapped (i.e., not a
@@ -5634,6 +6093,14 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
 	if (ret)
 		goto err_out_probe;
 
+	if (rbd_dev->header.features & RBD_FEATURE_JOURNALING) {
+		ret = rbd_dev_open_journal(rbd_dev);
+		if (!ret)
+			ret = rbd_dev_allocate_journal_tag(rbd_dev);
+		if (ret)
+			goto err_out_probe;
+	}
+
 	dout("discovered format %u image, header name is %s\n",
 	     rbd_dev->image_format, rbd_dev->header_oid.name);
 
 	return 0;
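As a worked example of the feature gating added above (illustrative feature combination): an image with layering, exclusive-lock and journaling enabled reports

        /*
         *   RBD_FEATURE_LAYERING       (1ULL << 0) = 0x01
         *   RBD_FEATURE_EXCLUSIVE_LOCK (1ULL << 2) = 0x04
         *   RBD_FEATURE_JOURNALING     (1ULL << 6) = 0x40
         *   features                               = 0x45
         *
         * so the new gates in rbd_queue_workfn() and rbd_dev_image_probe()
         * test (features & 0x40) != 0.
         */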