@@ -202,6 +202,8 @@ extern const char *ceph_osd_state_name(int s);
/* sync */ \
f(SYNC_READ, __CEPH_OSD_OP(RD, DATA, 11), "sync_read") \
\
+ f(CMPEXT, __CEPH_OSD_OP(RD, DATA, 31), "cmpext") \
+ \
/* write */ \
f(WRITE, __CEPH_OSD_OP(WR, DATA, 1), "write") \
f(WRITEFULL, __CEPH_OSD_OP(WR, DATA, 2), "writefull") \
@@ -361,6 +363,7 @@ static inline int ceph_osd_op_uses_extent(int op)
case CEPH_OSD_OP_ZERO:
case CEPH_OSD_OP_APPEND:
case CEPH_OSD_OP_TRIMTRUNC:
+ case CEPH_OSD_OP_CMPEXT:
return true;
default:
return false;
@@ -2999,6 +2999,46 @@ int ReplicatedPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr)
}
}
+/**
+ * Compare an object extent with client-supplied data.
+ *
+ * Issues a nested CEPH_OSD_OP_SYNC_READ for the extent described by
+ * osd_op.op.extent and compares the bytes read with osd_op.indata.
+ *
+ * @param ctx     op context handed through to the nested do_osd_ops() call
+ * @param osd_op  the compare op; indata carries the expected bytes and
+ *                outdata receives the mismatch report on failure
+ * @return 0 if both buffers have the same length and contents, the
+ *         negative result of the nested read if that read fails, or
+ *         -EILSEQ on mismatch.  On -EILSEQ, outdata holds the encoded
+ *         uint64_t index of the first difference followed by the bytes
+ *         that were read.
+ */
+int ReplicatedPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
+{
+  ceph_osd_op& op = osd_op.op;
+  vector<OSDOp> read_ops(1);
+  OSDOp& read_op = read_ops[0];
+
+  // Build a read covering exactly the extent the client asked to compare.
+  read_op.op.op = CEPH_OSD_OP_SYNC_READ;
+  read_op.op.extent.offset = op.extent.offset;
+  read_op.op.extent.length = op.extent.length;
+  read_op.op.extent.truncate_seq = op.extent.truncate_seq;
+  read_op.op.extent.truncate_size = op.extent.truncate_size;
+
+  int result = do_osd_ops(ctx, read_ops);
+  if (result < 0) {
+    derr << "do_extent_cmp do_osd_ops failed " << result << dendl;
+    return result;
+  }
+
+  const uint64_t read_len = read_op.outdata.length();
+  const uint64_t sent_len = osd_op.indata.length();
+  const uint64_t common_len = (read_len < sent_len) ? read_len : sent_len;
+
+  // Find the first differing byte within the range both buffers cover.
+  // If one buffer is a strict prefix of the other, the first difference
+  // is at the end of the shorter one (previously this case always
+  // reported offset 0, even when the leading bytes matched).
+  uint64_t mismatch_offset = 0;
+  for (; mismatch_offset < common_len; ++mismatch_offset) {
+    if (read_op.outdata[mismatch_offset] != osd_op.indata[mismatch_offset]) {
+      break;
+    }
+  }
+
+  if (mismatch_offset == common_len && read_len == sent_len) {
+    return 0;
+  }
+
+  dout(20) << "mismatch at " << mismatch_offset << " read " << read_op.outdata << " sent " << osd_op.indata << dendl;
+
+  ::encode(mismatch_offset, osd_op.outdata);
+  // NOTE(review): open question carried over from the original author -
+  // should this be ::encode(read_op.outdata, osd_op.outdata) so the
+  // returned data is length-prefixed?  For now the raw bytes are appended.
+  osd_op.outdata.claim_append(read_op.outdata);
+  return -EILSEQ;
+}
+
// ========================================================================
// low level osd ops
@@ -3428,6 +3468,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
// --- READS ---
+ case CEPH_OSD_OP_CMPEXT:
+ tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(), soid.snap.val, size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
+ // TODO: Locking - this op and the write are supposed to be atomic
+ result = do_extent_cmp(ctx, osd_op);
+ break;
+
case CEPH_OSD_OP_SYNC_READ:
if (pool.info.require_rollback()) {
result = -EOPNOTSUPP;
@@ -1382,6 +1382,8 @@ protected:
int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
+ // Compare the extent named by osd_op.op.extent with osd_op.indata.
+ // Returns 0 on match, -EILSEQ on mismatch (osd_op.outdata then holds the
+ // encoded mismatch offset followed by the data read), or the negative
+ // result of the underlying read if that fails.
+ int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
+
bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);