
[10/11] libceph: follow redirect replies from osds

Message ID 1390838333-32266-11-git-send-email-ilya.dryomov@inktank.com (mailing list archive)
State New, archived

Commit Message

Ilya Dryomov Jan. 27, 2014, 3:58 p.m. UTC
Follow redirect replies from osds; for details, see ceph.git commit
fbbe3ad1220799b7bb00ea30fce581c5eadaf034.

The v1 (current) version of the redirect reply consists of oloc and oid,
which expand to pool, key, nspace, hash and oid.  However, the
server-side code that would populate anything other than pool doesn't
exist yet, so this commit adds support for pool redirects only.  To make
sure that future server-side updates don't break us, we decode all
fields and, if any of key, nspace, hash or oid has a non-default value,
error out with a "corrupt osd_op_reply ..." message.

Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
---
 include/linux/ceph/osd_client.h |    6 ++
 net/ceph/osd_client.c           |  167 ++++++++++++++++++++++++++++++++++++---
 2 files changed, 164 insertions(+), 9 deletions(-)
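
For reference, a minimal standalone sketch (plain C, illustrative struct
and field names rather than the kernel's ceph_object_locator /
ceph_request_redirect layout) of the validation policy described above:
only the pool may carry a redirect, and any non-default key, nspace,
hash or oid is treated as a corrupt reply.

#include <errno.h>

/* Illustrative only -- a flattened view of a decoded v1 redirect. */
struct redirect_v1 {
	long long pool;          /* -1 means "no redirect" */
	unsigned int key_len;    /* oloc key, default: empty */
	unsigned int nspace_len; /* oloc namespace, default: empty */
	long long hash;          /* oloc hash, default: -1 */
	unsigned int oid_len;    /* redirect object name, default: empty */
};

/* Anything beyond a pool redirect is rejected as unsupported. */
static int check_redirect(const struct redirect_v1 *r)
{
	if (r->key_len || r->nspace_len || r->hash != -1 || r->oid_len)
		return -EINVAL;  /* the "corrupt osd_op_reply ..." case */
	return 0;                /* r->pool != -1 means: follow pool redirect */
}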

Comments

Sage Weil Jan. 27, 2014, 6:32 p.m. UTC | #1
On Mon, 27 Jan 2014, Ilya Dryomov wrote:
> [...]
> +	if (redir.oloc.pool != -1) {
> +		dout("redirect pool %lld\n", redir.oloc.pool);
> +
> +		__unregister_request(osdc, req);

I'm a little worried about dropping request_mutex here... we could 
conceivably have a racing reply from a previous send attempt that does 
something else with this request.  IIRC Josh had a patch that made 
get_reply more aggressively ignore those replies unless they came from 
the most recent send attempt... we might want to make sure that gets in 
here too?

Otherwise, this all looks good!

Reviewed-by:

sage



Ilya Dryomov Jan. 27, 2014, 9:42 p.m. UTC | #2
On Mon, Jan 27, 2014 at 8:32 PM, Sage Weil <sage@inktank.com> wrote:
> On Mon, 27 Jan 2014, Ilya Dryomov wrote:
>> [...]
>> +     if (redir.oloc.pool != -1) {
>> +             dout("redirect pool %lld\n", redir.oloc.pool);
>> +
>> +             __unregister_request(osdc, req);
>
> I'm a little worried about dropping request_mutex here... we could
> conceivably have a racing reply from a previous send attempt that does
> something else with this request.  IIRC Josh had a patch that made
> get_reply more aggressively ignore those replies unless they came from
> the most recent send attempt... we might want to make sure that gets in
> here too?

Yes, I asked Josh about those patches (wip-con-race IIRC) and he said
that he wanted to get at least one of them in, but it was never sent to
the mailing list.

Dropping request_mutex here is necessary to satisfy the lock ordering
(map semaphore, then request mutex) and can probably be avoided by
taking the map semaphore unconditionally at the beginning of
handle_reply.  I chose to drop request_mutex because I haven't fully
investigated the trade-offs involved.
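
To make the ordering concrete, here is a simplified sketch (the lock and
struct names mirror the real ones, but resubmit_request() and the two
handle_* functions are illustrative stand-ins, not the actual net/ceph
code paths) of the established order -- map_sem (outer), then
request_mutex (inner) -- and the two options discussed:

/* stand-in for the resubmit path, which takes map_sem before request_mutex */
static void resubmit_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	down_read(&osdc->map_sem);		/* outer lock */
	mutex_lock(&osdc->request_mutex);	/* inner lock */
	/* ... remap and (re)send the request ... */
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

/* Option A (what the patch does): drop request_mutex, then resubmit. */
static void handle_redirect_drop(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	/* called with request_mutex held */
	__unregister_request(osdc, req);
	mutex_unlock(&osdc->request_mutex);
	resubmit_request(osdc, req);	/* correct order: map_sem taken first */
}

/* Option B (mentioned above): take map_sem up front in the reply path. */
static void handle_redirect_upfront(struct ceph_osd_client *osdc,
				    struct ceph_osd_request *req)
{
	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/* ... decode, unregister, remap and resend under both locks ... */
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}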

>
> Otherwise, this all looks good!
>
> Reviewed-by:
>
> sage

Great, I'll add this to the testing branch.  If we decide to do
something about map_sem/request_mutex, we can get that into rc2.

Thanks,

                Ilya

Patch

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 3170ca6d98b2..fd47e872ebcc 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -155,6 +155,8 @@ struct ceph_osd_request {
 
 	struct ceph_object_locator r_base_oloc;
 	struct ceph_object_id r_base_oid;
+	struct ceph_object_locator r_target_oloc;
+	struct ceph_object_id r_target_oid;
 
 	u64               r_snapid;
 	unsigned long     r_stamp;            /* send OR check time */
@@ -162,6 +164,10 @@ struct ceph_osd_request {
 	struct ceph_snap_context *r_snapc;    /* snap context for writes */
 };
 
+struct ceph_request_redirect {
+	struct ceph_object_locator oloc;
+};
+
 struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3997a87c4f51..010ff3bd58ad 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -369,6 +369,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	INIT_LIST_HEAD(&req->r_osd_item);
 
 	req->r_base_oloc.pool = -1;
+	req->r_target_oloc.pool = -1;
 
 	/* create reply message */
 	if (use_mempool)
@@ -1256,23 +1257,36 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
 			     struct ceph_osd_request *req,
 			     struct ceph_pg *pg_out)
 {
-	if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+	bool need_check_tiering;
+
+	need_check_tiering = false;
+	if (req->r_target_oloc.pool == -1) {
+		req->r_target_oloc = req->r_base_oloc; /* struct */
+		need_check_tiering = true;
+	}
+	if (req->r_target_oid.name_len == 0) {
+		ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+		need_check_tiering = true;
+	}
+
+	if (need_check_tiering &&
+	    (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
 		struct ceph_pg_pool_info *pi;
 
-		pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool);
+		pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
 		if (pi) {
 			if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
 			    pi->read_tier >= 0)
-				req->r_base_oloc.pool = pi->read_tier;
+				req->r_target_oloc.pool = pi->read_tier;
 			if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
 			    pi->write_tier >= 0)
-				req->r_base_oloc.pool = pi->write_tier;
+				req->r_target_oloc.pool = pi->write_tier;
 		}
 		/* !pi is caught in ceph_oloc_oid_to_pg() */
 	}
 
-	return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc,
-				   &req->r_base_oid, pg_out);
+	return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
+				   &req->r_target_oid, pg_out);
 }
 
 /*
@@ -1382,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
 	/* fill in message content that changes each time we send it */
 	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
 	put_unaligned_le32(req->r_flags, req->r_request_flags);
-	put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool);
+	put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
 	p = req->r_request_pgid;
 	ceph_encode_64(&p, req->r_pgid.pool);
 	ceph_encode_32(&p, req->r_pgid.seed);
@@ -1483,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
 			      round_jiffies_relative(delay));
 }
 
+static int ceph_oloc_decode(void **p, void *end,
+			    struct ceph_object_locator *oloc)
+{
+	u8 struct_v, struct_cv;
+	u32 len;
+	void *struct_end;
+	int ret = 0;
+
+	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+	struct_v = ceph_decode_8(p);
+	struct_cv = ceph_decode_8(p);
+	if (struct_v < 3) {
+		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	if (struct_cv > 6) {
+		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	len = ceph_decode_32(p);
+	ceph_decode_need(p, end, len, e_inval);
+	struct_end = *p + len;
+
+	oloc->pool = ceph_decode_64(p);
+	*p += 4; /* skip preferred */
+
+	len = ceph_decode_32(p);
+	if (len > 0) {
+		pr_warn("ceph_object_locator::key is set\n");
+		goto e_inval;
+	}
+
+	if (struct_v >= 5) {
+		len = ceph_decode_32(p);
+		if (len > 0) {
+			pr_warn("ceph_object_locator::nspace is set\n");
+			goto e_inval;
+		}
+	}
+
+	if (struct_v >= 6) {
+		s64 hash = ceph_decode_64(p);
+		if (hash != -1) {
+			pr_warn("ceph_object_locator::hash is set\n");
+			goto e_inval;
+		}
+	}
+
+	/* skip the rest */
+	*p = struct_end;
+out:
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
+static int ceph_redirect_decode(void **p, void *end,
+				struct ceph_request_redirect *redir)
+{
+	u8 struct_v, struct_cv;
+	u32 len;
+	void *struct_end;
+	int ret;
+
+	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+	struct_v = ceph_decode_8(p);
+	struct_cv = ceph_decode_8(p);
+	if (struct_cv > 1) {
+		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	len = ceph_decode_32(p);
+	ceph_decode_need(p, end, len, e_inval);
+	struct_end = *p + len;
+
+	ret = ceph_oloc_decode(p, end, &redir->oloc);
+	if (ret)
+		goto out;
+
+	len = ceph_decode_32(p);
+	if (len > 0) {
+		pr_warn("ceph_request_redirect::object_name is set\n");
+		goto e_inval;
+	}
+
+	len = ceph_decode_32(p);
+	*p += len; /* skip osd_instructions */
+
+	/* skip the rest */
+	*p = struct_end;
+out:
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
 static void complete_request(struct ceph_osd_request *req)
 {
 	complete_all(&req->r_safe_completion);  /* fsync waiter */
@@ -1497,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 {
 	void *p, *end;
 	struct ceph_osd_request *req;
+	struct ceph_request_redirect redir;
 	u64 tid;
 	int object_len;
 	unsigned int numops;
@@ -1576,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	for (i = 0; i < numops; i++)
 		req->r_reply_op_result[i] = ceph_decode_32(&p);
 
-	already_completed = req->r_got_reply;
+	if (le16_to_cpu(msg->hdr.version) >= 6) {
+		p += 8 + 4; /* skip replay_version */
+		p += 8; /* skip user_version */
 
-	if (!req->r_got_reply) {
+		err = ceph_redirect_decode(&p, end, &redir);
+		if (err)
+			goto bad_put;
+	} else {
+		redir.oloc.pool = -1;
+	}
 
+	if (redir.oloc.pool != -1) {
+		dout("redirect pool %lld\n", redir.oloc.pool);
+
+		__unregister_request(osdc, req);
+		mutex_unlock(&osdc->request_mutex);
+
+		req->r_target_oloc = redir.oloc; /* struct */
+
+		/*
+		 * Start redirect requests with nofail=true.  If
+		 * mapping fails, request will end up on the notarget
+		 * list, waiting for the new osdmap (which can take
+		 * a while), even though the original request mapped
+		 * successfully.  In the future we might want to follow
+		 * original request's nofail setting here.
+		 */
+		err = ceph_osdc_start_request(osdc, req, true);
+		BUG_ON(err);
+
+		goto done;
+	}
+
+	already_completed = req->r_got_reply;
+	if (!req->r_got_reply) {
 		req->r_result = result;
 		dout("handle_reply result %d bytes %d\n", req->r_result,
 		     bytes);