@@ -446,19 +446,35 @@ done:
}
/*
- * Write commit callback, called if we requested both an ACK and
- * ONDISK commit reply from the OSD.
+ * Write commit request unsafe callback, called to tell us when a
+ * request is unsafe (that is, in flight--has been handed to the
+ * messenger to send to its target osd). It is called again when
+ * we've received a response message indicating the request is
+ * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
+ * is completed early (and unsuccessfully) due to a timeout or
+ * interrupt.
+ *
+ * This is used if we requested both an ACK and ONDISK commit reply
+ * from the OSD.
*/
-static void sync_write_commit(struct ceph_osd_request *req,
- struct ceph_msg *msg)
+static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool
unsafe)
{
struct ceph_inode_info *ci = ceph_inode(req->r_inode);
- dout("sync_write_commit %p tid %llu\n", req, req->r_tid);
- spin_lock(&ci->i_unsafe_lock);
- list_del_init(&req->r_unsafe_item);
- spin_unlock(&ci->i_unsafe_lock);
- ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+ dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
+ unsafe ? "un" : "");
+ if (unsafe) {
+ ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
+ spin_lock(&ci->i_unsafe_lock);
+ list_add_tail(&req->r_unsafe_item,
+ &ci->i_unsafe_writes);
+ spin_unlock(&ci->i_unsafe_lock);
+ } else {
+ spin_lock(&ci->i_unsafe_lock);
+ list_del_init(&req->r_unsafe_item);
+ spin_unlock(&ci->i_unsafe_lock);
+ ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+ }
}
/*
@@ -570,7 +586,8 @@ more:
if ((file->f_flags & O_SYNC) == 0) {
/* get a second commit callback */
- req->r_safe_callback = sync_write_commit;
+ req->r_unsafe_callback = ceph_sync_write_unsafe;
+ req->r_inode = inode;
own_pages = true;
}
}
@@ -581,21 +598,8 @@ more:
ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
- if (!ret) {
- if (req->r_safe_callback) {
- /*
- * Add to inode unsafe list only after we
- * start_request so that a tid has been assigned.
- */
- spin_lock(&ci->i_unsafe_lock);
- list_add_tail(&req->r_unsafe_item,
- &ci->i_unsafe_writes);
- spin_unlock(&ci->i_unsafe_lock);
- ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
- }
-
+ if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
- }
if (file->f_flags & O_DIRECT)
ceph_put_page_vector(pages, num_pages, false);
b/include/linux/ceph/osd_client.h
@@ -29,6 +29,7 @@ struct ceph_authorizer;
*/
typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
struct ceph_msg *);
+typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *,
bool);
/* a given osd we're communicating with */
struct ceph_osd {
@@ -149,7 +150,8 @@ struct ceph_osd_request {
struct kref r_kref;
bool r_mempool;
struct completion r_completion, r_safe_completion;
- ceph_osdc_callback_t r_callback, r_safe_callback;
+ ceph_osdc_callback_t r_callback;
+ ceph_osdc_unsafe_callback_t r_unsafe_callback;
struct ceph_eversion r_reassert_version;
struct list_head r_unsafe_item;
@@ -1314,8 +1314,14 @@ static void __send_request(struct ceph_osd_client
*osdc,
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
ceph_msg_get(req->r_request); /* send consumes a ref */
- ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+ /* Mark the request unsafe if this is the first timet's being sent. */
+
+ if (!req->r_sent && req->r_unsafe_callback)
+ req->r_unsafe_callback(req, true);
req->r_sent = req->r_osd->o_incarnation;
+
+ ceph_con_send(&req->r_osd->o_con, req->r_request);
}