Message ID | 20181009104806.6821-4-lhenriques@suse.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | copy_file_range in cephfs kernel client | expand |
On Tue, Oct 9, 2018 at 6:49 PM Luis Henriques <lhenriques@suse.com> wrote: > > This commit implements support for the copy_file_range syscall in cephfs. > It is implemented using the RADOS 'copy-from' operation, which allows to > do a remote object copy, without the need to download/upload data from/to > the OSDs. > > Some manual copy may however be required if the source/destination file > offsets aren't object aligned or if the copy lenght is smaller than the > object size. > > Signed-off-by: Luis Henriques <lhenriques@suse.com> > --- > fs/ceph/file.c | 297 ++++++++++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 296 insertions(+), 1 deletion(-) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 92ab20433682..ee4a390dd772 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -1,5 +1,6 @@ > // SPDX-License-Identifier: GPL-2.0 > #include <linux/ceph/ceph_debug.h> > +#include <linux/ceph/striper.h> > > #include <linux/module.h> > #include <linux/sched.h> > @@ -1829,6 +1830,300 @@ static long ceph_fallocate(struct file *file, int mode, > return ret; > } > > +/* > + * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for > + * src_ci. Two attempts are made to obtain both caps, and an error is return if > + * this fails; zero is returned on success. > + */ > +static int get_rd_wr_caps(struct ceph_inode_info *src_ci, > + loff_t src_endoff, int *src_got, > + struct ceph_inode_info *dst_ci, > + loff_t dst_endoff, int *dst_got) > +{ > + int ret = 0; > + bool retrying = false; > + > +retry_caps: > + ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, > + dst_endoff, dst_got, NULL); > + if (ret < 0) > + return ret; > + > + /* > + * Since we're already holding the FILE_WR capability for the dst file, > + * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some > + * retry dance instead to try to get both capabilities. > + */ > + ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED, > + false, src_got); > + if (ret <= 0) { > + /* Start by dropping dst_ci caps and getting src_ci caps */ > + ceph_put_cap_refs(dst_ci, *dst_got); > + if (retrying) { > + if (!ret) > + /* ceph_try_get_caps masks EAGAIN */ > + ret = -EAGAIN; > + return ret; > + } > + ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD, > + CEPH_CAP_FILE_SHARED, src_endoff, > + src_got, NULL); > + if (ret < 0) > + return ret; > + /*... drop src_ci caps too, and retry */ > + ceph_put_cap_refs(src_ci, *src_got); > + retrying = true; > + goto retry_caps; > + } > + return ret; > +} > + > +static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got, > + struct ceph_inode_info *dst_ci, int dst_got) > +{ > + ceph_put_cap_refs(src_ci, src_got); > + ceph_put_cap_refs(dst_ci, dst_got); > +} > + > +/* > + * This function does several size-related checks, returning an error if: > + * - source file is smaller than off+len > + * - destination file size is OK (inode_newsize_ok()) > + * - quotas won't be exceeded > + */ > +static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode, > + loff_t src_off, loff_t dst_off, size_t len) > +{ > + loff_t size, endoff; > + > + size = i_size_read(src_inode); > + /* > + * Don't copy beyond source file EOF. Instead of simply setting lenght > + * to (size - src_off), just drop to VFS default implementation, as the > + * local i_size may be stale due to other clients writing to the source > + * inode. > + */ > + if (src_off + len > size) { > + dout("Copy beyond EOF (%llu + %ld > %llu)\n", > + src_off, len, size); > + return -EOPNOTSUPP; > + } > + size = i_size_read(dst_inode); > + > + endoff = dst_off + len; > + if(inode_newsize_ok(dst_inode, endoff)) > + return -EOPNOTSUPP; > + > + if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) > + return -EDQUOT; > + > + return 0; > +} > + > +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, > + struct file *dst_file, loff_t dst_off, > + size_t len, unsigned int flags) > +{ > + struct inode *src_inode = file_inode(src_file); > + struct inode *dst_inode = file_inode(dst_file); > + struct ceph_inode_info *src_ci = ceph_inode(src_inode); > + struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); > + struct ceph_cap_flush *prealloc_cf; > + struct ceph_object_locator src_oloc, dst_oloc; > + struct ceph_object_id src_oid, dst_oid; > + loff_t endoff = 0, size; > + ssize_t ret = -EIO; > + u64 src_objnum, dst_objnum, src_objoff, dst_objoff; > + u32 src_objlen, dst_objlen, object_size; > + int src_got = 0, dst_got = 0, err, dirty; > + > + if (src_inode == dst_inode) > + return -EINVAL; > + if (ceph_snap(dst_inode) != CEPH_NOSNAP) > + return -EROFS; > + > + /* > + * Some of the checks below will return -EOPNOTSUPP, which will force a > + * fallback to the default VFS copy_file_range implementation. This is > + * desirable in several cases (for ex, the 'len' is smaller than the > + * size of the objects, or in cases where that would be more > + * efficient). > + */ > + > + if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || > + (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) || > + (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) > + return -EOPNOTSUPP; > + > + if (len < src_ci->i_layout.object_size) > + return -EOPNOTSUPP; /* no remote copy will be done */ > + > + prealloc_cf = ceph_alloc_cap_flush(); > + if (!prealloc_cf) > + return -ENOMEM; > + > + /* Start by sync'ing the source file */ > + ret = file_write_and_wait_range(src_file, src_off, (src_off + len)); > + if (ret < 0) > + goto out; > + > + /* > + * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other > + * clients may have dirty data in their caches. And OSDs know nothing > + * about caps, so they can't safely do the remote object copies. > + */ > + err = get_rd_wr_caps(src_ci, (src_off + len), &src_got, > + dst_ci, (dst_off + len), &dst_got); > + if (err < 0) { > + dout("get_rd_wr_caps returned %d\n", err); > + ret = -EOPNOTSUPP; > + goto out; > + } > + > + ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); > + if (ret < 0) > + goto out_caps; > + > + size = i_size_read(dst_inode); > + endoff = dst_off + len; > + > + /* Drop dst file cached pages */ > + ret = invalidate_inode_pages2_range(dst_inode->i_mapping, > + dst_off >> PAGE_SHIFT, > + endoff >> PAGE_SHIFT); > + if (ret < 0) { > + dout("Failed to invalidate inode pages (%ld)\n", ret); > + ret = 0; /* XXX */ > + } > + src_oloc.pool = src_ci->i_layout.pool_id; > + src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); > + dst_oloc.pool = dst_ci->i_layout.pool_id; > + dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); > + > + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, > + src_ci->i_layout.object_size, > + &src_objnum, &src_objoff, &src_objlen); > + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, > + dst_ci->i_layout.object_size, > + &dst_objnum, &dst_objoff, &dst_objlen); > + /* object-level offsets need to the same */ > + if (src_objoff != dst_objoff) { > + ret = -EOPNOTSUPP; > + goto out_caps; > + } > + > + /* > + * Do a manual copy if the object offset isn't object aligned. > + * 'src_objlen' contains the bytes left until the end of the object, > + * starting at the src_off > + */ > + if (src_objoff) { > + /* > + * we need to temporarily drop all caps as we'll be calling > + * {read,write}_iter, which will get caps again. > + */ > + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); > + ret = do_splice_direct(src_file, &src_off, dst_file, > + &dst_off, src_objlen, flags); > + if (ret < 0) { > + dout("do_splice_direct returned %d\n", err); > + goto out; > + } > + len -= ret; > + err = get_rd_wr_caps(src_ci, (src_off + len), > + &src_got, dst_ci, > + (dst_off + len), &dst_got); > + if (err < 0) > + goto out; > + err = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); > + if (err < 0) > + goto out_caps; > + } > + object_size = src_ci->i_layout.object_size; > + while (len >= object_size) { > + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, > + object_size, &src_objnum, > + &src_objoff, &src_objlen); > + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, > + object_size, &dst_objnum, > + &dst_objoff, &dst_objlen); > + ceph_oid_init(&src_oid); > + ceph_oid_printf(&src_oid, "%llx.%08llx", > + src_ci->i_vino.ino, src_objnum); > + ceph_oid_init(&dst_oid); > + ceph_oid_printf(&dst_oid, "%llx.%08llx", > + dst_ci->i_vino.ino, dst_objnum); > + /* Do an object remote copy */ > + err = ceph_osdc_copy_from( > + &ceph_inode_to_client(src_inode)->client->osdc, > + src_ci->i_vino.snap, 0, > + &src_oid, &src_oloc, > + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | > + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED, > + dst_ci->i_vino.snap, &dst_oid, &dst_oloc, > + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | > + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); > + ceph_oid_destroy(&src_oid); > + ceph_oid_destroy(&dst_oid); > + if (err) { > + dout("ceph_osdc_copy_from returned %d\n", err); > + goto out_caps; > + } > + len -= object_size; > + src_off += object_size; > + dst_off += object_size; > + ret += object_size; > + } > + > + if (len) { > + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, > + len, &dst_objnum, &dst_objoff, > + &dst_objlen); > + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); > + err = do_splice_direct(src_file, &src_off, dst_file, > + &dst_off, len, flags); > + if (err < 0) { > + dout("do_splice_direct returned %d\n", err); > + goto out; > + } > + len -= err; > + ret += err; > + err = get_rd_wr_caps(src_ci, (src_off + len), > + &src_got, dst_ci, > + (dst_off + len), &dst_got); > + if (err < 0) { > + goto out; > + } > + } > + > + file_update_time(dst_file); > + if (endoff > size) { > + int caps_flags = 0; > + > + /* Let the MDS know about dst file size change */ > + if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff)) > + caps_flags |= CHECK_CAPS_NODELAY; > + if (ceph_inode_set_size(dst_inode, endoff)) > + caps_flags |= CHECK_CAPS_AUTHONLY; > + if (caps_flags) > + ceph_check_caps(dst_ci, caps_flags, NULL); > + } > + /* Mark Fw dirty */ > + spin_lock(&dst_ci->i_ceph_lock); > + dst_ci->i_inline_version = CEPH_INLINE_NONE; > + dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); > + spin_unlock(&dst_ci->i_ceph_lock); > + if (dirty) > + __mark_inode_dirty(dst_inode, dirty); Above code (from file_update_time) should be placed before code block of the second do_splice_direct(). We need to mark caps dirty before droping cap reference. Otherwise, the code looks great. Regards Yan, Zheng > + > +out_caps: > + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); > +out: > + ceph_free_cap_flush(prealloc_cf); > + > + return ret; > +} > + > const struct file_operations ceph_file_fops = { > .open = ceph_open, > .release = ceph_release, > @@ -1844,5 +2139,5 @@ const struct file_operations ceph_file_fops = { > .unlocked_ioctl = ceph_ioctl, > .compat_ioctl = ceph_ioctl, > .fallocate = ceph_fallocate, > + .copy_file_range = ceph_copy_file_range, > }; > -
"Yan, Zheng" <ukernel@gmail.com> writes: > On Tue, Oct 9, 2018 at 6:49 PM Luis Henriques <lhenriques@suse.com> wrote: <snip> >> + file_update_time(dst_file); >> + if (endoff > size) { >> + int caps_flags = 0; >> + >> + /* Let the MDS know about dst file size change */ >> + if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff)) >> + caps_flags |= CHECK_CAPS_NODELAY; >> + if (ceph_inode_set_size(dst_inode, endoff)) >> + caps_flags |= CHECK_CAPS_AUTHONLY; >> + if (caps_flags) >> + ceph_check_caps(dst_ci, caps_flags, NULL); >> + } >> + /* Mark Fw dirty */ >> + spin_lock(&dst_ci->i_ceph_lock); >> + dst_ci->i_inline_version = CEPH_INLINE_NONE; >> + dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); >> + spin_unlock(&dst_ci->i_ceph_lock); >> + if (dirty) >> + __mark_inode_dirty(dst_inode, dirty); > > > Above code (from file_update_time) should be placed before code block > of the second do_splice_direct(). We need to mark caps dirty before > droping cap reference. So, in that case I guess I'll also need to replace 'endoff' by 'dst_off' in that code block. Ok, I'll change that, thanks. Cheers,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 92ab20433682..ee4a390dd772 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 #include <linux/ceph/ceph_debug.h> +#include <linux/ceph/striper.h> #include <linux/module.h> #include <linux/sched.h> @@ -1829,6 +1830,300 @@ static long ceph_fallocate(struct file *file, int mode, return ret; } +/* + * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for + * src_ci. Two attempts are made to obtain both caps, and an error is return if + * this fails; zero is returned on success. + */ +static int get_rd_wr_caps(struct ceph_inode_info *src_ci, + loff_t src_endoff, int *src_got, + struct ceph_inode_info *dst_ci, + loff_t dst_endoff, int *dst_got) +{ + int ret = 0; + bool retrying = false; + +retry_caps: + ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, + dst_endoff, dst_got, NULL); + if (ret < 0) + return ret; + + /* + * Since we're already holding the FILE_WR capability for the dst file, + * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some + * retry dance instead to try to get both capabilities. + */ + ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED, + false, src_got); + if (ret <= 0) { + /* Start by dropping dst_ci caps and getting src_ci caps */ + ceph_put_cap_refs(dst_ci, *dst_got); + if (retrying) { + if (!ret) + /* ceph_try_get_caps masks EAGAIN */ + ret = -EAGAIN; + return ret; + } + ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD, + CEPH_CAP_FILE_SHARED, src_endoff, + src_got, NULL); + if (ret < 0) + return ret; + /*... drop src_ci caps too, and retry */ + ceph_put_cap_refs(src_ci, *src_got); + retrying = true; + goto retry_caps; + } + return ret; +} + +static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got, + struct ceph_inode_info *dst_ci, int dst_got) +{ + ceph_put_cap_refs(src_ci, src_got); + ceph_put_cap_refs(dst_ci, dst_got); +} + +/* + * This function does several size-related checks, returning an error if: + * - source file is smaller than off+len + * - destination file size is OK (inode_newsize_ok()) + * - quotas won't be exceeded + */ +static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode, + loff_t src_off, loff_t dst_off, size_t len) +{ + loff_t size, endoff; + + size = i_size_read(src_inode); + /* + * Don't copy beyond source file EOF. Instead of simply setting lenght + * to (size - src_off), just drop to VFS default implementation, as the + * local i_size may be stale due to other clients writing to the source + * inode. + */ + if (src_off + len > size) { + dout("Copy beyond EOF (%llu + %ld > %llu)\n", + src_off, len, size); + return -EOPNOTSUPP; + } + size = i_size_read(dst_inode); + + endoff = dst_off + len; + if(inode_newsize_ok(dst_inode, endoff)) + return -EOPNOTSUPP; + + if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) + return -EDQUOT; + + return 0; +} + +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, + struct file *dst_file, loff_t dst_off, + size_t len, unsigned int flags) +{ + struct inode *src_inode = file_inode(src_file); + struct inode *dst_inode = file_inode(dst_file); + struct ceph_inode_info *src_ci = ceph_inode(src_inode); + struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); + struct ceph_cap_flush *prealloc_cf; + struct ceph_object_locator src_oloc, dst_oloc; + struct ceph_object_id src_oid, dst_oid; + loff_t endoff = 0, size; + ssize_t ret = -EIO; + u64 src_objnum, dst_objnum, src_objoff, dst_objoff; + u32 src_objlen, dst_objlen, object_size; + int src_got = 0, dst_got = 0, err, dirty; + + if (src_inode == dst_inode) + return -EINVAL; + if (ceph_snap(dst_inode) != CEPH_NOSNAP) + return -EROFS; + + /* + * Some of the checks below will return -EOPNOTSUPP, which will force a + * fallback to the default VFS copy_file_range implementation. This is + * desirable in several cases (for ex, the 'len' is smaller than the + * size of the objects, or in cases where that would be more + * efficient). + */ + + if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || + (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) || + (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) + return -EOPNOTSUPP; + + if (len < src_ci->i_layout.object_size) + return -EOPNOTSUPP; /* no remote copy will be done */ + + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + + /* Start by sync'ing the source file */ + ret = file_write_and_wait_range(src_file, src_off, (src_off + len)); + if (ret < 0) + goto out; + + /* + * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other + * clients may have dirty data in their caches. And OSDs know nothing + * about caps, so they can't safely do the remote object copies. + */ + err = get_rd_wr_caps(src_ci, (src_off + len), &src_got, + dst_ci, (dst_off + len), &dst_got); + if (err < 0) { + dout("get_rd_wr_caps returned %d\n", err); + ret = -EOPNOTSUPP; + goto out; + } + + ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); + if (ret < 0) + goto out_caps; + + size = i_size_read(dst_inode); + endoff = dst_off + len; + + /* Drop dst file cached pages */ + ret = invalidate_inode_pages2_range(dst_inode->i_mapping, + dst_off >> PAGE_SHIFT, + endoff >> PAGE_SHIFT); + if (ret < 0) { + dout("Failed to invalidate inode pages (%ld)\n", ret); + ret = 0; /* XXX */ + } + src_oloc.pool = src_ci->i_layout.pool_id; + src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); + dst_oloc.pool = dst_ci->i_layout.pool_id; + dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); + + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, + src_ci->i_layout.object_size, + &src_objnum, &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + dst_ci->i_layout.object_size, + &dst_objnum, &dst_objoff, &dst_objlen); + /* object-level offsets need to the same */ + if (src_objoff != dst_objoff) { + ret = -EOPNOTSUPP; + goto out_caps; + } + + /* + * Do a manual copy if the object offset isn't object aligned. + * 'src_objlen' contains the bytes left until the end of the object, + * starting at the src_off + */ + if (src_objoff) { + /* + * we need to temporarily drop all caps as we'll be calling + * {read,write}_iter, which will get caps again. + */ + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + ret = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, src_objlen, flags); + if (ret < 0) { + dout("do_splice_direct returned %d\n", err); + goto out; + } + len -= ret; + err = get_rd_wr_caps(src_ci, (src_off + len), + &src_got, dst_ci, + (dst_off + len), &dst_got); + if (err < 0) + goto out; + err = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); + if (err < 0) + goto out_caps; + } + object_size = src_ci->i_layout.object_size; + while (len >= object_size) { + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, + object_size, &src_objnum, + &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + object_size, &dst_objnum, + &dst_objoff, &dst_objlen); + ceph_oid_init(&src_oid); + ceph_oid_printf(&src_oid, "%llx.%08llx", + src_ci->i_vino.ino, src_objnum); + ceph_oid_init(&dst_oid); + ceph_oid_printf(&dst_oid, "%llx.%08llx", + dst_ci->i_vino.ino, dst_objnum); + /* Do an object remote copy */ + err = ceph_osdc_copy_from( + &ceph_inode_to_client(src_inode)->client->osdc, + src_ci->i_vino.snap, 0, + &src_oid, &src_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED, + dst_ci->i_vino.snap, &dst_oid, &dst_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + ceph_oid_destroy(&src_oid); + ceph_oid_destroy(&dst_oid); + if (err) { + dout("ceph_osdc_copy_from returned %d\n", err); + goto out_caps; + } + len -= object_size; + src_off += object_size; + dst_off += object_size; + ret += object_size; + } + + if (len) { + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + len, &dst_objnum, &dst_objoff, + &dst_objlen); + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + err = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, len, flags); + if (err < 0) { + dout("do_splice_direct returned %d\n", err); + goto out; + } + len -= err; + ret += err; + err = get_rd_wr_caps(src_ci, (src_off + len), + &src_got, dst_ci, + (dst_off + len), &dst_got); + if (err < 0) { + goto out; + } + } + + file_update_time(dst_file); + if (endoff > size) { + int caps_flags = 0; + + /* Let the MDS know about dst file size change */ + if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff)) + caps_flags |= CHECK_CAPS_NODELAY; + if (ceph_inode_set_size(dst_inode, endoff)) + caps_flags |= CHECK_CAPS_AUTHONLY; + if (caps_flags) + ceph_check_caps(dst_ci, caps_flags, NULL); + } + /* Mark Fw dirty */ + spin_lock(&dst_ci->i_ceph_lock); + dst_ci->i_inline_version = CEPH_INLINE_NONE; + dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); + spin_unlock(&dst_ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(dst_inode, dirty); + +out_caps: + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); +out: + ceph_free_cap_flush(prealloc_cf); + + return ret; +} + const struct file_operations ceph_file_fops = { .open = ceph_open, .release = ceph_release, @@ -1844,5 +2139,5 @@ const struct file_operations ceph_file_fops = { .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, .fallocate = ceph_fallocate, + .copy_file_range = ceph_copy_file_range, }; -
This commit implements support for the copy_file_range syscall in cephfs. It is implemented using the RADOS 'copy-from' operation, which allows to do a remote object copy, without the need to download/upload data from/to the OSDs. Some manual copy may however be required if the source/destination file offsets aren't object aligned or if the copy lenght is smaller than the object size. Signed-off-by: Luis Henriques <lhenriques@suse.com> --- fs/ceph/file.c | 297 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 296 insertions(+), 1 deletion(-)