@@ -1490,9 +1490,10 @@ struct cl_read_ahead {
* function should be called to release it.
*/
void (*cra_release)(const struct lu_env *env,
- void *cbdata);
+ struct cl_read_ahead *ra);
/* Callback data for cra_release routine */
- void *cra_cbdata;
+ void *cra_dlmlock;
+ void *cra_oio;
/* whether lock is in contention */
bool cra_contention;
};
@@ -1501,7 +1502,7 @@ static inline void cl_read_ahead_release(const struct lu_env *env,
struct cl_read_ahead *ra)
{
if (ra->cra_release)
- ra->cra_release(env, ra->cra_cbdata);
+ ra->cra_release(env, ra);
memset(ra, 0, sizeof(*ra));
}
@@ -1624,6 +1625,13 @@ struct cl_io_operations {
const struct cl_io_slice *slice,
pgoff_t start, struct cl_read_ahead *ra);
/**
+ *
+ * Reserve LRU slots before IO.
+ */
+ int (*cio_lru_reserve)(const struct lu_env *env,
+ const struct cl_io_slice *slice,
+ loff_t pos, size_t bytes);
+ /**
* Optional debugging helper. Print given io slice.
*/
int (*cio_print)(const struct lu_env *env, void *cookie,
@@ -2445,6 +2453,8 @@ int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
struct cl_page_list *queue, int from, int to,
cl_commit_cbt cb);
void cl_io_extent_release(const struct lu_env *env, struct cl_io *io);
+int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
+ loff_t pos, size_t bytes);
int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
pgoff_t start, struct cl_read_ahead *ra);
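A layer with nothing of its own to reserve can satisfy the new cio_lru_reserve() method trivially. The no-op slice below is a hypothetical sketch (the foo_ name is illustrative, not part of this patch) showing the return convention: 0 lets cl_io_lru_reserve() continue walking the layers below, while a negative errno aborts the IO before any pages are cached.

    static int foo_io_lru_reserve(const struct lu_env *env,
                                  const struct cl_io_slice *ios,
                                  loff_t pos, size_t bytes)
    {
            /* Nothing cached at this layer; let lower layers reserve. */
            return 0;
    }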
@@ -142,7 +142,9 @@ struct osc_io {
/* true if this io is counted as active IO */
oi_is_active:1,
/** true if this io has CAP_SYS_RESOURCE */
- oi_cap_sys_resource:1;
+ oi_cap_sys_resource:1,
+ /** true if this io was issued by readahead */
+ oi_is_readahead:1;
/* how many LRU pages are reserved for this IO */
unsigned long oi_lru_reserved;
@@ -694,8 +696,6 @@ void osc_io_extent_release(const struct lu_env *env,
int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios);
void osc_io_iter_fini(const struct lu_env *env,
const struct cl_io_slice *ios);
-int osc_io_rw_iter_init(const struct lu_env *env,
- const struct cl_io_slice *ios);
void osc_io_rw_iter_fini(const struct lu_env *env,
const struct cl_io_slice *ios);
int osc_io_fault_start(const struct lu_env *env, const struct cl_io_slice *ios);
@@ -710,11 +710,13 @@ int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
struct cl_fsync_io *fio);
void osc_io_fsync_end(const struct lu_env *env,
const struct cl_io_slice *slice);
-void osc_read_ahead_release(const struct lu_env *env, void *cbdata);
+void osc_read_ahead_release(const struct lu_env *env, struct cl_read_ahead *ra);
int osc_io_lseek_start(const struct lu_env *env,
const struct cl_io_slice *slice);
void osc_io_lseek_end(const struct lu_env *env,
const struct cl_io_slice *slice);
+int osc_io_lru_reserve(const struct lu_env *env, const struct cl_io_slice *ios,
+ loff_t pos, size_t count);
/* osc_lock.c */
void osc_lock_to_lockless(const struct lu_env *env, struct osc_lock *ols,
@@ -1564,8 +1564,10 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
struct file *file, enum cl_io_type iot,
loff_t *ppos, size_t count)
{
- struct ll_inode_info *lli = ll_i2info(file_inode(file));
+ struct inode *inode = file_inode(file);
+ struct ll_inode_info *lli = ll_i2info(inode);
struct ll_file_data *fd = file->private_data;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
struct vvp_io *vio = vvp_env_io(env);
struct range_lock range;
struct cl_io *io;
@@ -1575,10 +1577,18 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
unsigned int dio_lock = 0;
bool is_aio = false;
struct cl_dio_aio *ci_aio = NULL;
+ size_t per_bytes;
+ bool partial_io = false;
+ size_t max_io_pages, max_cached_pages;
CDEBUG(D_VFSTRACE, "file: %pD, type: %d ppos: %llu, count: %zu\n",
file, iot, *ppos, count);
+ max_io_pages = PTLRPC_MAX_BRW_PAGES * OBD_MAX_RIF_DEFAULT;
+ max_cached_pages = sbi->ll_cache->ccc_lru_max;
+ if (max_io_pages > (max_cached_pages >> 2))
+ max_io_pages = max_cached_pages >> 2;
+
io = vvp_env_thread_io(env);
if (file->f_flags & O_DIRECT) {
if (!is_sync_kiocb(args->u.normal.via_iocb))
@@ -1591,19 +1601,29 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
}
restart:
+ /**
+ * The IO chunk size must respect the cached page limit; otherwise,
+ * with a small max_cached_mb and a large IO issued in one chunk,
+ * the IO could never complete and would block the whole client.
+ */
+ if (file->f_flags & O_DIRECT)
+ per_bytes = count;
+ else
+ per_bytes = min(max_io_pages << PAGE_SHIFT, count);
+ partial_io = per_bytes < count;
io = vvp_env_thread_io(env);
ll_io_init(io, file, iot == CIT_WRITE, args);
io->ci_aio = ci_aio;
io->ci_dio_lock = dio_lock;
io->ci_ndelay_tried = retried;
- if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
+ if (cl_io_rw_init(env, io, iot, *ppos, per_bytes) == 0) {
bool range_locked = false;
if (file->f_flags & O_APPEND)
range_lock_init(&range, 0, LUSTRE_EOF);
else
- range_lock_init(&range, *ppos, *ppos + count - 1);
+ range_lock_init(&range, *ppos, *ppos + per_bytes - 1);
vio->vui_fd = file->private_data;
vio->vui_iter = args->u.normal.via_iter;
@@ -1656,6 +1676,16 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
/* prepare IO restart */
if (count > 0)
args->u.normal.via_iter = vio->vui_iter;
+
+ if (partial_io) {
+ /**
+ * Reexpand the iov count because it was
+ * zeroed when the IO finished.
+ */
+ iov_iter_reexpand(vio->vui_iter, count);
+ if (per_bytes == io->ci_nob)
+ io->ci_need_restart = 1;
+ }
}
out:
cl_io_fini(env, io);
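To make the chunking concrete, a rough arithmetic sketch follows (illustrative values, assuming 4 KiB pages and that PTLRPC_MAX_BRW_PAGES * OBD_MAX_RIF_DEFAULT exceeds the cache-based cap): with ccc_lru_max at 32768 pages (128 MiB of max_cached_mb), max_io_pages is clamped to 8192 pages, so a 1 GiB buffered write proceeds in 32 MiB passes, each marked partial and restarted.

    /* Illustrative numbers only, 4 KiB pages assumed. */
    max_cached_pages = 32768;                      /* 128 MiB cache    */
    max_io_pages = max_cached_pages >> 2;          /* 8192 pages       */
    per_bytes = min(max_io_pages << PAGE_SHIFT,    /* 32 MiB per pass  */
                    count);                        /* count = 1 GiB    */
    /* 32 passes; after each, iov_iter_reexpand() restores the iter and
     * ci_need_restart sends the IO back to the restart: label. */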
@@ -86,7 +86,14 @@ static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
struct ll_ra_info *ra = &sbi->ll_ra_info;
long ret;
- /* If read-ahead pages left are less than 1M, do not do read-ahead,
+ /**
+ * Don't read ahead aggressively if we are short on
+ * LRU pages; otherwise it could cause a deadlock.
+ */
+ pages = min(sbi->ll_cache->ccc_lru_max >> 2, pages);
+
+ /*
+ * If read-ahead pages left are less than 1M, do not do read-ahead,
* otherwise it will form small read RPC(< 1M), which hurt server
* performance a lot.
*/
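The same quarter-of-the-cache bound is applied per readahead grab. A hedged numeric example (4 KiB pages assumed): with max_cached_mb of 64 MiB, ccc_lru_max is 16384 pages, so a request for 8192 readahead pages is trimmed to 4096 (16 MiB) before the existing small-RPC check runs.

    /* Assumed example: ccc_lru_max = 16384 pages (64 MiB cache). */
    pages = min(sbi->ll_cache->ccc_lru_max >> 2, pages);
    /* pages = 8192 on entry  ->  pages = 4096 after the clamp */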
@@ -701,11 +708,24 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io,
struct inode *inode;
struct ra_io_arg *ria = <i->lti_ria;
struct cl_object *clob;
+ struct ll_sb_info *sbi;
+ struct ll_ra_info *ra;
int ret = 0;
u64 kms;
clob = io->ci_obj;
inode = vvp_object_inode(clob);
+ sbi = ll_i2sbi(inode);
+ ra = &sbi->ll_ra_info;
+
+ /**
+ * In case we have a limited max_cached_mb, readahead
+ * should be stopped if it has run out of all LRU slots.
+ */
+ if (atomic_read(&ra->ra_cur_pages) >= sbi->ll_cache->ccc_lru_max) {
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+ return 0;
+ }
memset(ria, 0, sizeof(*ria));
ret = ll_readahead_file_kms(env, io, &kms);
@@ -1706,6 +1726,15 @@ static int kickoff_async_readahead(struct file *file, unsigned long pages)
pgoff_t start_idx = ras_align(ras, ras->ras_next_readahead_idx);
pgoff_t end_idx = start_idx + pages - 1;
+ /**
+ * In case we have a limited max_cached_mb, readahead
+ * should be stopped if it has run out of all LRU slots.
+ */
+ if (atomic_read(&ra->ra_cur_pages) >= sbi->ll_cache->ccc_lru_max) {
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+ return 0;
+ }
+
throttle = min(ra->ra_async_pages_per_file_threshold,
ra->ra_max_pages_per_file);
/*
@@ -798,6 +798,12 @@ static int vvp_io_read_start(const struct lu_env *env,
if (!can_populate_pages(env, io, inode))
return 0;
+ if (!(file->f_flags & O_DIRECT)) {
+ result = cl_io_lru_reserve(env, io, pos, cnt);
+ if (result)
+ return result;
+ }
+
/* Unless this is reading a sparse file, otherwise the lock has already
* been acquired so vvp_prep_size() is an empty op.
*/
@@ -1175,6 +1181,12 @@ static int vvp_io_write_start(const struct lu_env *env,
if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_IMUTEX_NOSEC) && lock_inode)
return -EINVAL;
+ if (!(file->f_flags & O_DIRECT)) {
+ result = cl_io_lru_reserve(env, io, pos, cnt);
+ if (result)
+ return result;
+ }
+
if (!vio->vui_iter) {
/* from a temp io in ll_cl_init(). */
result = 0;
@@ -1179,6 +1179,58 @@ static int lov_io_read_ahead(const struct lu_env *env,
return 0;
}
+int lov_io_lru_reserve(const struct lu_env *env,
+ const struct cl_io_slice *ios, loff_t pos, size_t bytes)
+{
+ struct lov_io *lio = cl2lov_io(env, ios);
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+ struct lov_io_sub *sub;
+ struct lu_extent ext;
+ int index;
+ int rc = 0;
+
+ ext.e_start = pos;
+ ext.e_end = pos + bytes;
+ lov_foreach_io_layout(index, lio, &ext) {
+ struct lov_layout_entry *le = lov_entry(lio->lis_object, index);
+ struct lov_layout_raid0 *r0 = &le->lle_raid0;
+ u64 start;
+ u64 end;
+ int stripe;
+
+ if (!lsm_entry_inited(lsm, index))
+ continue;
+
+ if (!le->lle_valid && !ios->cis_io->ci_designated_mirror) {
+ CERROR(DFID": I/O to invalid component: %d, mirror: %d\n",
+ PFID(lu_object_fid(lov2lu(lio->lis_object))),
+ index, lio->lis_mirror_index);
+ return -EIO;
+ }
+
+ for (stripe = 0; stripe < r0->lo_nr; stripe++) {
+ if (!lov_stripe_intersects(lsm, index, stripe,
+ &ext, &start, &end))
+ continue;
+
+ if (unlikely(!r0->lo_sub[stripe]))
+ return -EIO;
+
+ sub = lov_sub_get(env, lio,
+ lov_comp_index(index, stripe));
+ if (IS_ERR(sub))
+ return PTR_ERR(sub);
+
+ rc = cl_io_lru_reserve(sub->sub_env, &sub->sub_io,
+ start, end - start + 1);
+ if (rc != 0)
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
/**
* lov implementation of cl_operations::cio_submit() method. It takes a list
* of pages in @queue, splits it into per-stripe sub-lists, invokes
@@ -1581,6 +1633,7 @@ static void lov_io_lseek_end(const struct lu_env *env,
}
},
.cio_read_ahead = lov_io_read_ahead,
+ .cio_lru_reserve = lov_io_lru_reserve,
.cio_submit = lov_io_submit,
.cio_commit_async = lov_io_commit_async,
};
@@ -1113,12 +1113,14 @@ static int mdc_io_read_ahead(const struct lu_env *env,
pgoff_t start, struct cl_read_ahead *ra)
{
struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct osc_io *oio = cl2osc_io(env, ios);
struct ldlm_lock *dlmlock;
dlmlock = mdc_dlmlock_at_pgoff(env, osc, start, 0);
if (!dlmlock)
return -ENODATA;
+ oio->oi_is_readahead = 1;
if (dlmlock->l_req_mode != LCK_PR) {
struct lustre_handle lockh;
@@ -1130,7 +1132,8 @@ static int mdc_io_read_ahead(const struct lu_env *env,
ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
ra->cra_end_idx = CL_PAGE_EOF;
ra->cra_release = osc_read_ahead_release;
- ra->cra_cbdata = dlmlock;
+ ra->cra_dlmlock = dlmlock;
+ ra->cra_oio = oio;
return 0;
}
@@ -1287,12 +1290,12 @@ static void mdc_io_data_version_end(const struct lu_env *env,
static const struct cl_io_operations mdc_io_ops = {
.op = {
[CIT_READ] = {
- .cio_iter_init = osc_io_rw_iter_init,
+ .cio_iter_init = osc_io_iter_init,
.cio_iter_fini = osc_io_rw_iter_fini,
.cio_start = osc_io_read_start,
},
[CIT_WRITE] = {
- .cio_iter_init = osc_io_rw_iter_init,
+ .cio_iter_init = osc_io_iter_init,
.cio_iter_fini = osc_io_rw_iter_fini,
.cio_start = osc_io_write_start,
.cio_end = osc_io_end,
@@ -1323,6 +1326,7 @@ static void mdc_io_data_version_end(const struct lu_env *env,
},
},
.cio_read_ahead = mdc_io_read_ahead,
+ .cio_lru_reserve = osc_io_lru_reserve,
.cio_submit = osc_io_submit,
.cio_commit_async = osc_io_commit_async,
.cio_extent_release = osc_io_extent_release,
@@ -573,6 +573,34 @@ int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
EXPORT_SYMBOL(cl_io_read_ahead);
/**
+ * Called before the IO starts to reserve enough LRU slots to
+ * avoid deadlock.
+ *
+ * \see cl_io_operations::cio_lru_reserve()
+ */
+int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
+ loff_t pos, size_t bytes)
+{
+ const struct cl_io_slice *scan;
+ int result = 0;
+
+ LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+ LINVRNT(cl_io_invariant(io));
+
+ list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
+ if (scan->cis_iop->cio_lru_reserve) {
+ result = scan->cis_iop->cio_lru_reserve(env, scan,
+ pos, bytes);
+ if (result)
+ break;
+ }
+ }
+
+ return result;
+}
+EXPORT_SYMBOL(cl_io_lru_reserve);
+
+/**
* Commit a list of contiguous pages into writeback cache.
*
* \returns 0 if all pages committed, or errcode if error occurred.
@@ -59,11 +59,13 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
{
}
-void osc_read_ahead_release(const struct lu_env *env, void *cbdata)
+void osc_read_ahead_release(const struct lu_env *env, struct cl_read_ahead *ra)
{
- struct ldlm_lock *dlmlock = cbdata;
+ struct ldlm_lock *dlmlock = ra->cra_dlmlock;
+ struct osc_io *oio = ra->cra_oio;
struct lustre_handle lockh;
+ oio->oi_is_readahead = 0;
ldlm_lock2handle(dlmlock, &lockh);
ldlm_lock_decref(&lockh, LCK_PR);
LDLM_LOCK_PUT(dlmlock);
@@ -75,9 +77,11 @@ static int osc_io_read_ahead(const struct lu_env *env,
pgoff_t start, struct cl_read_ahead *ra)
{
struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct osc_io *oio = cl2osc_io(env, ios);
struct ldlm_lock *dlmlock;
int result = -ENODATA;
+ oio->oi_is_readahead = true;
dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
if (dlmlock) {
LASSERT(dlmlock->l_ast_data == osc);
@@ -93,7 +97,8 @@ static int osc_io_read_ahead(const struct lu_env *env,
ra->cra_end_idx = cl_index(osc2cl(osc),
dlmlock->l_policy_data.l_extent.end);
ra->cra_release = osc_read_ahead_release;
- ra->cra_cbdata = dlmlock;
+ ra->cra_dlmlock = dlmlock;
+ ra->cra_oio = oio;
if (ra->cra_end_idx != CL_PAGE_EOF)
ra->cra_contention = true;
result = 0;
@@ -421,27 +426,6 @@ int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
}
EXPORT_SYMBOL(osc_io_iter_init);
-int osc_io_rw_iter_init(const struct lu_env *env,
- const struct cl_io_slice *ios)
-{
- struct cl_io *io = ios->cis_io;
- struct osc_io *oio = osc_env_io(env);
- struct osc_object *osc = cl2osc(ios->cis_obj);
- unsigned long npages;
-
- if (cl_io_is_append(io))
- return osc_io_iter_init(env, ios);
-
- npages = io->u.ci_rw.crw_count >> PAGE_SHIFT;
- if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
- ++npages;
-
- oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
-
- return osc_io_iter_init(env, ios);
-}
-EXPORT_SYMBOL(osc_io_rw_iter_init);
-
void osc_io_iter_fini(const struct lu_env *env,
const struct cl_io_slice *ios)
{
@@ -1177,16 +1161,40 @@ void osc_io_lseek_end(const struct lu_env *env,
}
EXPORT_SYMBOL(osc_io_lseek_end);
+int osc_io_lru_reserve(const struct lu_env *env,
+ const struct cl_io_slice *ios,
+ loff_t pos, size_t bytes)
+{
+ struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct osc_io *oio = osc_env_io(env);
+ unsigned long npages = 0;
+ size_t page_offset;
+
+ page_offset = pos & ~PAGE_MASK;
+ if (page_offset) {
+ ++npages;
+ if (bytes > PAGE_SIZE - page_offset)
+ bytes -= (PAGE_SIZE - page_offset);
+ else
+ bytes = 0;
+ }
+ npages += (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
+ oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
+
+ return 0;
+}
+EXPORT_SYMBOL(osc_io_lru_reserve);
+
static const struct cl_io_operations osc_io_ops = {
.op = {
[CIT_READ] = {
- .cio_iter_init = osc_io_rw_iter_init,
+ .cio_iter_init = osc_io_iter_init,
.cio_iter_fini = osc_io_rw_iter_fini,
.cio_start = osc_io_read_start,
.cio_fini = osc_io_fini
},
[CIT_WRITE] = {
- .cio_iter_init = osc_io_rw_iter_init,
+ .cio_iter_init = osc_io_iter_init,
.cio_iter_fini = osc_io_rw_iter_fini,
.cio_start = osc_io_write_start,
.cio_end = osc_io_end,
@@ -1229,6 +1237,7 @@ void osc_io_lseek_end(const struct lu_env *env,
}
},
.cio_read_ahead = osc_io_read_ahead,
+ .cio_lru_reserve = osc_io_lru_reserve,
.cio_submit = osc_io_submit,
.cio_commit_async = osc_io_commit_async,
.cio_extent_release = osc_io_extent_release
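The page accounting in osc_io_lru_reserve() counts an unaligned head page separately and rounds the remainder up to whole pages. A worked trace of an assumed unaligned case (PAGE_SIZE = 4096):

    /* pos = 4095, bytes = 2:
     *   page_offset = pos & ~PAGE_MASK = 4095  -> npages = 1, bytes = 1
     *   npages += (1 + 4095) >> PAGE_SHIFT     -> npages = 2
     * i.e. two LRU slots for the two pages this IO actually touches.
     */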
@@ -793,6 +793,13 @@ static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
break;
if (rc > 0)
continue;
+ /* IO issued by readahead, don't try hard */
+ if (oio->oi_is_readahead) {
+ if (atomic_long_read(cli->cl_lru_left) > 0)
+ continue;
+ rc = -EBUSY;
+ break;
+ }
cond_resched();
@@ -824,18 +831,23 @@ unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
unsigned long reserved = 0;
unsigned long max_pages;
unsigned long c;
+ int rc;
- /*
- * reserve a full RPC window at most to avoid that a thread accidentally
- * consumes too many LRU slots
- */
- max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
- if (npages > max_pages)
- npages = max_pages;
-
+again:
c = atomic_long_read(cli->cl_lru_left);
if (c < npages && osc_lru_reclaim(cli, npages) > 0)
c = atomic_long_read(cli->cl_lru_left);
+
+ if (c < npages) {
+ /*
+ * Trigger writeback in the hope that some LRU slots
+ * can be freed.
+ */
+ rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+ if (rc)
+ return 0;
+ }
+
while (c >= npages) {
if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
reserved = npages;
@@ -843,6 +855,16 @@ unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
}
c = atomic_long_read(cli->cl_lru_left);
}
+
+ if (reserved != npages) {
+ cond_resched();
+ rc = l_wait_event_abortable(
+ osc_lru_waitq,
+ atomic_long_read(cli->cl_lru_left) > 0);
+ goto again;
+ }
+
+ max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
if (atomic_long_read(cli->cl_lru_left) < max_pages) {
/*
* If there aren't enough pages in the per-OSC LRU then