@@ -1996,7 +1996,13 @@ struct cl_io {
/**
* Sequential read hints.
*/
- ci_seq_read:1;
+ ci_seq_read:1,
+ /**
+ * Do parallel (async) submission of DIO RPCs. Note that DIO is still
+ * synchronous to userspace; only the RPCs are submitted asynchronously,
+ * and they are waited for at the llite layer before returning.
+ */
+ ci_parallel_dio:1;
/**
* Bypass quota check
*/
@@ -2585,6 +2591,8 @@ int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout);
void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
int ioret);
+int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
+ long timeout, int ioret);
struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb);
void cl_aio_free(struct cl_dio_aio *aio);
@@ -602,7 +602,7 @@ int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
struct osc_page *ops);
int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops);
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags);
int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
@@ -1619,12 +1619,15 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct vvp_io *vio = vvp_env_io(env);
struct range_lock range;
+ bool range_locked = false;
struct cl_io *io;
ssize_t result = 0;
int rc = 0;
+ int rc2 = 0;
unsigned int retried = 0;
unsigned int dio_lock = 0;
bool is_aio = false;
+ bool is_parallel_dio = false;
struct cl_dio_aio *ci_aio = NULL;
size_t per_bytes;
bool partial_io = false;
@@ -1642,6 +1645,17 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
if (file->f_flags & O_DIRECT) {
if (!is_sync_kiocb(args->u.normal.via_iocb))
is_aio = true;
+
+ /* The kernel does not support AIO on pipes, and parallel DIO
+ * uses part of the AIO path, so we must not do parallel DIO
+ * to pipes.
+ */
+ is_parallel_dio = !iov_iter_is_pipe(args->u.normal.via_iter) &&
+ !is_aio;
+
+ if (!ll_sbi_has_parallel_dio(sbi))
+ is_parallel_dio = false;
+
ci_aio = cl_aio_alloc(args->u.normal.via_iocb);
if (!ci_aio) {
rc = -ENOMEM;
@@ -1665,10 +1679,9 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
io->ci_aio = ci_aio;
io->ci_dio_lock = dio_lock;
io->ci_ndelay_tried = retried;
+ io->ci_parallel_dio = is_parallel_dio;
if (cl_io_rw_init(env, io, iot, *ppos, per_bytes) == 0) {
- bool range_locked = false;
-
if (file->f_flags & O_APPEND)
range_lock_init(&range, 0, LUSTRE_EOF);
else
@@ -1697,17 +1710,41 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
ll_cl_add(file, env, io, LCC_RW);
rc = cl_io_loop(env, io);
ll_cl_remove(file, env);
- if (range_locked) {
+ if (range_locked && !is_parallel_dio) {
CDEBUG(D_VFSTRACE, "Range unlock [%llu, %llu]\n",
range.rl_start,
range.rl_last);
range_unlock(&lli->lli_write_tree, &range);
+ range_locked = false;
}
} else {
/* cl_io_rw_init() handled IO */
rc = io->ci_result;
}
+ /* N.B.: parallel DIO may be disabled during I/O submission;
+ * if that occurs, async RPCs are resolved before we get here, and this
+ * wait call completes immediately.
+ */
+ if (is_parallel_dio) {
+ struct cl_sync_io *anchor = &io->ci_aio->cda_sync;
+
+ /* For DIO, EIOCBQUEUED is an implementation detail,
+ * and we don't return it to userspace.
+ */
+ if (rc == -EIOCBQUEUED)
+ rc = 0;
+
+ rc2 = cl_sync_io_wait_recycle(env, anchor, 0, 0);
+ if (rc2 < 0)
+ rc = rc2;
+
+ if (range_locked) {
+ range_unlock(&lli->lli_write_tree, &range);
+ range_locked = false;
+ }
+ }
+
/*
* In order to move forward AIO, ci_nob was increased,
* but that doesn't mean io have been finished, it just
@@ -1717,8 +1754,12 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
*/
if (io->ci_nob > 0) {
if (!is_aio) {
- result += io->ci_nob;
- *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+ if (rc2 == 0) {
+ result += io->ci_nob;
+ *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+ } else if (rc2) {
+ result = 0;
+ }
}
count -= io->ci_nob;
@@ -631,6 +631,9 @@ enum stats_track_type {
#define LL_SBI_FOREIGN_SYMLINK 0x20000000 /* foreign fake-symlink support */
/* foreign fake-symlink upcall registered */
#define LL_SBI_FOREIGN_SYMLINK_UPCALL 0x40000000
+#define LL_SBI_PARALLEL_DIO 0x80000000 /* parallel (async) submission of
+ * RPCs for DIO
+ */
#define LL_SBI_FLAGS { \
"nolck", \
@@ -664,6 +667,7 @@ enum stats_track_type {
"noencrypt", \
"foreign_symlink", \
"foreign_symlink_upcall", \
+ "parallel_dio", \
}
/*
@@ -1001,6 +1005,11 @@ static inline bool ll_sbi_has_foreign_symlink(struct ll_sb_info *sbi)
return !!(sbi->ll_flags & LL_SBI_FOREIGN_SYMLINK);
}
+static inline bool ll_sbi_has_parallel_dio(struct ll_sb_info *sbi)
+{
+ return !!(sbi->ll_flags & LL_SBI_PARALLEL_DIO);
+}
+
void ll_ras_enter(struct file *f, loff_t pos, size_t count);
/* llite/lcommon_misc.c */
@@ -179,6 +179,7 @@ static struct ll_sb_info *ll_init_sbi(void)
sbi->ll_flags |= LL_SBI_AGL_ENABLED;
sbi->ll_flags |= LL_SBI_FAST_READ;
sbi->ll_flags |= LL_SBI_TINY_WRITE;
+ sbi->ll_flags |= LL_SBI_PARALLEL_DIO;
ll_sbi_set_encrypt(sbi, true);
/* root squash */
@@ -1100,6 +1100,42 @@ static ssize_t tiny_write_store(struct kobject *kobj,
}
LUSTRE_RW_ATTR(tiny_write);
+static ssize_t parallel_dio_show(struct kobject *kobj,
+ struct attribute *attr,
+ char *buf)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+
+ return snprintf(buf, PAGE_SIZE, "%u\n",
+ !!(sbi->ll_flags & LL_SBI_PARALLEL_DIO));
+}
+
+static ssize_t parallel_dio_store(struct kobject *kobj,
+ struct attribute *attr,
+ const char *buffer,
+ size_t count)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+ bool val;
+ int rc;
+
+ rc = kstrtobool(buffer, &val);
+ if (rc)
+ return rc;
+
+ spin_lock(&sbi->ll_lock);
+ if (val)
+ sbi->ll_flags |= LL_SBI_PARALLEL_DIO;
+ else
+ sbi->ll_flags &= ~LL_SBI_PARALLEL_DIO;
+ spin_unlock(&sbi->ll_lock);
+
+ return count;
+}
+LUSTRE_RW_ATTR(parallel_dio);
+
static ssize_t max_read_ahead_async_active_show(struct kobject *kobj,
struct attribute *attr,
char *buf)
@@ -1685,6 +1721,7 @@ struct ldebugfs_vars lprocfs_llite_obd_vars[] = {
&lustre_attr_xattr_cache.attr,
&lustre_attr_fast_read.attr,
&lustre_attr_tiny_write.attr,
+ &lustre_attr_parallel_dio.attr,
&lustre_attr_file_heat.attr,
&lustre_attr_heat_decay_percentage.attr,
&lustre_attr_heat_period_second.attr,
@@ -404,39 +404,23 @@ static ssize_t ll_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
out:
aio->cda_bytes += tot_bytes;
- if (is_sync_kiocb(iocb)) {
- struct cl_sync_io *anchor = &aio->cda_sync;
- ssize_t rc2;
+ if (rw == WRITE)
+ vio->u.readwrite.vui_written += tot_bytes;
+ else
+ vio->u.readwrite.vui_read += tot_bytes;
- /**
- * @anchor was inited as 1 to prevent end_io to be
- * called before we add all pages for IO, so drop
- * one extra reference to make sure we could wait
- * count to be zero.
- */
- cl_sync_io_note(env, anchor, result);
+ /* If async DIO submission is not allowed, we must wait here. */
+ if (is_sync_kiocb(iocb) && !io->ci_parallel_dio) {
+ ssize_t rc2;
- rc2 = cl_sync_io_wait(env, anchor, 0);
+ rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0);
if (result == 0 && rc2)
result = rc2;
- /**
- * One extra reference again, as if @anchor is
- * reused we assume it as 1 before using.
- */
- atomic_add(1, &anchor->csi_sync_nr);
- if (result == 0) {
- /* no commit async for direct IO */
- vio->u.readwrite.vui_written += tot_bytes;
- result = tot_bytes;
- }
- } else {
- if (rw == WRITE)
- vio->u.readwrite.vui_written += tot_bytes;
- else
- vio->u.readwrite.vui_read += tot_bytes;
if (result == 0)
- result = -EIOCBQUEUED;
+ result = tot_bytes;
+ } else if (result == 0) {
+ result = -EIOCBQUEUED;
}
return result;
@@ -526,6 +526,7 @@ static void vvp_io_advance(const struct lu_env *env,
* of relying on VFS, we move iov iter by ourselves.
*/
iov_iter_advance(vio->vui_iter, nob);
+ CDEBUG(D_VFSTRACE, "advancing %zu bytes\n", nob);
vio->vui_tot_count -= nob;
iov_iter_reexpand(vio->vui_iter, vio->vui_tot_count);
}
@@ -1202,3 +1202,32 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
}
}
EXPORT_SYMBOL(cl_sync_io_note);
+
+
+int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
+ long timeout, int ioret)
+{
+ int rc = 0;
+
+ /*
+ * @anchor was initialized to 1 to prevent end_io from being
+ * called before we add all pages for IO, so drop the extra
+ * reference to make sure we can wait for the count to reach
+ * zero.
+ */
+ cl_sync_io_note(env, anchor, ioret);
+ /* Wait for completion of normal DIO.
+ * This replaces the EIOCBQUEUED return from the DIO/AIO
+ * path, and this is where AIO and DIO implementations
+ * split.
+ */
+ rc = cl_sync_io_wait(env, anchor, timeout);
+ /*
+ * Take one extra reference again, since if @anchor
+ * is reused we assume it starts at 1 before use.
+ */
+ atomic_add(1, &anchor->csi_sync_nr);
+
+ return rc;
+}
+EXPORT_SYMBOL(cl_sync_io_wait_recycle);
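For context, here is a minimal caller sketch of the new helper. It is illustrative only: example_parallel_submit() and the submission step are hypothetical, while the cl_sync_io_* calls and the "anchor starts at 1" convention come from the patch itself.

/* Illustrative sketch: pair a cl_sync_io anchor with the new helper.
 * Each completed RPC is expected to call cl_sync_io_note() on @anchor.
 */
static int example_parallel_submit(const struct lu_env *env,
				   struct cl_sync_io *anchor)
{
	/* anchor->csi_sync_nr is assumed to already be 1, so completions
	 * cannot drive it to zero before submission finishes
	 */

	/* ... submit async RPCs, each holding a reference on @anchor ... */

	/* drop the initial reference, wait (no timeout) for all completions,
	 * then re-arm the anchor to 1 so it can be reused
	 */
	return cl_sync_io_wait_recycle(env, anchor, 0, 0);
}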
@@ -2640,7 +2640,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
return rc;
}
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags)
{
@@ -2701,6 +2701,7 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
grants += (1 << cli->cl_chunkbits) *
((page_count + ppc - 1) / ppc);
+ CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants);
spin_lock(&cli->cl_loi_list_lock);
if (osc_reserve_grant(cli, grants) == 0) {
list_for_each_entry(oap, list, oap_pending_item) {
@@ -2710,6 +2711,15 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
}
osc_unreserve_grant_nolock(cli, grants, 0);
ext->oe_grants = grants;
+ } else {
+ /* We cannot report ENOSPC correctly if we do parallel
+ * DIO (async RPC submission), so turn off parallel DIO
+ * if there is insufficient grant available. This
+ * makes individual RPCs synchronous.
+ */
+ io->ci_parallel_dio = false;
+ CDEBUG(D_CACHE,
+ "not enough grant available, switching to sync for this i/o\n");
}
spin_unlock(&cli->cl_loi_list_lock);
}
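As a side note on the grant calculation above: the request rounds the page count up to whole chunks before converting to bytes. A small worked example with assumed values (chunk size, page size, and page count are illustrative, not taken from the patch):

/* Illustrative arithmetic for the grant reservation above.
 * With cl_chunkbits = 16 (64KiB chunks) and 4KiB pages, ppc = 16 pages
 * per chunk.  For page_count = 100:
 *   chunks = (100 + 16 - 1) / 16 = 7
 *   grants = 7 * 65536 = 458752 bytes reserved for this I/O
 */
static unsigned int example_sync_pages_grant(unsigned int chunkbits,
					     unsigned int page_count,
					     unsigned int ppc)
{
	return (1U << chunkbits) * ((page_count + ppc - 1) / ppc);
}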