diff mbox

[RFC] Inline data support

Message ID 1372862768-8194-1-git-send-email-liwang@ubuntukylin.com (mailing list archive)
State New, archived
Headers show

Commit Message

Li Wang July 3, 2013, 2:46 p.m. UTC
This patch gives a preliminary implementation of inline data support for Ceph.
Comments are appreciated.

Signed-off-by: Li Wang <liwang@ubuntukylin.com>
Signed-off-by: Yunchuan Wen <yunchuanwen@ubuntukylin.com>
---
 src/client/Client.cc        |  158 ++++++++++++++++++++++++++++++++++++++-----
 src/client/Inode.h          |    6 ++
 src/include/ceph_fs.h       |    9 +++
 src/mds/CInode.cc           |   22 ++++++
 src/mds/Capability.h        |    3 +
 src/mds/Locker.cc           |   48 +++++++++++++
 src/mds/mdstypes.cc         |    4 ++
 src/mds/mdstypes.h          |    3 +
 src/messages/MClientCaps.h  |    6 ++
 src/messages/MClientReply.h |    4 ++
 10 files changed, 246 insertions(+), 17 deletions(-)

Comments

Sage Weil July 3, 2013, 4:22 p.m. UTC | #1
On Wed, 3 Jul 2013, Li Wang wrote:
> This patch gives a preliminary implementation of inline data support for Ceph.
> Comments are appreciated.
> 
> Signed-off-by: Li Wang <liwang@ubuntukylin.com>
> Signed-off-by: Yunchuan Wen <yunchuanwen@ubuntukylin.com>

