@@ -2592,7 +2592,8 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
int ioret);
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret);
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj);
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+ struct cl_dio_aio *ll_aio);
void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio);
static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
@@ -2626,7 +2627,9 @@ struct cl_dio_aio {
struct cl_object *cda_obj;
struct kiocb *cda_iocb;
ssize_t cda_bytes;
- unsigned int cda_no_aio_complete:1;
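+ /* parent (upper-level) aio; set only on the per-chunk sub-aios */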
+ struct cl_dio_aio *cda_ll_aio;
+ unsigned int cda_no_aio_complete:1,
+ cda_no_aio_free:1;
};
/** @} cl_sync_io */
@@ -1684,7 +1684,7 @@ static void ll_heat_add(struct inode *inode, enum cl_io_type iot,
is_parallel_dio = false;
ci_aio = cl_aio_alloc(args->u.normal.via_iocb,
- ll_i2info(inode)->lli_clob);
+ ll_i2info(inode)->lli_clob, NULL);
if (!ci_aio) {
rc = -ENOMEM;
goto out;
@@ -330,7 +330,8 @@ static ssize_t ll_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
struct cl_io *io;
struct file *file = iocb->ki_filp;
struct inode *inode = file->f_mapping->host;
- struct cl_dio_aio *aio;
+ struct cl_dio_aio *ll_aio;
+ struct cl_dio_aio *ldp_aio;
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
@@ -365,12 +366,12 @@ static ssize_t ll_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
io = lcc->lcc_io;
LASSERT(io);
- aio = io->ci_aio;
- LASSERT(aio);
- LASSERT(aio->cda_iocb == iocb);
+ ll_aio = io->ci_aio;
+ LASSERT(ll_aio);
+ LASSERT(ll_aio->cda_iocb == iocb);
while (iov_iter_count(iter)) {
- struct ll_dio_pages pvec = { .ldp_aio = aio };
+ struct ll_dio_pages pvec = {};
struct page **pages;
count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
@@ -382,10 +383,23 @@ static ssize_t ll_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
count = i_size_read(inode) - file_offset;
}
+ /* This aio is freed on completion from cl_sync_io_note(), so we
+ * do not need to free its memory explicitly here
+ */
+ ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob,
+ ll_aio);
+ if (!ldp_aio) {
+ result = -ENOMEM;
+ goto out;
+ }
+ pvec.ldp_aio = ldp_aio;
+
result = ll_get_user_pages(rw, iter, &pages,
&pvec.ldp_count, count);
- if (unlikely(result <= 0))
+ if (unlikely(result <= 0)) {
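+ /* no pages were submitted, so drop the initial reference and
+ * let the sub-aio complete and be freed
+ */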
+ cl_sync_io_note(env, &ldp_aio->cda_sync, result);
goto out;
+ }
count = result;
pvec.ldp_file_offset = file_offset;
@@ -393,6 +407,10 @@ static ssize_t ll_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
result = ll_direct_rw_pages(env, io, count,
rw, inode, &pvec);
+ /* The pages have been submitted, so drop the extra reference
+ * that kept this aio from completing during submission
+ */
+ cl_sync_io_note(env, &ldp_aio->cda_sync, result);
ll_free_user_pages(pages, pvec.ldp_count);
if (unlikely(result < 0))
@@ -404,7 +422,7 @@ static ssize_t ll_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
}
out:
- aio->cda_bytes += tot_bytes;
+ ll_aio->cda_bytes += tot_bytes;
if (rw == WRITE)
vio->u.readwrite.vui_written += tot_bytes;
@@ -424,7 +442,7 @@ static ssize_t ll_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
ssize_t rc2;
/* Wait here rather than doing async submission */
- rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0);
+ rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0);
if (result == 0 && rc2)
result = rc2;
@@ -1138,9 +1138,13 @@ static void cl_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
if (!aio->cda_no_aio_complete)
aio->cda_iocb->ki_complete(aio->cda_iocb,
ret ?: aio->cda_bytes, 0);
+
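+ /* drop the reference this sub-aio holds on its parent ll_aio */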
+ if (aio->cda_ll_aio)
+ cl_sync_io_note(env, &aio->cda_ll_aio->cda_sync, ret);
}
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj)
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+ struct cl_dio_aio *ll_aio)
{
struct cl_dio_aio *aio;
@@ -1153,12 +1157,30 @@ struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj)
cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
cl_page_list_init(&aio->cda_pages);
aio->cda_iocb = iocb;
- if (is_sync_kiocb(iocb))
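+ /* only the top-level aio completes the iocb; sub-aios (ll_aio
+ * set) and sync I/O must not call ki_complete
+ */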
+ if (is_sync_kiocb(iocb) || ll_aio)
aio->cda_no_aio_complete = 1;
else
aio->cda_no_aio_complete = 0;
+ /* When this is a lower-level aio (ll_aio is set) or true AIO
+ * (!is_sync_kiocb()), the memory is freed by the daemons calling
+ * cl_sync_io_note, because they are the last users of the aio
+ * struct.
+ *
+ * Otherwise the last user is cl_sync_io_wait, and in that case
+ * the caller frees the aio struct after that call completes.
+ */
+ if (ll_aio || !is_sync_kiocb(iocb))
+ aio->cda_no_aio_free = 0;
+ else
+ aio->cda_no_aio_free = 1;
+
cl_object_get(obj);
aio->cda_obj = obj;
+ aio->cda_ll_aio = ll_aio;
+
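+ /* the parent ll_aio must not complete until this sub-aio does,
+ * so take a reference on its anchor; cl_aio_end() drops it
+ */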
+ if (ll_aio)
+ atomic_add(1, &ll_aio->cda_sync.csi_sync_nr);
}
return aio;
}
@@ -1206,14 +1228,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
spin_unlock(&anchor->csi_waitq.lock);
- /**
- * For AIO (!is_sync_kiocb), we are responsible for freeing
- * memory here. This is because we are the last user of this
- * aio struct, whereas in other cases, we will call
- * cl_sync_io_wait to wait after this, and so the memory is
- * freed after that call.
- */
- if (aio && !is_sync_kiocb(aio->cda_iocb))
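+ /* cl_aio_alloc() decided who frees the aio: skip freeing here
+ * when the caller of cl_sync_io_wait will do it instead
+ */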
+ if (aio && !aio->cda_no_aio_free)
cl_aio_free(env, aio);
}
}
@@ -1223,8 +1238,15 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret)
{
+ bool no_aio_free = anchor->csi_aio->cda_no_aio_free;
int rc = 0;
+ /* For true AIO, the daemons running cl_sync_io_note would normally
+ * free the aio struct, but since we are waiting on it here, they
+ * must not. Setting cda_no_aio_free ensures the aio is not freed
+ * when we drop the reference count to zero in cl_sync_io_note below
+ */
+ anchor->csi_aio->cda_no_aio_free = 1;
/*
* @anchor was inited as 1 to prevent end_io to be
* called before we add all pages for IO, so drop
@@ -1244,6 +1266,8 @@ int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
*/
atomic_add(1, &anchor->csi_sync_nr);
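+ /* the anchor holds a reference again, so the caller's original
+ * cda_no_aio_free setting can be restored safely
+ */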
+ anchor->csi_aio->cda_no_aio_free = no_aio_free;
+
return rc;
}
EXPORT_SYMBOL(cl_sync_io_wait_recycle);