@@ -1789,7 +1789,7 @@ static int __rbd_object_map_load(struct rbd_device *rbd_dev)
rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
"rbd", "object_map_load", CEPH_OSD_FLAG_READ,
- NULL, 0, reply);
+ NULL, reply);
if (ret)
goto out;
@@ -4553,8 +4553,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
size_t inbound_size)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
- struct ceph_databuf *reply;
- struct page *req_page = NULL;
+ struct ceph_databuf *request = NULL, *reply;
+ void *p;
int ret;
/*
@@ -4568,32 +4568,33 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
if (outbound_size > PAGE_SIZE)
return -E2BIG;
- req_page = alloc_page(GFP_KERNEL);
- if (!req_page)
+ request = ceph_databuf_req_alloc(0, outbound_size, GFP_KERNEL);
+ if (!request)
return -ENOMEM;
- memcpy(page_address(req_page), outbound, outbound_size);
+ p = kmap_ceph_databuf_page(request, 0);
+ memcpy(p, outbound, outbound_size);
+ kunmap_local(p);
+ ceph_databuf_added_data(request, outbound_size);
}
reply = ceph_databuf_reply_alloc(1, inbound_size, GFP_KERNEL);
if (!reply) {
- if (req_page)
- __free_page(req_page);
- return -ENOMEM;
+ ret = -ENOMEM;
+ goto out;
}
ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
- CEPH_OSD_FLAG_READ, req_page, outbound_size,
- reply);
+ CEPH_OSD_FLAG_READ, request, reply);
if (!ret) {
ret = ceph_databuf_len(reply);
if (copy_from_iter(inbound, ret, &reply->iter) != ret)
ret = -EIO;
}
- if (req_page)
- __free_page(req_page);
ceph_databuf_release(reply);
+out:
+ ceph_databuf_release(request);
return ret;
}
@@ -5513,7 +5514,7 @@ static int decode_parent_image_spec(void **p, void *end,
}
static int __get_parent_info(struct rbd_device *rbd_dev,
- struct page *req_page,
+ struct ceph_databuf *request,
struct ceph_databuf *reply,
struct parent_image_info *pii)
{
@@ -5524,7 +5525,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "parent_get", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), reply);
+ request, reply);
if (ret)
return ret == -EOPNOTSUPP ? 1 : ret;
@@ -5539,7 +5540,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), reply);
+ request, reply);
if (ret)
return ret;
@@ -5563,7 +5564,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
* The caller is responsible for @pii.
*/
static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
- struct page *req_page,
+ struct ceph_databuf *request,
struct ceph_databuf *reply,
struct parent_image_info *pii)
{
@@ -5573,7 +5574,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "get_parent", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), reply);
+ request, reply);
if (ret)
return ret;
@@ -5604,29 +5605,29 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
struct parent_image_info *pii)
{
- struct ceph_databuf *reply;
- struct page *req_page;
+ struct ceph_databuf *request, *reply;
void *p;
int ret = -ENOMEM;
- req_page = alloc_page(GFP_KERNEL);
- if (!req_page)
+ request = ceph_databuf_req_alloc(0, sizeof(__le64), GFP_KERNEL);
+ if (!request)
goto out;
reply = ceph_databuf_reply_alloc(1, PAGE_SIZE, GFP_KERNEL);
if (!reply)
goto out_free;
- p = kmap_local_page(req_page);
+ p = kmap_ceph_databuf_page(request, 0);
ceph_encode_64(&p, rbd_dev->spec->snap_id);
kunmap_local(p);
- ret = __get_parent_info(rbd_dev, req_page, reply, pii);
+ ceph_databuf_added_data(request, sizeof(__le64));
+ ret = __get_parent_info(rbd_dev, request, reply, pii);
if (ret > 0)
- ret = __get_parent_info_legacy(rbd_dev, req_page, reply, pii);
+ ret = __get_parent_info_legacy(rbd_dev, request, reply, pii);
ceph_databuf_release(reply);
out_free:
- __free_page(req_page);
+ ceph_databuf_release(request);
out:
return ret;
}
@@ -568,7 +568,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
struct ceph_object_locator *oloc,
const char *class, const char *method,
unsigned int flags,
- struct page *req_page, size_t req_len,
+ struct ceph_databuf *request,
struct ceph_databuf *response);
/* watch/notify */
@@ -34,7 +34,7 @@ int ceph_cls_lock(struct ceph_osd_client *osdc,
int tag_len = strlen(tag);
int desc_len = strlen(desc);
void *p, *end;
- struct page *lock_op_page;
+ struct ceph_databuf *lock_op_req;
struct timespec64 mtime;
int ret;
@@ -49,11 +49,11 @@ int ceph_cls_lock(struct ceph_osd_client *osdc,
if (lock_op_buf_size > PAGE_SIZE)
return -E2BIG;
- lock_op_page = alloc_page(GFP_NOIO);
- if (!lock_op_page)
+ lock_op_req = ceph_databuf_req_alloc(0, lock_op_buf_size, GFP_NOIO);
+ if (!lock_op_req)
return -ENOMEM;
- p = page_address(lock_op_page);
+ p = kmap_ceph_databuf_page(lock_op_req, 0);
end = p + lock_op_buf_size;
/* encode cls_lock_lock_op struct */
@@ -69,15 +69,16 @@ int ceph_cls_lock(struct ceph_osd_client *osdc,
ceph_encode_timespec64(p, &mtime);
p += sizeof(struct ceph_timespec);
ceph_encode_8(&p, flags);
+ kunmap_local(p);
+ ceph_databuf_added_data(lock_op_req, lock_op_buf_size);
dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
__func__, lock_name, type, cookie, tag, desc, flags);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
- CEPH_OSD_FLAG_WRITE, lock_op_page,
- lock_op_buf_size, NULL);
+ CEPH_OSD_FLAG_WRITE, lock_op_req, NULL);
dout("%s: status %d\n", __func__, ret);
- __free_page(lock_op_page);
+ ceph_databuf_release(lock_op_req);
return ret;
}
EXPORT_SYMBOL(ceph_cls_lock);
@@ -99,7 +100,7 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc,
int name_len = strlen(lock_name);
int cookie_len = strlen(cookie);
void *p, *end;
- struct page *unlock_op_page;
+ struct ceph_databuf *unlock_op_req;
int ret;
unlock_op_buf_size = name_len + sizeof(__le32) +
@@ -108,11 +109,11 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc,
if (unlock_op_buf_size > PAGE_SIZE)
return -E2BIG;
- unlock_op_page = alloc_page(GFP_NOIO);
- if (!unlock_op_page)
+ unlock_op_req = ceph_databuf_req_alloc(0, unlock_op_buf_size, GFP_NOIO);
+ if (!unlock_op_req)
return -ENOMEM;
- p = page_address(unlock_op_page);
+ p = kmap_ceph_databuf_page(unlock_op_req, 0);
end = p + unlock_op_buf_size;
/* encode cls_lock_unlock_op struct */
@@ -120,14 +121,15 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc,
unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_string(&p, end, lock_name, name_len);
ceph_encode_string(&p, end, cookie, cookie_len);
+ kunmap_local(p);
+ ceph_databuf_added_data(unlock_op_req, unlock_op_buf_size);
dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
- CEPH_OSD_FLAG_WRITE, unlock_op_page,
- unlock_op_buf_size, NULL);
+ CEPH_OSD_FLAG_WRITE, unlock_op_req, NULL);
dout("%s: status %d\n", __func__, ret);
- __free_page(unlock_op_page);
+ ceph_databuf_release(unlock_op_req);
return ret;
}
EXPORT_SYMBOL(ceph_cls_unlock);
@@ -150,7 +152,7 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
int break_op_buf_size;
int name_len = strlen(lock_name);
int cookie_len = strlen(cookie);
- struct page *break_op_page;
+ struct ceph_databuf *break_op_req;
void *p, *end;
int ret;
@@ -161,11 +163,11 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
if (break_op_buf_size > PAGE_SIZE)
return -E2BIG;
- break_op_page = alloc_page(GFP_NOIO);
- if (!break_op_page)
+ break_op_req = ceph_databuf_req_alloc(0, break_op_buf_size, GFP_NOIO);
+ if (!break_op_req)
return -ENOMEM;
- p = page_address(break_op_page);
+ p = kmap_ceph_databuf_page(break_op_req, 0);
end = p + break_op_buf_size;
/* encode cls_lock_break_op struct */
@@ -174,15 +176,16 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
ceph_encode_string(&p, end, lock_name, name_len);
ceph_encode_copy(&p, locker, sizeof(*locker));
ceph_encode_string(&p, end, cookie, cookie_len);
+ kunmap_local(p);
+ ceph_databuf_added_data(break_op_req, break_op_buf_size);
dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
cookie, ENTITY_NAME(*locker));
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
- CEPH_OSD_FLAG_WRITE, break_op_page,
- break_op_buf_size, NULL);
+ CEPH_OSD_FLAG_WRITE, break_op_req, NULL);
dout("%s: status %d\n", __func__, ret);
- __free_page(break_op_page);
+ ceph_databuf_release(break_op_req);
return ret;
}
EXPORT_SYMBOL(ceph_cls_break_lock);
@@ -199,7 +202,7 @@ int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
int tag_len = strlen(tag);
int new_cookie_len = strlen(new_cookie);
void *p, *end;
- struct page *cookie_op_page;
+ struct ceph_databuf *cookie_op_req;
int ret;
cookie_op_buf_size = name_len + sizeof(__le32) +
@@ -210,11 +213,11 @@ int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
if (cookie_op_buf_size > PAGE_SIZE)
return -E2BIG;
- cookie_op_page = alloc_page(GFP_NOIO);
- if (!cookie_op_page)
+ cookie_op_req = ceph_databuf_req_alloc(0, cookie_op_buf_size, GFP_NOIO);
+ if (!cookie_op_req)
return -ENOMEM;
- p = page_address(cookie_op_page);
+ p = kmap_ceph_databuf_page(cookie_op_req, 0);
end = p + cookie_op_buf_size;
/* encode cls_lock_set_cookie_op struct */
@@ -225,15 +228,16 @@ int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
ceph_encode_string(&p, end, old_cookie, old_cookie_len);
ceph_encode_string(&p, end, tag, tag_len);
ceph_encode_string(&p, end, new_cookie, new_cookie_len);
+ kunmap_local(p);
+ ceph_databuf_added_data(cookie_op_req, cookie_op_buf_size);
dout("%s lock_name %s type %d old_cookie %s tag %s new_cookie %s\n",
__func__, lock_name, type, old_cookie, tag, new_cookie);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
- CEPH_OSD_FLAG_WRITE, cookie_op_page,
- cookie_op_buf_size, NULL);
+ CEPH_OSD_FLAG_WRITE, cookie_op_req, NULL);
dout("%s: status %d\n", __func__, ret);
- __free_page(cookie_op_page);
+ ceph_databuf_release(cookie_op_req);
return ret;
}
EXPORT_SYMBOL(ceph_cls_set_cookie);
@@ -340,7 +344,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
struct ceph_databuf *reply;
int get_info_op_buf_size;
int name_len = strlen(lock_name);
- struct page *get_info_op_page;
+ struct ceph_databuf *get_info_op_req;
void *p, *end;
int ret;
@@ -349,28 +353,30 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
if (get_info_op_buf_size > PAGE_SIZE)
return -E2BIG;
- get_info_op_page = alloc_page(GFP_NOIO);
- if (!get_info_op_page)
+ get_info_op_req = ceph_databuf_req_alloc(0, get_info_op_buf_size,
+ GFP_NOIO);
+ if (!get_info_op_req)
return -ENOMEM;
reply = ceph_databuf_reply_alloc(1, PAGE_SIZE, GFP_NOIO);
if (!reply) {
- __free_page(get_info_op_page);
+ ceph_databuf_release(get_info_op_req);
return -ENOMEM;
}
- p = page_address(get_info_op_page);
+ p = kmap_ceph_databuf_page(get_info_op_req, 0);
end = p + get_info_op_buf_size;
/* encode cls_lock_get_info_op struct */
ceph_start_encoding(&p, 1, 1,
get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_string(&p, end, lock_name, name_len);
+ kunmap_local(p);
+ ceph_databuf_added_data(get_info_op_req, get_info_op_buf_size);
dout("%s lock_name %s\n", __func__, lock_name);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
- CEPH_OSD_FLAG_READ, get_info_op_page,
- get_info_op_buf_size, reply);
+ CEPH_OSD_FLAG_READ, get_info_op_req, reply);
dout("%s: status %d\n", __func__, ret);
if (ret >= 0) {
@@ -381,8 +387,8 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
kunmap_local(p);
}
- __free_page(get_info_op_page);
ceph_databuf_release(reply);
+ ceph_databuf_release(get_info_op_req);
return ret;
}
EXPORT_SYMBOL(ceph_cls_lock_info);
@@ -264,7 +264,7 @@ void osd_req_op_cls_request_databuf(struct ceph_osd_request *osd_req,
BUG_ON(!ceph_databuf_len(dbuf));
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
- ceph_osd_databuf_init(osd_data, dbuf);
+ ceph_osd_databuf_init(osd_data, ceph_databuf_get(dbuf));
osd_req->r_ops[which].cls.indata_len += ceph_databuf_len(dbuf);
osd_req->r_ops[which].indata_len += ceph_databuf_len(dbuf);
}
@@ -283,19 +283,6 @@ void osd_req_op_cls_request_data_pagelist(
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
-static void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
- unsigned int which, struct page **pages, u64 length,
- u32 offset, bool pages_from_pool, bool own_pages)
-{
- struct ceph_osd_data *osd_data;
-
- osd_data = osd_req_op_data(osd_req, which, cls, request_data);
- ceph_osd_data_pages_init(osd_data, pages, length, offset,
- pages_from_pool, own_pages);
- osd_req->r_ops[which].cls.indata_len += length;
- osd_req->r_ops[which].indata_len += length;
-}
-
void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_databuf *dbuf)
@@ -5089,15 +5076,12 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
struct ceph_object_locator *oloc,
const char *class, const char *method,
unsigned int flags,
- struct page *req_page, size_t req_len,
+ struct ceph_databuf *request,
struct ceph_databuf *response)
{
struct ceph_osd_request *req;
int ret;
- if (req_len > PAGE_SIZE)
- return -E2BIG;
-
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
if (!req)
return -ENOMEM;
@@ -5110,9 +5094,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
if (ret)
goto out_put_req;
- if (req_page)
- osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
- 0, false, false);
+ if (request)
+ osd_req_op_cls_request_databuf(req, 0, request);
if (response)
osd_req_op_cls_response_databuf(req, 0, response);
Convert the request data (req_page) of ceph_osdc_call() to ceph_databuf. Signed-off-by: David Howells <dhowells@redhat.com> cc: Viacheslav Dubeyko <slava@dubeyko.com> cc: Alex Markuze <amarkuze@redhat.com> cc: Ilya Dryomov <idryomov@gmail.com> cc: ceph-devel@vger.kernel.org cc: linux-fsdevel@vger.kernel.org --- drivers/block/rbd.c | 53 +++++++++++----------- include/linux/ceph/osd_client.h | 2 +- net/ceph/cls_lock_client.c | 78 ++++++++++++++++++--------------- net/ceph/osd_client.c | 25 ++--------- 4 files changed, 74 insertions(+), 84 deletions(-)