@@ -157,6 +157,7 @@ struct osc_io {
/* write osc_lock for this IO, used by osc_extent_find(). */
struct osc_lock *oi_write_osclock;
+ struct osc_lock *oi_read_osclock;
struct obdo oi_oa;
struct osc_async_cbargs {
bool opc_rpc_sent;
@@ -724,6 +725,8 @@ int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
struct osc_lock *oscl);
void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
struct cl_object *obj, struct osc_lock *oscl);
+void osc_lock_set_reader(const struct lu_env *env, const struct cl_io *io,
+ struct cl_object *obj, struct osc_lock *oscl);
int osc_lock_print(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct cl_lock_slice *slice);
void osc_lock_cancel(const struct lu_env *env,
@@ -966,6 +966,9 @@ int mdc_lock_init(const struct lu_env *env, struct cl_object *obj,
if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
osc_lock_set_writer(env, io, obj, ols);
+ else if (io->ci_type == CIT_READ ||
+ (io->ci_type == CIT_FAULT && !io->u.ci_fault.ft_mkwrite))
+ osc_lock_set_reader(env, io, obj, ols);
LDLM_DEBUG_NOLOCK("lock %p, mdc lock %p, flags %llx\n",
lock, ols, ols->ols_flags);
@@ -2655,6 +2655,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags)
{
+ struct osc_io *oio = osc_env_io(env);
struct client_obd *cli = osc_cli(obj);
struct osc_extent *ext;
struct osc_async_page *oap;
@@ -2663,6 +2664,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
bool can_merge = true;
pgoff_t start = CL_PAGE_EOF;
pgoff_t end = 0;
+ struct osc_lock *oscl;
list_for_each_entry(oap, list, oap_pending_item) {
struct osc_page *opg = oap2osc_page(oap);
@@ -2703,6 +2705,11 @@ int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+ oscl = oio->oi_write_osclock ? : oio->oi_read_osclock;
+ if (oscl && oscl->ols_dlmlock != NULL) {
+ ext->oe_dlmlock = LDLM_LOCK_GET(oscl->ols_dlmlock);
+ lu_ref_add(&ext->oe_dlmlock->l_reference, "osc_extent", ext);
+ }
if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
int grants;
int ppc;
@@ -461,6 +461,7 @@ void osc_io_rw_iter_fini(const struct lu_env *env,
oio->oi_lru_reserved = 0;
}
oio->oi_write_osclock = NULL;
+ oio->oi_read_osclock = NULL;
osc_io_iter_fini(env, ios);
}
@@ -1178,6 +1178,22 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
}
EXPORT_SYMBOL(osc_lock_set_writer);
+void osc_lock_set_reader(const struct lu_env *env, const struct cl_io *io,
+ struct cl_object *obj, struct osc_lock *oscl)
+{
+ struct osc_io *oio = osc_env_io(env);
+
+ if (!cl_object_same(io->ci_obj, obj))
+ return;
+
+ if (oscl->ols_glimpse || osc_lock_is_lockless(oscl))
+ return;
+
+ if (oio->oi_read_osclock == NULL)
+ oio->oi_read_osclock = oscl;
+}
+EXPORT_SYMBOL(osc_lock_set_reader);
+
int osc_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
const struct cl_io *io)
@@ -1224,6 +1240,9 @@ int osc_lock_init(const struct lu_env *env,
if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
osc_lock_set_writer(env, io, obj, oscl);
+ else if (io->ci_type == CIT_READ ||
+ (io->ci_type == CIT_FAULT && !io->u.ci_fault.ft_mkwrite))
+ osc_lock_set_reader(env, io, obj, oscl);
LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx",