@@ -269,9 +269,15 @@ static inline int exp_connect_flr(struct obd_export *exp)
return !!(exp_connect_flags2(exp) & OBD_CONNECT2_FLR);
}
+static inline int exp_connect_lock_convert(struct obd_export *exp)
+{
+ return !!(exp_connect_flags2(exp) & OBD_CONNECT2_LOCK_CONVERT);
+}
+
struct obd_export *class_conn2export(struct lustre_handle *conn);
#define KKUC_CT_DATA_MAGIC 0x092013cea
+
struct kkuc_ct_data {
u32 kcd_magic;
u32 kcd_archive;
@@ -81,7 +81,7 @@ int ldlm_inodebits_drop(struct ldlm_lock *lock, u64 to_drop)
/* Just return if there are no conflicting bits */
if ((lock->l_policy_data.l_inodebits.bits & to_drop) == 0) {
- LDLM_WARN(lock, "try to drop unset bits %#llx/%#llx\n",
+ LDLM_WARN(lock, "try to drop unset bits %#llx/%#llx",
lock->l_policy_data.l_inodebits.bits, to_drop);
/* nothing to do */
return 0;
@@ -111,7 +111,7 @@ int ldlm_cli_dropbits(struct ldlm_lock *lock, u64 drop_bits)
ldlm_lock2handle(lock, &lockh);
lock_res_and_lock(lock);
- /* check if all bits are cancelled */
+ /* check if all bits are blocked */
if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) {
unlock_res_and_lock(lock);
/* return error to continue with cancel */
@@ -119,6 +119,13 @@ int ldlm_cli_dropbits(struct ldlm_lock *lock, u64 drop_bits)
goto exit;
}
+ /* check if no common bits, consider this as successful convert */
+ if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
+ unlock_res_and_lock(lock);
+ rc = 0;
+ goto exit;
+ }
+
/* check if there is race with cancel */
if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) {
unlock_res_and_lock(lock);
@@ -167,9 +174,11 @@ int ldlm_cli_dropbits(struct ldlm_lock *lock, u64 drop_bits)
rc = ldlm_cli_convert(lock, &flags);
if (rc) {
lock_res_and_lock(lock);
- ldlm_clear_converting(lock);
- ldlm_set_cbpending(lock);
- ldlm_set_bl_ast(lock);
+ if (ldlm_is_converting(lock)) {
+ ldlm_clear_converting(lock);
+ ldlm_set_cbpending(lock);
+ ldlm_set_bl_ast(lock);
+ }
unlock_res_and_lock(lock);
goto exit;
}
@@ -854,7 +854,7 @@ static int lock_convert_interpret(const struct lu_env *env,
aa->lock_handle.cookie, reply->lock_handle.cookie,
req->rq_export->exp_client_uuid.uuid,
libcfs_id2str(req->rq_peer));
- rc = -ESTALE;
+ rc = ELDLM_NO_LOCK_DATA;
goto out;
}
@@ -905,15 +905,30 @@ static int lock_convert_interpret(const struct lu_env *env,
unlock_res_and_lock(lock);
out:
if (rc) {
+ int flag;
+
lock_res_and_lock(lock);
if (ldlm_is_converting(lock)) {
ldlm_clear_converting(lock);
ldlm_set_cbpending(lock);
ldlm_set_bl_ast(lock);
+ lock->l_policy_data.l_inodebits.cancel_bits = 0;
}
unlock_res_and_lock(lock);
- }
+ /* fallback to normal lock cancel. If rc means there is no
+ * valid lock on server, do only local cancel
+ */
+ if (rc == ELDLM_NO_LOCK_DATA)
+ flag = LCF_LOCAL;
+ else
+ flag = LCF_ASYNC;
+
+ rc = ldlm_cli_cancel(&aa->lock_handle, flag);
+ if (rc < 0)
+ LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
+ rc);
+ }
LDLM_LOCK_PUT(lock);
return rc;
}
@@ -942,6 +957,15 @@ int ldlm_cli_convert(struct ldlm_lock *lock, u32 *flags)
return -EINVAL;
}
+ /* this is better to check earlier and it is done so already,
+ * but this check is kept too as final one to issue an error
+ * if any new code will miss such check.
+ */
+ if (!exp_connect_lock_convert(exp)) {
+ LDLM_ERROR(lock, "server doesn't support lock convert\n");
+ return -EPROTO;
+ }
+
if (lock->l_resource->lr_type != LDLM_IBITS) {
LDLM_ERROR(lock, "convert works with IBITS locks only.");
return -EINVAL;
@@ -970,13 +994,12 @@ int ldlm_cli_convert(struct ldlm_lock *lock, u32 *flags)
ptlrpc_request_set_replen(req);
- /* That could be useful to use cancel portals for convert as well
- * as high-priority handling. This will require changes in
- * ldlm_cancel_handler to understand convert RPC as well.
- *
- * req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
- * req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+ /*
+ * Use cancel portals for convert as well as high-priority handling.
*/
+ req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
+ req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+
ptlrpc_at_set_req_timeout(req);
if (exp->exp_obd->obd_svc_stats)
@@ -209,7 +209,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
OBD_CONNECT_GRANT_PARAM |
OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2;
- data->ocd_connect_flags2 = OBD_CONNECT2_FLR;
+ data->ocd_connect_flags2 = OBD_CONNECT2_FLR | OBD_CONNECT2_LOCK_CONVERT;
if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
@@ -371,11 +371,16 @@ void ll_lock_cancel_bits(struct ldlm_lock *lock, u64 to_cancel)
*/
int ll_md_need_convert(struct ldlm_lock *lock)
{
+ struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
struct inode *inode;
u64 wanted = lock->l_policy_data.l_inodebits.cancel_bits;
u64 bits = lock->l_policy_data.l_inodebits.bits & ~wanted;
enum ldlm_mode mode = LCK_MINMODE;
+ if (!lock->l_conn_export ||
+ !exp_connect_lock_convert(lock->l_conn_export))
+ return 0;
+
if (!wanted || !bits || ldlm_is_cancel(lock))
return 0;
@@ -410,7 +415,7 @@ int ll_md_need_convert(struct ldlm_lock *lock)
lock_res_and_lock(lock);
if (ktime_after(ktime_get(),
ktime_add(lock->l_last_used,
- ktime_set(10, 0)))) {
+ ktime_set(ns->ns_dirty_age_limit, 0)))) {
unlock_res_and_lock(lock);
return 0;
}