A few comments below (although I didn't have time for a thorough review).  
My main concern is that it looks like it is possible to have a file that 
is half-inlined and half not.  That seems to complicate things and it's 
not clear to me that it is much of a win (correct me if I'm wrong here!).  

My other concern is that it is pretty simple to reason about this if the 
shared reader and exclusive writer cases.  For shared write, or mixed 
read/write (LOCK_MIX filelock state on mds) it all sort of falls apart.  
Is the plan for the MDS to uninline in that case before issuing those caps 
to clients?  That's clearly not implemented here (yet), but just want to 
confirm whether that is the rough plan.  (Maybe we discussed this during 
the CDS session but I can't remember.)

There are protocol compatibility issues (noted below), but we can deal 
with that last, I think.

Thanks!
sage

> ---
>  src/client/Client.cc        |  158 ++++++++++++++++++++++++++++++++++++++-----
>  src/client/Inode.h          |    6 ++
>  src/include/ceph_fs.h       |    9 +++
>  src/mds/CInode.cc           |   22 ++++++
>  src/mds/Capability.h        |    3 +
>  src/mds/Locker.cc           |   48 +++++++++++++
>  src/mds/mdstypes.cc         |    4 ++
>  src/mds/mdstypes.h          |    3 +
>  src/messages/MClientCaps.h  |    6 ++
>  src/messages/MClientReply.h |    4 ++
>  10 files changed, 246 insertions(+), 17 deletions(-)
> 
> diff --git a/src/client/Client.cc b/src/client/Client.cc
> index bddfa0a..fe947be 100644
> --- a/src/client/Client.cc
> +++ b/src/client/Client.cc
> @@ -632,6 +632,11 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from, MetaSession *sessi
>      in->layout = st->layout;
>      in->ctime = st->ctime;
>      in->max_size = st->max_size;  // right?
> +
> +    if (st->inline_version > in->inline_version) {
> +      in->inline_version = st->inline_version;
> +      in->inline_data = st->inline_data;
> +    }
>    
>      update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
>  			   st->time_warp_seq, st->ctime, st->mtime, st->atime,
> @@ -2321,6 +2326,17 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
>      ::encode(in->xattrs, m->xattrbl);
>      m->head.xattr_version = in->xattr_version;
>    }
> +
> +  if ((flush & CEPH_CAP_FILE_WR) &&
> +      (flush & CEPH_CAP_FILE_BUFFER)) {
> +    if (in->inline_commit_data.length()) {
> +      m->head.inline_version = 0;
> +      m->inline_data = in->inline_commit_data;
> +    } else {
> +      m->head.inline_version = in->inline_version;
> +      m->inline_data = in->inline_data;

It seems like if the inline data hasn't changed, we should be able to 
avoid sending the old data over the wire and just set the version.  The 
MDS should be able to infer that there was no change from that, and/or 
from the dirty_caps not includeing WR.

> +    }
> +  }
>    
>    m->head.layout = in->layout;
>    m->head.size = in->size;
> @@ -3550,6 +3566,13 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
>      ::decode(in->xattrs, p);
>      in->xattr_version = m->head.xattr_version;
>    }
> +  if ((new_caps & CEPH_CAP_FILE_CACHE) &&
> +      (m->get_inline_version() >= in->inline_version)) {
> +    ldout(cct, 10) << " update inline version " << m->get_inline_version() << dendl;
> +    in->inline_version = m->get_inline_version();
> +    in->inline_data = m->inline_data;
> +  }
> +
>    update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(),
>  			 m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(), issued);
>  
> @@ -5637,6 +5660,13 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
>    Inode *in = f->inode;
>  
>    //bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
> +  if ((in->inline_version != CEPH_INLINE_DISABLED) &&
> +      !(in->caps_issued(NULL) & CEPH_CAP_FILE_CACHE)) {
> +    ldout(cct, 10) << " read inline from mds." << dendl;
> +    int ret = _getattr(in, CEPH_STAT_CAP_INLINE, -1, -1, true);
> +    if (ret < 0)
> +      return ret;
> +  }
>  
>    int have;
>    int r = get_caps(in, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE, &have, -1);
> @@ -5650,15 +5680,31 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
>      movepos = true;
>    }
>  
> -  if (!conf->client_debug_force_sync_read &&
> -      (cct->_conf->client_oc && (have & CEPH_CAP_FILE_CACHE))) {
> +  unsigned int inlinesize = 0;
> +  if (in->inline_version != CEPH_INLINE_DISABLED) {
> +    if (offset < in->inline_data.length()) {
> +      inlinesize = in->inline_data.length() - offset;
> +      if (inlinesize > size)
> +        inlinesize = size;
> +      in->inline_data.copy(offset, inlinesize, *bl);
> +    }
> +  }

It might be simpler to support either all-inline or no-inline, and not a 
combination of some data inline and some data on the OSD.  It's not clear 
to me what the value of that scenario is, and it complicates the 
implementation.

> +
> +  if (size > inlinesize) {
> +    bufferlist blread;
> +    int64_t bloffset = offset + inlinesize;
> +    uint64_t blsize = size - inlinesize;
> +    if (!conf->client_debug_force_sync_read &&
> +        (cct->_conf->client_oc && (have & CEPH_CAP_FILE_CACHE))) {
>  
> -    if (f->flags & O_RSYNC) {
> -      _flush_range(in, offset, size);
> +      if (f->flags & O_RSYNC) {
> +        _flush_range(in, bloffset, blsize);
> +      }
> +      r = _read_async(f, bloffset, blsize, &blread);
> +    } else {
> +      r = _read_sync(f, bloffset, blsize, &blread);
>      }
> -    r = _read_async(f, offset, size, bl);
> -  } else {
> -    r = _read_sync(f, offset, size, bl);
> +    bl->claim_append(blread);
>    }
>  
>    // don't move pointer if the read failed
> @@ -5935,22 +5981,91 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
>    // time it.
>    utime_t start = ceph_clock_now(cct);
>  
> -  // copy into fresh buffer (since our write may be resub, async)
> -  bufferptr bp;
> -  if (size > 0) bp = buffer::copy(buf, size);
> -  bufferlist bl;
> -  bl.push_back( bp );
> -
>    utime_t lat;
>    uint64_t totalwritten;
>    uint64_t endoff = offset + size;
>    int have;
> -  int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, endoff);
> +  int want;
> +  if (in->inline_version != CEPH_INLINE_DISABLED) {
> +    want = 0;
> +    if (offset < CEPH_INLINE_SIZE)
> +      want |= CEPH_CAP_FILE_CACHE;
> +    if (offset + size > CEPH_INLINE_SIZE)
> +      want |= CEPH_CAP_FILE_BUFFER;
> +  } else {
> +    want = CEPH_CAP_FILE_BUFFER;
> +  }
> +  int r = get_caps(in, CEPH_CAP_FILE_WR, want, &have, endoff);
>    if (r < 0)
>      return r;
>  
>    ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl;
>  
> +  bufferlist inlinebl, osdbl;
> +  if (size > 0) {
> +    // copy into fresh buffer (since our write may be resub, async)
> +    bufferptr bp(buf, size);
> +    if ((in->inline_version != CEPH_INLINE_DISABLED) &&
> +        (want & CEPH_CAP_FILE_CACHE)) {
> +
> +      if (want & CEPH_CAP_FILE_BUFFER) {
> +        bufferptr inlinebp(bp, 0, CEPH_INLINE_SIZE - offset);
> +        inlinebl.push_back(inlinebp);
> +        bufferptr osdbp(bp, CEPH_INLINE_SIZE - offset, size + offset - CEPH_INLINE_SIZE);
> +        osdbl.push_back(osdbp);
> +      } else {
> +        inlinebl.push_back(bp);
> +      }
> +
> +      if (have & CEPH_CAP_FILE_CACHE) {
> +        bufferlist bl;
> +
> +        uint32_t len = in->inline_data.length();
> +        if (offset < len) {
> +          bl.substr_of(in->inline_data, 0, offset);
> +        } else {
> +          bl.append(in->inline_data);
> +          if (offset > len)
> +            bl.append_zero(offset - len);
> +        }
> +        bl.append(inlinebl);
> +        if (bl.length() < len)
> +          in->inline_data.copy(bl.length(), len - bl.length(), bl);
> +
> +        if (in->inline_version >= CEPH_INLINE_MIGRATION) {
> +          ldout(cct, 10) << " migrate data." << dendl;
> +          bl.append(osdbl);
> +          osdbl.claim(bl);
> +        } else {
> +          in->inline_data.claim(bl);
> +        }
> +      } else {
> +        encode((uint32_t)offset, in->inline_commit_data);
> +        encode(inlinebl, in->inline_commit_data);
> +      }
> +
> +      if ((in->inline_version < CEPH_INLINE_MIGRATION) ||
> +          !(have & CEPH_CAP_FILE_CACHE)) {
> +        mark_caps_dirty(in, CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER);
> +        check_caps(in, true);
> +      }
> +
> +      if (!(have & CEPH_CAP_FILE_CACHE)) {
> +        in->inline_commit_data.clear();
> +      }
> +    } else {
> +      osdbl.push_back(bp);
> +    }
> +  }
> +
> +  uint64_t osdsize = osdbl.length();
> +  int64_t osdoffset = offset + size - osdsize;
> +  if (osdoffset < 0)
> +    osdoffset = 0;
> +
> +  if (!osdsize)
> +    goto skip_osd;
> +
>    if (cct->_conf->client_oc && (have & CEPH_CAP_FILE_BUFFER)) {
>      // do buffered write
>      if (!in->oset.dirty_or_tx)
> @@ -5960,7 +6075,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
>  
>      // async, caching, non-blocking.
>      r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
> -			         offset, size, bl, ceph_clock_now(cct), 0,
> +			         osdoffset, osdsize, osdbl, ceph_clock_now(cct), 0,
>  			         client_lock);
>  
>      put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
> @@ -5972,7 +6087,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
>      // O_DSYNC == O_SYNC on linux < 2.6.33
>      // O_SYNC = __O_SYNC | O_DSYNC on linux >= 2.6.33
>      if ((f->flags & O_SYNC) || (f->flags & O_DSYNC)) {
> -      _flush_range(in, offset, size);
> +      _flush_range(in, osdoffset, osdsize);
>      }
>    } else {
>      // simple, non-atomic sync write
> @@ -5986,7 +6101,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
>      get_cap_ref(in, CEPH_CAP_FILE_BUFFER);  // released by onsafe callback
>  
>      r = filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
> -			   offset, size, bl, ceph_clock_now(cct), 0,
> +			   osdoffset, osdsize, osdbl, ceph_clock_now(cct), 0,
>  			   in->truncate_size, in->truncate_seq,
>  			   onfinish, onsafe);
>      if (r < 0)
> @@ -6001,6 +6116,15 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
>    }
>  
>    // if we get here, write was successful, update client metadata
> +skip_osd:
> +  if ((have & CEPH_CAP_FILE_CACHE) &&
> +      (in->inline_version >= CEPH_INLINE_MIGRATION)) {
> +    ldout(cct, 10) << " disable inline." << dendl;
> +    in->inline_version = CEPH_INLINE_DISABLED;
> +    in->inline_data.clear();
> +    mark_caps_dirty(in, CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER);
> +    check_caps(in, true);
> +  }
>  
>    // time
>    lat = ceph_clock_now(cct);
> diff --git a/src/client/Inode.h b/src/client/Inode.h
> index b33c38e..455fc37 100644
> --- a/src/client/Inode.h
> +++ b/src/client/Inode.h
> @@ -111,6 +111,11 @@ class Inode {
>    version_t version;           // auth only
>    version_t xattr_version;
>  
> +  // inline data
> +  bufferlist inline_data;
> +  version_t  inline_version;
> +  bufferlist inline_commit_data;
> +
>    bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
>    bool is_dir()     const { return (mode & S_IFMT) == S_IFDIR; }
>    bool is_file()    const { return (mode & S_IFMT) == S_IFREG; }
> @@ -205,6 +210,7 @@ class Inode {
>        rdev(0), mode(0), uid(0), gid(0), nlink(0),
>        size(0), truncate_seq(1), truncate_size(-1),
>        time_warp_seq(0), max_size(0), version(0), xattr_version(0),
> +      inline_version(0),
>        flags(0),
>        dir_hashed(false), dir_replicated(false), auth_cap(NULL),
>        dirty_caps(0), flushing_caps(0), flushing_cap_seq(0), shared_gen(0), cache_gen(0),
> diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
> index 6c41d14..bd81687 100644
> --- a/src/include/ceph_fs.h
> +++ b/src/include/ceph_fs.h
> @@ -471,6 +471,7 @@ struct ceph_mds_reply_inode {
>  	struct ceph_file_layout layout;
>  	struct ceph_timespec ctime, mtime, atime;
>  	__le32 time_warp_seq;
> +	__le32 inline_version;

I would go with __le64 here. 

More importantly, though, we need to make this change backward compatible, 
which means we can't modify ceph_mds_reply_indoe directly (at least not 
without keeping the old struct definition around too).  This is probably a 
good opportunity to move from the struct-based message format definition 
to the encode/decode style (as we've recently done with MOSDOp[Reply]). 
(This will need to get fixed up before this can be merged, but let's get 
the rest of this implementation right first... so don't worry about it 
yet.)

>  	__le64 size, max_size, truncate_size;
>  	__le32 truncate_seq;
>  	__le32 mode, uid, gid;
> @@ -522,6 +523,10 @@ struct ceph_filelock {
>  
>  int ceph_flags_to_mode(int flags);
>  
> +/* inline data state */
> +#define CEPH_INLINE_DISABLED   ((__u32)-1)

again, __u64

> +#define CEPH_INLINE_MIGRATION  (CEPH_INLINE_DISABLED >> 1)
> +#define CEPH_INLINE_SIZE       (1 << 8)
>  
>  /* capability bits */
>  #define CEPH_CAP_PIN         1  /* no specific capabilities beyond the pin */
> @@ -580,6 +585,7 @@ int ceph_flags_to_mode(int flags);
>  #define CEPH_STAT_CAP_MTIME    CEPH_CAP_FILE_SHARED
>  #define CEPH_STAT_CAP_SIZE     CEPH_CAP_FILE_SHARED
>  #define CEPH_STAT_CAP_ATIME    CEPH_CAP_FILE_SHARED  /* fixme */
> +#define CEPH_STAT_CAP_INLINE   CEPH_CAP_FILE_SHARED
>  #define CEPH_STAT_CAP_XATTR    CEPH_CAP_XATTR_SHARED
>  #define CEPH_STAT_CAP_INODE_ALL (CEPH_CAP_PIN |			\
>  				 CEPH_CAP_AUTH_SHARED |	\
> @@ -657,6 +663,9 @@ struct ceph_mds_caps {
>  	struct ceph_timespec mtime, atime, ctime;
>  	struct ceph_file_layout layout;
>  	__le32 time_warp_seq;
> +
> +	/* ilnine data */
> +	__le32 inline_version;

64

>  } __attribute__ ((packed));
>  
>  /* cap release msg head */
> diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
> index 781ed72..3999078 100644
> --- a/src/mds/CInode.cc
> +++ b/src/mds/CInode.cc
> @@ -2706,6 +2706,17 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
>    e.rfiles = i->rstat.rfiles;
>    e.rsubdirs = i->rstat.rsubdirs;
>  
> +  // inline data
> +  bufferlist inline_data;
> +  if (!cap || (cap->client_inline_version < i->inline_version)) {
> +    e.inline_version = i->inline_version;
> +    inline_data = i->inline_data;
> +  }
> +  if (cap && (cap->client_inline_version < i->inline_version)) {
> +    cap->client_inline_version = i->inline_version;
> +    cap->server_inline_version = i->inline_version;
> +  }
> +
>    // auth
>    i = pauth ? pi:oi;
>    e.mode = i->mode;
> @@ -2738,6 +2749,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
>      bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
>      bytes += sizeof(__u32) + symlink.length();
>      bytes += sizeof(__u32) + xbl.length();
> +    bytes += sizeof(__u32) + inline_data.length();
>      if (bytes > max_bytes)
>        return -ENOSPC;
>    }
> @@ -2833,6 +2845,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
>      ::encode(i->dir_layout, bl);
>    }
>    ::encode(xbl, bl);
> +  ::encode(inline_data, bl);
>  
>    return valid;
>  }
> @@ -2865,6 +2878,15 @@ void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
>    i->atime.encode_timeval(&m->head.atime);
>    m->head.time_warp_seq = i->time_warp_seq;
>  
> +  if ((cap->pending() & CEPH_CAP_FILE_CACHE) &&
> +      (i->inline_version > cap->client_inline_version)) {
> +    dout(10) << " push updated inline data version " << i->inline_version << dendl;
> +    m->head.inline_version = i->inline_version;
> +    m->inline_data = i->inline_data;
> +    cap->client_inline_version = i->inline_version;
> +    cap->server_inline_version = i->inline_version;
> +  }
> +  
>    // max_size is min of projected, actual.
>    uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
>    uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
> diff --git a/src/mds/Capability.h b/src/mds/Capability.h
> index 54d2312..aa8fbb9 100644
> --- a/src/mds/Capability.h
> +++ b/src/mds/Capability.h
> @@ -207,6 +207,8 @@ private:
>  public:
>    snapid_t client_follows;
>    version_t client_xattr_version;
> +  version_t client_inline_version;
> +  version_t server_inline_version;
>    
>    xlist<Capability*>::item item_session_caps;
>    xlist<Capability*>::item item_snaprealm_caps;
> @@ -221,6 +223,7 @@ public:
>      mseq(0),
>      suppress(0), stale(false),
>      client_follows(0), client_xattr_version(0),
> +    client_inline_version(0), server_inline_version(0),
>      item_session_caps(this), item_snaprealm_caps(this) {
>      g_num_cap++;
>      g_num_capa++;
> diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc
> index c5ddb92..a45a4db 100644
> --- a/src/mds/Locker.cc
> +++ b/src/mds/Locker.cc
> @@ -2844,6 +2844,54 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
>  
>    _update_cap_fields(in, dirty, m, pi);
>  
> +  if ((dirty & CEPH_CAP_FILE_WR) && 
> +      (dirty & CEPH_CAP_FILE_BUFFER) && 
> +      in->inode.is_file()) {
> +    if (!m->get_inline_version()) {
> +      bufferlist::iterator p = m->inline_data.begin();
> +
> +      uint32_t offset = 0;
> +      bufferlist inlinebl;
> +      if (!p.end())
> +        decode(offset, p);
> +      if (!p.end())
> +        decode(inlinebl, p);
> +
> +      if (offset + inlinebl.length() > 0) {
> +        bufferlist bl;
> +
> +        uint32_t len = pi->inline_data.length();
> +        if (offset < len) {
> +          bl.substr_of(pi->inline_data, 0, offset);
> +        } else {
> +          bl.append(pi->inline_data);
> +          if (offset > len)
> +            bl.append_zero(offset - len);
> +        }
> +        bl.append(inlinebl);
> +        if (bl.length() < len)
> +          pi->inline_data.copy(bl.length(), len - bl.length(), bl);
> +
> +        pi->inline_data.claim(bl);
> +        pi->inline_version++;
> +        if ((pi->size > CEPH_INLINE_SIZE) && (pi->inline_version < CEPH_INLINE_MIGRATION))
> +          pi->inline_version = CEPH_INLINE_MIGRATION;
> +      }
> +    } else {
> +      if (cap && 
> +          (m->get_inline_version() >= cap->client_inline_version) &&
> +          (cap->server_inline_version == pi->inline_version)) {
> +        pi->inline_data = m->inline_data;
> +        version_t newversion = pi->inline_version + 1;
> +        if (m->get_inline_version() > newversion)
> +          newversion = m->get_inline_version();
> +        if ((pi->size > CEPH_INLINE_SIZE) && (newversion < CEPH_INLINE_MIGRATION))
> +          newversion = CEPH_INLINE_MIGRATION;
> +        pi->inline_version = cap->server_inline_version = newversion;
> +      }
> +    }
> +  }
> +
>    if (change_max) {
>      dout(7) << "  max_size " << old_max << " -> " << new_max
>  	    << " for " << *in << dendl;
> diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc
> index b1ce640..bb56122 100644
> --- a/src/mds/mdstypes.cc
> +++ b/src/mds/mdstypes.cc
> @@ -227,6 +227,8 @@ void inode_t::encode(bufferlist &bl) const
>    ::encode(mtime, bl);
>    ::encode(atime, bl);
>    ::encode(time_warp_seq, bl);
> +  ::encode(inline_version, bl);
> +  ::encode(inline_data, bl);
>    ::encode(client_ranges, bl);
>  
>    ::encode(dirstat, bl);
> @@ -273,6 +275,8 @@ void inode_t::decode(bufferlist::iterator &p)
>    ::decode(mtime, p);
>    ::decode(atime, p);
>    ::decode(time_warp_seq, p);
> +  ::decode(inline_version, p);
> +  ::decode(inline_data, p);
>    if (struct_v >= 3) {
>      ::decode(client_ranges, p);
>    } else {
> diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h
> index aa9d165..7e8a69a 100644
> --- a/src/mds/mdstypes.h
> +++ b/src/mds/mdstypes.h
> @@ -334,6 +334,8 @@ struct inode_t {
>    utime_t    mtime;   // file data modify time.
>    utime_t    atime;   // file data access time.
>    uint32_t   time_warp_seq;  // count of (potential) mtime/atime timewarps (i.e., utimes())
> +  bufferlist inline_data;
> +  version_t  inline_version;
>  
>    map<client_t,client_writeable_range_t> client_ranges;  // client(s) can write to these ranges
>  
> @@ -355,6 +357,7 @@ struct inode_t {
>  	      size(0), truncate_seq(0), truncate_size(0), truncate_from(0),
>  	      truncate_pending(0),
>  	      time_warp_seq(0),
> +	      inline_version(1),
>  	      version(0), file_data_version(0), xattr_version(0), last_renamed_version(0) { 
>      clear_layout();
>      memset(&dir_layout, 0, sizeof(dir_layout));
> diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h
> index 117f241..aab28d2 100644
> --- a/src/messages/MClientCaps.h
> +++ b/src/messages/MClientCaps.h
> @@ -29,6 +29,7 @@ class MClientCaps : public Message {
>    bufferlist snapbl;
>    bufferlist xattrbl;
>    bufferlist flockbl;
> +  bufferlist inline_data;
>  
>    int      get_caps() { return head.caps; }
>    int      get_wanted() { return head.wanted; }
> @@ -49,6 +50,7 @@ class MClientCaps : public Message {
>    utime_t get_mtime() { return utime_t(head.mtime); }
>    utime_t get_atime() { return utime_t(head.atime); }
>    __u32 get_time_warp_seq() { return head.time_warp_seq; }
> +  __u32 get_inline_version() { return head.inline_version; }
>  
>    ceph_file_layout& get_layout() { return head.layout; }
>  
> @@ -151,6 +153,8 @@ public:
>      // conditionally decode flock metadata
>      if (header.version >= 2)
>        ::decode(flockbl, p);
> +
> +    ::decode(inline_data, p);
>    }
>    void encode_payload(uint64_t features) {
>      head.snap_trace_len = snapbl.length();
> @@ -166,6 +170,8 @@ public:
>      } else {
>        header.version = 1;  // old
>      }
> +
> +    ::encode(inline_data, payload);
>    }
>  };
>  
> diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h
> index 896245f..b15e843 100644
> --- a/src/messages/MClientReply.h
> +++ b/src/messages/MClientReply.h
> @@ -108,6 +108,8 @@ struct InodeStat {
>    uint64_t truncate_size;
>    utime_t ctime, mtime, atime;
>    version_t time_warp_seq;
> +  bufferlist inline_data;
> +  version_t inline_version;
>  
>    frag_info_t dirstat;
>    nest_info_t rstat;
> @@ -144,6 +146,7 @@ struct InodeStat {
>      mtime.decode_timeval(&e.mtime);
>      atime.decode_timeval(&e.atime);
>      time_warp_seq = e.time_warp_seq;
> +    inline_version = e.inline_version;
>      mode = e.mode;
>      uid = e.uid;
>      gid = e.gid;
> @@ -174,6 +177,7 @@ struct InodeStat {
>  
>      xattr_version = e.xattr_version;
>      ::decode(xattrbl, p);
> +    ::decode(inline_data, p);
>    }
>    
>    // see CInode::encode_inodestat for encoder.
> -- 
> 1.7.9.5
> 
> 
> 
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/src/client/Client.cc b/src/client/Client.cc
index bddfa0a..fe947be 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -632,6 +632,11 @@  Inode * Client::add_update_inode(InodeStat *st, utime_t from, MetaSession *sessi
     in->layout = st->layout;
     in->ctime = st->ctime;
     in->max_size = st->max_size;  // right?
+
+    if (st->inline_version > in->inline_version) {
+      in->inline_version = st->inline_version;
+      in->inline_data = st->inline_data;
+    }
   
     update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
 			   st->time_warp_seq, st->ctime, st->mtime, st->atime,
@@ -2321,6 +2326,17 @@  void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
     ::encode(in->xattrs, m->xattrbl);
     m->head.xattr_version = in->xattr_version;
   }
+
+  if ((flush & CEPH_CAP_FILE_WR) &&
+      (flush & CEPH_CAP_FILE_BUFFER)) {
+    if (in->inline_commit_data.length()) {
+      m->head.inline_version = 0;
+      m->inline_data = in->inline_commit_data;
+    } else {
+      m->head.inline_version = in->inline_version;
+      m->inline_data = in->inline_data;
+    }
+  }
   
   m->head.layout = in->layout;
   m->head.size = in->size;
@@ -3550,6 +3566,13 @@  void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
     ::decode(in->xattrs, p);
     in->xattr_version = m->head.xattr_version;
   }
+  if ((new_caps & CEPH_CAP_FILE_CACHE) &&
+      (m->get_inline_version() >= in->inline_version)) {
+    ldout(cct, 10) << " update inline version " << m->get_inline_version() << dendl;
+    in->inline_version = m->get_inline_version();
+    in->inline_data = m->inline_data;
+  }
+
   update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(),
 			 m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(), issued);
 
@@ -5637,6 +5660,13 @@  int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
   Inode *in = f->inode;
 
   //bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
+  if ((in->inline_version != CEPH_INLINE_DISABLED) &&
+      !(in->caps_issued(NULL) & CEPH_CAP_FILE_CACHE)) {
+    ldout(cct, 10) << " read inline from mds." << dendl;
+    int ret = _getattr(in, CEPH_STAT_CAP_INLINE, -1, -1, true);
+    if (ret < 0)
+      return ret;
+  }
 
   int have;
   int r = get_caps(in, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE, &have, -1);
@@ -5650,15 +5680,31 @@  int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
     movepos = true;
   }
 
-  if (!conf->client_debug_force_sync_read &&
-      (cct->_conf->client_oc && (have & CEPH_CAP_FILE_CACHE))) {
+  unsigned int inlinesize = 0;
+  if (in->inline_version != CEPH_INLINE_DISABLED) {
+    if (offset < in->inline_data.length()) {
+      inlinesize = in->inline_data.length() - offset;
+      if (inlinesize > size)
+        inlinesize = size;
+      in->inline_data.copy(offset, inlinesize, *bl);
+    }
+  }
+
+  if (size > inlinesize) {
+    bufferlist blread;
+    int64_t bloffset = offset + inlinesize;
+    uint64_t blsize = size - inlinesize;
+    if (!conf->client_debug_force_sync_read &&
+        (cct->_conf->client_oc && (have & CEPH_CAP_FILE_CACHE))) {
 
-    if (f->flags & O_RSYNC) {
-      _flush_range(in, offset, size);
+      if (f->flags & O_RSYNC) {
+        _flush_range(in, bloffset, blsize);
+      }
+      r = _read_async(f, bloffset, blsize, &blread);
+    } else {
+      r = _read_sync(f, bloffset, blsize, &blread);
     }
-    r = _read_async(f, offset, size, bl);
-  } else {
-    r = _read_sync(f, offset, size, bl);
+    bl->claim_append(blread);
   }
 
   // don't move pointer if the read failed
@@ -5935,22 +5981,91 @@  int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
   // time it.
   utime_t start = ceph_clock_now(cct);
 
-  // copy into fresh buffer (since our write may be resub, async)
-  bufferptr bp;
-  if (size > 0) bp = buffer::copy(buf, size);
-  bufferlist bl;
-  bl.push_back( bp );
-
   utime_t lat;
   uint64_t totalwritten;
   uint64_t endoff = offset + size;
   int have;
-  int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, endoff);
+  int want;
+  if (in->inline_version != CEPH_INLINE_DISABLED) {
+    want = 0;
+    if (offset < CEPH_INLINE_SIZE)
+      want |= CEPH_CAP_FILE_CACHE;
+    if (offset + size > CEPH_INLINE_SIZE)
+      want |= CEPH_CAP_FILE_BUFFER;
+  } else {
+    want = CEPH_CAP_FILE_BUFFER;
+  }
+  int r = get_caps(in, CEPH_CAP_FILE_WR, want, &have, endoff);
   if (r < 0)
     return r;
 
   ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl;
 
+  bufferlist inlinebl, osdbl;
+  if (size > 0) {
+    // copy into fresh buffer (since our write may be resub, async)
+    bufferptr bp(buf, size);
+    if ((in->inline_version != CEPH_INLINE_DISABLED) &&
+        (want & CEPH_CAP_FILE_CACHE)) {
+
+      if (want & CEPH_CAP_FILE_BUFFER) {
+        bufferptr inlinebp(bp, 0, CEPH_INLINE_SIZE - offset);
+        inlinebl.push_back(inlinebp);
+        bufferptr osdbp(bp, CEPH_INLINE_SIZE - offset, size + offset - CEPH_INLINE_SIZE);
+        osdbl.push_back(osdbp);
+      } else {
+        inlinebl.push_back(bp);
+      }
+
+      if (have & CEPH_CAP_FILE_CACHE) {
+        bufferlist bl;
+
+        uint32_t len = in->inline_data.length();
+        if (offset < len) {
+          bl.substr_of(in->inline_data, 0, offset);
+        } else {
+          bl.append(in->inline_data);
+          if (offset > len)
+            bl.append_zero(offset - len);
+        }
+        bl.append(inlinebl);
+        if (bl.length() < len)
+          in->inline_data.copy(bl.length(), len - bl.length(), bl);
+
+        if (in->inline_version >= CEPH_INLINE_MIGRATION) {
+          ldout(cct, 10) << " migrate data." << dendl;
+          bl.append(osdbl);
+          osdbl.claim(bl);
+        } else {
+          in->inline_data.claim(bl);
+        }
+      } else {
+        encode((uint32_t)offset, in->inline_commit_data);
+        encode(inlinebl, in->inline_commit_data);
+      }
+
+      if ((in->inline_version < CEPH_INLINE_MIGRATION) ||
+          !(have & CEPH_CAP_FILE_CACHE)) {
+        mark_caps_dirty(in, CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER);
+        check_caps(in, true);
+      }
+
+      if (!(have & CEPH_CAP_FILE_CACHE)) {
+        in->inline_commit_data.clear();
+      }
+    } else {
+      osdbl.push_back(bp);
+    }
+  }
+
+  uint64_t osdsize = osdbl.length();
+  int64_t osdoffset = offset + size - osdsize;
+  if (osdoffset < 0)
+    osdoffset = 0;
+
+  if (!osdsize)
+    goto skip_osd;
+
   if (cct->_conf->client_oc && (have & CEPH_CAP_FILE_BUFFER)) {
     // do buffered write
     if (!in->oset.dirty_or_tx)
@@ -5960,7 +6075,7 @@  int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
 
     // async, caching, non-blocking.
     r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
-			         offset, size, bl, ceph_clock_now(cct), 0,
+			         osdoffset, osdsize, osdbl, ceph_clock_now(cct), 0,
 			         client_lock);
 
     put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
@@ -5972,7 +6087,7 @@  int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
     // O_DSYNC == O_SYNC on linux < 2.6.33
     // O_SYNC = __O_SYNC | O_DSYNC on linux >= 2.6.33
     if ((f->flags & O_SYNC) || (f->flags & O_DSYNC)) {
-      _flush_range(in, offset, size);
+      _flush_range(in, osdoffset, osdsize);
     }
   } else {
     // simple, non-atomic sync write
@@ -5986,7 +6101,7 @@  int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
     get_cap_ref(in, CEPH_CAP_FILE_BUFFER);  // released by onsafe callback
 
     r = filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
-			   offset, size, bl, ceph_clock_now(cct), 0,
+			   osdoffset, osdsize, osdbl, ceph_clock_now(cct), 0,
 			   in->truncate_size, in->truncate_seq,
 			   onfinish, onsafe);
     if (r < 0)
@@ -6001,6 +6116,15 @@  int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
   }
 
   // if we get here, write was successful, update client metadata
+skip_osd:
+  if ((have & CEPH_CAP_FILE_CACHE) &&
+      (in->inline_version >= CEPH_INLINE_MIGRATION)) {
+    ldout(cct, 10) << " disable inline." << dendl;
+    in->inline_version = CEPH_INLINE_DISABLED;
+    in->inline_data.clear();
+    mark_caps_dirty(in, CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER);
+    check_caps(in, true);
+  }
 
   // time
   lat = ceph_clock_now(cct);
diff --git a/src/client/Inode.h b/src/client/Inode.h
index b33c38e..455fc37 100644
--- a/src/client/Inode.h
+++ b/src/client/Inode.h
@@ -111,6 +111,11 @@  class Inode {
   version_t version;           // auth only
   version_t xattr_version;
 
+  // inline data
+  bufferlist inline_data;
+  version_t  inline_version;
+  bufferlist inline_commit_data;
+
   bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
   bool is_dir()     const { return (mode & S_IFMT) == S_IFDIR; }
   bool is_file()    const { return (mode & S_IFMT) == S_IFREG; }
@@ -205,6 +210,7 @@  class Inode {
       rdev(0), mode(0), uid(0), gid(0), nlink(0),
       size(0), truncate_seq(1), truncate_size(-1),
       time_warp_seq(0), max_size(0), version(0), xattr_version(0),
+      inline_version(0),
       flags(0),
       dir_hashed(false), dir_replicated(false), auth_cap(NULL),
       dirty_caps(0), flushing_caps(0), flushing_cap_seq(0), shared_gen(0), cache_gen(0),
diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
index 6c41d14..bd81687 100644
--- a/src/include/ceph_fs.h
+++ b/src/include/ceph_fs.h
@@ -471,6 +471,7 @@  struct ceph_mds_reply_inode {
 	struct ceph_file_layout layout;
 	struct ceph_timespec ctime, mtime, atime;
 	__le32 time_warp_seq;
+	__le32 inline_version;
 	__le64 size, max_size, truncate_size;
 	__le32 truncate_seq;
 	__le32 mode, uid, gid;
@@ -522,6 +523,10 @@  struct ceph_filelock {
 
 int ceph_flags_to_mode(int flags);
 
+/* inline data state */
+#define CEPH_INLINE_DISABLED   ((__u32)-1)
+#define CEPH_INLINE_MIGRATION  (CEPH_INLINE_DISABLED >> 1)
+#define CEPH_INLINE_SIZE       (1 << 8)
 
 /* capability bits */
 #define CEPH_CAP_PIN         1  /* no specific capabilities beyond the pin */
@@ -580,6 +585,7 @@  int ceph_flags_to_mode(int flags);
 #define CEPH_STAT_CAP_MTIME    CEPH_CAP_FILE_SHARED
 #define CEPH_STAT_CAP_SIZE     CEPH_CAP_FILE_SHARED
 #define CEPH_STAT_CAP_ATIME    CEPH_CAP_FILE_SHARED  /* fixme */
+#define CEPH_STAT_CAP_INLINE   CEPH_CAP_FILE_SHARED
 #define CEPH_STAT_CAP_XATTR    CEPH_CAP_XATTR_SHARED
 #define CEPH_STAT_CAP_INODE_ALL (CEPH_CAP_PIN |			\
 				 CEPH_CAP_AUTH_SHARED |	\
@@ -657,6 +663,9 @@  struct ceph_mds_caps {
 	struct ceph_timespec mtime, atime, ctime;
 	struct ceph_file_layout layout;
 	__le32 time_warp_seq;
+
+	/* ilnine data */
+	__le32 inline_version;
 } __attribute__ ((packed));
 
 /* cap release msg head */
diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
index 781ed72..3999078 100644
--- a/src/mds/CInode.cc
+++ b/src/mds/CInode.cc
@@ -2706,6 +2706,17 @@  int CInode::encode_inodestat(bufferlist& bl, Session *session,
   e.rfiles = i->rstat.rfiles;
   e.rsubdirs = i->rstat.rsubdirs;
 
+  // inline data
+  bufferlist inline_data;
+  if (!cap || (cap->client_inline_version < i->inline_version)) {
+    e.inline_version = i->inline_version;
+    inline_data = i->inline_data;
+  }
+  if (cap && (cap->client_inline_version < i->inline_version)) {
+    cap->client_inline_version = i->inline_version;
+    cap->server_inline_version = i->inline_version;
+  }
+
   // auth
   i = pauth ? pi:oi;
   e.mode = i->mode;
@@ -2738,6 +2749,7 @@  int CInode::encode_inodestat(bufferlist& bl, Session *session,
     bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
     bytes += sizeof(__u32) + symlink.length();
     bytes += sizeof(__u32) + xbl.length();
+    bytes += sizeof(__u32) + inline_data.length();
     if (bytes > max_bytes)
       return -ENOSPC;
   }
@@ -2833,6 +2845,7 @@  int CInode::encode_inodestat(bufferlist& bl, Session *session,
     ::encode(i->dir_layout, bl);
   }
   ::encode(xbl, bl);
+  ::encode(inline_data, bl);
 
   return valid;
 }
@@ -2865,6 +2878,15 @@  void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
   i->atime.encode_timeval(&m->head.atime);
   m->head.time_warp_seq = i->time_warp_seq;
 
+  if ((cap->pending() & CEPH_CAP_FILE_CACHE) &&
+      (i->inline_version > cap->client_inline_version)) {
+    dout(10) << " push updated inline data version " << i->inline_version << dendl;
+    m->head.inline_version = i->inline_version;
+    m->inline_data = i->inline_data;
+    cap->client_inline_version = i->inline_version;
+    cap->server_inline_version = i->inline_version;
+  }
+  
   // max_size is min of projected, actual.
   uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
   uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
diff --git a/src/mds/Capability.h b/src/mds/Capability.h
index 54d2312..aa8fbb9 100644
--- a/src/mds/Capability.h
+++ b/src/mds/Capability.h
@@ -207,6 +207,8 @@  private:
 public:
   snapid_t client_follows;
   version_t client_xattr_version;
+  version_t client_inline_version;
+  version_t server_inline_version;
   
   xlist<Capability*>::item item_session_caps;
   xlist<Capability*>::item item_snaprealm_caps;
@@ -221,6 +223,7 @@  public:
     mseq(0),
     suppress(0), stale(false),
     client_follows(0), client_xattr_version(0),
+    client_inline_version(0), server_inline_version(0),
     item_session_caps(this), item_snaprealm_caps(this) {
     g_num_cap++;
     g_num_capa++;
diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc
index c5ddb92..a45a4db 100644
--- a/src/mds/Locker.cc
+++ b/src/mds/Locker.cc
@@ -2844,6 +2844,54 @@  bool Locker::_do_cap_update(CInode *in, Capability *cap,
 
   _update_cap_fields(in, dirty, m, pi);
 
+  if ((dirty & CEPH_CAP_FILE_WR) && 
+      (dirty & CEPH_CAP_FILE_BUFFER) && 
+      in->inode.is_file()) {
+    if (!m->get_inline_version()) {
+      bufferlist::iterator p = m->inline_data.begin();
+
+      uint32_t offset = 0;
+      bufferlist inlinebl;
+      if (!p.end())
+        decode(offset, p);
+      if (!p.end())
+        decode(inlinebl, p);
+
+      if (offset + inlinebl.length() > 0) {
+        bufferlist bl;
+
+        uint32_t len = pi->inline_data.length();
+        if (offset < len) {
+          bl.substr_of(pi->inline_data, 0, offset);
+        } else {
+          bl.append(pi->inline_data);
+          if (offset > len)
+            bl.append_zero(offset - len);
+        }
+        bl.append(inlinebl);
+        if (bl.length() < len)
+          pi->inline_data.copy(bl.length(), len - bl.length(), bl);
+
+        pi->inline_data.claim(bl);
+        pi->inline_version++;
+        if ((pi->size > CEPH_INLINE_SIZE) && (pi->inline_version < CEPH_INLINE_MIGRATION))
+          pi->inline_version = CEPH_INLINE_MIGRATION;
+      }
+    } else {
+      if (cap && 
+          (m->get_inline_version() >= cap->client_inline_version) &&
+          (cap->server_inline_version == pi->inline_version)) {
+        pi->inline_data = m->inline_data;
+        version_t newversion = pi->inline_version + 1;
+        if (m->get_inline_version() > newversion)
+          newversion = m->get_inline_version();
+        if ((pi->size > CEPH_INLINE_SIZE) && (newversion < CEPH_INLINE_MIGRATION))
+          newversion = CEPH_INLINE_MIGRATION;
+        pi->inline_version = cap->server_inline_version = newversion;
+      }
+    }
+  }
+
   if (change_max) {
     dout(7) << "  max_size " << old_max << " -> " << new_max
 	    << " for " << *in << dendl;
diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc
index b1ce640..bb56122 100644
--- a/src/mds/mdstypes.cc
+++ b/src/mds/mdstypes.cc
@@ -227,6 +227,8 @@  void inode_t::encode(bufferlist &bl) const
   ::encode(mtime, bl);
   ::encode(atime, bl);
   ::encode(time_warp_seq, bl);
+  ::encode(inline_version, bl);
+  ::encode(inline_data, bl);
   ::encode(client_ranges, bl);
 
   ::encode(dirstat, bl);
@@ -273,6 +275,8 @@  void inode_t::decode(bufferlist::iterator &p)
   ::decode(mtime, p);
   ::decode(atime, p);
   ::decode(time_warp_seq, p);
+  ::decode(inline_version, p);
+  ::decode(inline_data, p);
   if (struct_v >= 3) {
     ::decode(client_ranges, p);
   } else {
diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h
index aa9d165..7e8a69a 100644
--- a/src/mds/mdstypes.h
+++ b/src/mds/mdstypes.h
@@ -334,6 +334,8 @@  struct inode_t {
   utime_t    mtime;   // file data modify time.
   utime_t    atime;   // file data access time.
   uint32_t   time_warp_seq;  // count of (potential) mtime/atime timewarps (i.e., utimes())
+  bufferlist inline_data;
+  version_t  inline_version;
 
   map<client_t,client_writeable_range_t> client_ranges;  // client(s) can write to these ranges
 
@@ -355,6 +357,7 @@  struct inode_t {
 	      size(0), truncate_seq(0), truncate_size(0), truncate_from(0),
 	      truncate_pending(0),
 	      time_warp_seq(0),
+	      inline_version(1),
 	      version(0), file_data_version(0), xattr_version(0), last_renamed_version(0) { 
     clear_layout();
     memset(&dir_layout, 0, sizeof(dir_layout));
diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h
index 117f241..aab28d2 100644
--- a/src/messages/MClientCaps.h
+++ b/src/messages/MClientCaps.h
@@ -29,6 +29,7 @@  class MClientCaps : public Message {
   bufferlist snapbl;
   bufferlist xattrbl;
   bufferlist flockbl;
+  bufferlist inline_data;
 
   int      get_caps() { return head.caps; }
   int      get_wanted() { return head.wanted; }
@@ -49,6 +50,7 @@  class MClientCaps : public Message {
   utime_t get_mtime() { return utime_t(head.mtime); }
   utime_t get_atime() { return utime_t(head.atime); }
   __u32 get_time_warp_seq() { return head.time_warp_seq; }
+  __u32 get_inline_version() { return head.inline_version; }
 
   ceph_file_layout& get_layout() { return head.layout; }
 
@@ -151,6 +153,8 @@  public:
     // conditionally decode flock metadata
     if (header.version >= 2)
       ::decode(flockbl, p);
+
+    ::decode(inline_data, p);
   }
   void encode_payload(uint64_t features) {
     head.snap_trace_len = snapbl.length();
@@ -166,6 +170,8 @@  public:
     } else {
       header.version = 1;  // old
     }
+
+    ::encode(inline_data, payload);
   }
 };
 
diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h
index 896245f..b15e843 100644
--- a/src/messages/MClientReply.h
+++ b/src/messages/MClientReply.h
@@ -108,6 +108,8 @@  struct InodeStat {
   uint64_t truncate_size;
   utime_t ctime, mtime, atime;
   version_t time_warp_seq;
+  bufferlist inline_data;
+  version_t inline_version;
 
   frag_info_t dirstat;
   nest_info_t rstat;
@@ -144,6 +146,7 @@  struct InodeStat {
     mtime.decode_timeval(&e.mtime);
     atime.decode_timeval(&e.atime);
     time_warp_seq = e.time_warp_seq;
+    inline_version = e.inline_version;
     mode = e.mode;
     uid = e.uid;
     gid = e.gid;
@@ -174,6 +177,7 @@  struct InodeStat {
 
     xattr_version = e.xattr_version;
     ::decode(xattrbl, p);
+    ::decode(inline_data, p);
   }
   
   // see CInode::encode_inodestat for encoder.