Message ID | 20210616144324.31652-9-julien@xen.org (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | tools/xenstored: Bug fixes + Improve Live-Update | expand |
> On 16 Jun 2021, at 15:43, Julien Grall <julien@xen.org> wrote: > > From: Julien Grall <jgrall@amazon.com> > > Currently, the restore code is considering the stream will contain at > most one in-flight request per connection. In a follow-up changes, we > will want to transfer multiple in-flight requests. > > The function read_state_buffered() is now extended to restore multiple > in-flight request. Complete requests will be queued as delayed > requests, if there a partial request (only the last one can) then it > will used as the current in-flight request. > > Note that we want to bypass the quota check for delayed requests as > the new Xenstore may have a lower limit. > > Lastly, there is no need to change the specification as there was > no restriction on the number of in-flight requests preserved. > > Signed-off-by: Julien Grall <jgrall@amazon.com> Reviewed-by: Luca Fancellu <luca.fancellu@arm.com> > --- > tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++----- > 1 file changed, 48 insertions(+), 8 deletions(-) > > diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c > index a5084a5b173d..5b7ab7f74013 100644 > --- a/tools/xenstore/xenstored_core.c > +++ b/tools/xenstore/xenstored_core.c > @@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in) > enum xsd_sockmsg_type type = in->hdr.msg.type; > int ret; > > + /* At least send_error() and send_reply() expects conn->in == in */ > + assert(conn->in == in); > + trace_io(conn, in, 0); > + > if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) { > eprintf("Client unknown operation %i", type); > send_error(conn, ENOSYS); > @@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in) > conn->transaction = NULL; > } > > +static bool process_delayed_message(struct delayed_request *req) > +{ > + struct connection *conn = req->data; > + struct buffered_data *saved_in = conn->in; > + > 
+ /* > + * Part of process_message() expects conn->in to contains the > + * processed response. So save the current conn->in and restore it > + * afterwards. > + */ > + conn->in = req->in; > + process_message(req->data, req->in); > + conn->in = saved_in; > + > + return true; > +} > + > static void consider_message(struct connection *conn) > { > if (verbose) > @@ -1582,7 +1603,6 @@ static void handle_input(struct connection *conn) > if (in->used != in->hdr.msg.len) > return; > > - trace_io(conn, in, 0); > consider_message(conn); > return; > > @@ -2611,14 +2631,20 @@ void read_state_buffered_data(const void *ctx, struct connection *conn, > unsigned int len; > bool partial = sc->data_resp_len; > > - if (sc->data_in_len) { > + for (data = sc->data; data < sc->data + sc->data_in_len; data += len) { > bdata = new_buffer(conn); > if (!bdata) > barf("error restoring read data"); > - if (sc->data_in_len < sizeof(bdata->hdr)) { > + > + /* > + * We don't know yet if there is more than one message > + * to process. So the len is the size of the leftover data. > + */ > + len = sc->data_in_len - (data - sc->data); > + if (len < sizeof(bdata->hdr)) { > bdata->inhdr = true; > - memcpy(&bdata->hdr, sc->data, sc->data_in_len); > - bdata->used = sc->data_in_len; > + memcpy(&bdata->hdr, sc->data, len); > + bdata->used = len; > } else { > bdata->inhdr = false; > memcpy(&bdata->hdr, sc->data, sizeof(bdata->hdr)); > @@ -2629,12 +2655,26 @@ void read_state_buffered_data(const void *ctx, struct connection *conn, > bdata->hdr.msg.len); > if (!bdata->buffer) > barf("Error allocating in buffer"); > - bdata->used = sc->data_in_len - sizeof(bdata->hdr); > - memcpy(bdata->buffer, sc->data + sizeof(bdata->hdr), > + bdata->used = min_t(unsigned int, > + len - sizeof(bdata->hdr), > + bdata->hdr.msg.len); > + memcpy(bdata->buffer, data + sizeof(bdata->hdr), > bdata->used); > + /* Update len to match the size of the message. 
*/ > + len = bdata->used + sizeof(bdata->hdr); > } > > - conn->in = bdata; > + /* > + * If the message is not complete, then it means this was > + * the current processed message. All the other messages > + * will be queued to be handled after restoring. > + */ > + if (bdata->inhdr || bdata->used != bdata->hdr.msg.len) { > + assert(conn->in == NULL); > + conn->in = bdata; > + } else if (delay_request(conn, bdata, process_delayed_message, > + conn, true)) > + barf("Unable to delay the request"); > } > > for (data = sc->data + sc->data_in_len; > -- > 2.17.1 > >
On 16.06.21 16:43, Julien Grall wrote: > From: Julien Grall <jgrall@amazon.com> > > Currently, the restore code is considering the stream will contain at > most one in-flight request per connection. In a follow-up changes, we > will want to transfer multiple in-flight requests. > > The function read_state_buffered() is now extended to restore multiple > in-flight request. Complete requests will be queued as delayed > requests, if there a partial request (only the last one can) then it > will used as the current in-flight request. > > Note that we want to bypass the quota check for delayed requests as > the new Xenstore may have a lower limit. > > Lastly, there is no need to change the specification as there was > no restriction on the number of in-flight requests preserved. > > Signed-off-by: Julien Grall <jgrall@amazon.com> > --- > tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++----- > 1 file changed, 48 insertions(+), 8 deletions(-) > > diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c > index a5084a5b173d..5b7ab7f74013 100644 > --- a/tools/xenstore/xenstored_core.c > +++ b/tools/xenstore/xenstored_core.c > @@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in) > enum xsd_sockmsg_type type = in->hdr.msg.type; > int ret; > > + /* At least send_error() and send_reply() expects conn->in == in */ > + assert(conn->in == in); > + trace_io(conn, in, 0); > + > if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) { > eprintf("Client unknown operation %i", type); > send_error(conn, ENOSYS); > @@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in) > conn->transaction = NULL; > } > > +static bool process_delayed_message(struct delayed_request *req) > +{ > + struct connection *conn = req->data; > + struct buffered_data *saved_in = conn->in; > + > + /* > + * Part of process_message() expects conn->in to contains the > + * 
processed response. So save the current conn->in and restore it > + * afterwards. > + */ > + conn->in = req->in; > + process_message(req->data, req->in); > + conn->in = saved_in; This pattern was added to do_lu_start() already, and it will be needed for each other function being called via call_delayed(). Maybe it would be better to add conn explicitly to struct delayed_request (or even better: replace data with conn) and to do the conn->in saving/setting/restoring in call_delayed() instead? Other than that: Reviewed-by: Juergen Gross <jgross@suse.com> Juergen
Hi Juergen, On 24/06/2021 10:30, Juergen Gross wrote: > On 16.06.21 16:43, Julien Grall wrote: >> From: Julien Grall <jgrall@amazon.com> >> >> Currently, the restore code is considering the stream will contain at >> most one in-flight request per connection. In a follow-up changes, we >> will want to transfer multiple in-flight requests. >> >> The function read_state_buffered() is now extended to restore multiple >> in-flight request. Complete requests will be queued as delayed >> requests, if there a partial request (only the last one can) then it >> will used as the current in-flight request. >> >> Note that we want to bypass the quota check for delayed requests as >> the new Xenstore may have a lower limit. >> >> Lastly, there is no need to change the specification as there was >> no restriction on the number of in-flight requests preserved. >> >> Signed-off-by: Julien Grall <jgrall@amazon.com> >> --- >> tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++----- >> 1 file changed, 48 insertions(+), 8 deletions(-) >> >> diff --git a/tools/xenstore/xenstored_core.c >> b/tools/xenstore/xenstored_core.c >> index a5084a5b173d..5b7ab7f74013 100644 >> --- a/tools/xenstore/xenstored_core.c >> +++ b/tools/xenstore/xenstored_core.c >> @@ -1486,6 +1486,10 @@ static void process_message(struct connection >> *conn, struct buffered_data *in) >> enum xsd_sockmsg_type type = in->hdr.msg.type; >> int ret; >> + /* At least send_error() and send_reply() expects conn->in == in */ >> + assert(conn->in == in); >> + trace_io(conn, in, 0); >> + >> if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) > { >> eprintf("Client unknown operation %i", type); >> send_error(conn, ENOSYS); >> @@ -1515,6 +1519,23 @@ static void process_message(struct connection >> *conn, struct buffered_data *in) >> conn->transaction = NULL; >> } >> +static bool process_delayed_message(struct delayed_request *req) >> +{ >> + struct connection *conn = req->data; >> + struct buffered_data 
*saved_in = conn->in; >> + >> + /* >> + * Part of process_message() expects conn->in to contains the >> + * processed response. So save the current conn->in and restore it >> + * afterwards. >> + */ >> + conn->in = req->in; >> + process_message(req->data, req->in); >> + conn->in = saved_in; > > This pattern was added to do_lu_start() already, and it will be needed > for each other function being called via call_delayed(). > > Maybe it would be better to add conn explicitly to struct > delayed_request (or even better: replace data with conn) and to do the > conn->in saving/setting/restoring in call_delayed() instead? This was my original approach, but I abandoned it because in the case of do_lu_start() we need to save the original conn->in in the stream (see patch #3 for more details). If we overwrite conn->in in call_delayed(), then we need a different way to find the original conn->in. I couldn't find a nice way and decided to settle with the duplication. Cheers,
On 24.06.21 10:42, Julien Grall wrote: > Hi Juergen, > > On 24/06/2021 10:30, Juergen Gross wrote: >> On 16.06.21 16:43, Julien Grall wrote: >>> From: Julien Grall <jgrall@amazon.com> >>> >>> Currently, the restore code is considering the stream will contain at >>> most one in-flight request per connection. In a follow-up changes, we >>> will want to transfer multiple in-flight requests. >>> >>> The function read_state_buffered() is now extended to restore multiple >>> in-flight request. Complete requests will be queued as delayed >>> requests, if there a partial request (only the last one can) then it >>> will used as the current in-flight request. >>> >>> Note that we want to bypass the quota check for delayed requests as >>> the new Xenstore may have a lower limit. >>> >>> Lastly, there is no need to change the specification as there was >>> no restriction on the number of in-flight requests preserved. >>> >>> Signed-off-by: Julien Grall <jgrall@amazon.com> >>> --- >>> tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++----- >>> 1 file changed, 48 insertions(+), 8 deletions(-) >>> >>> diff --git a/tools/xenstore/xenstored_core.c >>> b/tools/xenstore/xenstored_core.c >>> index a5084a5b173d..5b7ab7f74013 100644 >>> --- a/tools/xenstore/xenstored_core.c >>> +++ b/tools/xenstore/xenstored_core.c >>> @@ -1486,6 +1486,10 @@ static void process_message(struct connection >>> *conn, struct buffered_data *in) >>> enum xsd_sockmsg_type type = in->hdr.msg.type; >>> int ret; >>> + /* At least send_error() and send_reply() expects conn->in == in */ >>> + assert(conn->in == in); >>> + trace_io(conn, in, 0); >>> + >>> if ((unsigned int)type >= XS_TYPE_COUNT|| !wire_funcs[type].func) >> { >>> eprintf("Client unknown operation %i", type); >>> send_error(conn, ENOSYS); >>> @@ -1515,6 +1519,23 @@ static void process_message(struct connection >>> *conn, struct buffered_data *in) >>> conn->transaction = NULL; >>> } >>> +static bool process_delayed_message(struct 
delayed_request *req) >>> +{ >>> + struct connection *conn = req->data; >>> + struct buffered_data *saved_in = conn->in; >>> + >>> + /* >>> + * Part of process_message() expects conn->in to contains the >>> + * processed response. So save the current conn->in and restore it >>> + * afterwards. >>> + */ >>> + conn->in = req->in; >>> + process_message(req->data, req->in); >>> + conn->in = saved_in; >> >> This pattern was added to do_lu_start() already, and it will be needed >> for each other function being called via call_delayed(). >> >> Maybe it would be better to add conn explicitly to struct >> delayed_request (or even better: replace data with conn) and to do the >> conn->in saving/setting/restoring in call_delayed() instead? > > This was my original approach, but I abandoned it because in the case of > do_lu_start() we need to save the original conn->in in the stream (see > patch #3 for more details). > > If we overwrite conn->in in call_delayed(), then we need a different way > to find the original conn->in. I couldn't find a nice way and decided to > settle with the duplication. Ah, okay, understood. Even if we were able to restore conn->in, it would be the same amount of code again, but harder to follow. Juergen
diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c index a5084a5b173d..5b7ab7f74013 100644 --- a/tools/xenstore/xenstored_core.c +++ b/tools/xenstore/xenstored_core.c @@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in) enum xsd_sockmsg_type type = in->hdr.msg.type; int ret; + /* At least send_error() and send_reply() expects conn->in == in */ + assert(conn->in == in); + trace_io(conn, in, 0); + if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) { eprintf("Client unknown operation %i", type); send_error(conn, ENOSYS); @@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in) conn->transaction = NULL; } +static bool process_delayed_message(struct delayed_request *req) +{ + struct connection *conn = req->data; + struct buffered_data *saved_in = conn->in; + + /* + * Part of process_message() expects conn->in to contains the + * processed response. So save the current conn->in and restore it + * afterwards. + */ + conn->in = req->in; + process_message(req->data, req->in); + conn->in = saved_in; + + return true; +} + static void consider_message(struct connection *conn) { if (verbose) @@ -1582,7 +1603,6 @@ static void handle_input(struct connection *conn) if (in->used != in->hdr.msg.len) return; - trace_io(conn, in, 0); consider_message(conn); return; @@ -2611,14 +2631,20 @@ void read_state_buffered_data(const void *ctx, struct connection *conn, unsigned int len; bool partial = sc->data_resp_len; - if (sc->data_in_len) { + for (data = sc->data; data < sc->data + sc->data_in_len; data += len) { bdata = new_buffer(conn); if (!bdata) barf("error restoring read data"); - if (sc->data_in_len < sizeof(bdata->hdr)) { + + /* + * We don't know yet if there is more than one message + * to process. So the len is the size of the leftover data. 
+ */ + len = sc->data_in_len - (data - sc->data); + if (len < sizeof(bdata->hdr)) { bdata->inhdr = true; - memcpy(&bdata->hdr, sc->data, sc->data_in_len); - bdata->used = sc->data_in_len; + memcpy(&bdata->hdr, sc->data, len); + bdata->used = len; } else { bdata->inhdr = false; memcpy(&bdata->hdr, sc->data, sizeof(bdata->hdr)); @@ -2629,12 +2655,26 @@ void read_state_buffered_data(const void *ctx, struct connection *conn, bdata->hdr.msg.len); if (!bdata->buffer) barf("Error allocating in buffer"); - bdata->used = sc->data_in_len - sizeof(bdata->hdr); - memcpy(bdata->buffer, sc->data + sizeof(bdata->hdr), + bdata->used = min_t(unsigned int, + len - sizeof(bdata->hdr), + bdata->hdr.msg.len); + memcpy(bdata->buffer, data + sizeof(bdata->hdr), bdata->used); + /* Update len to match the size of the message. */ + len = bdata->used + sizeof(bdata->hdr); } - conn->in = bdata; + /* + * If the message is not complete, then it means this was + * the current processed message. All the other messages + * will be queued to be handled after restoring. + */ + if (bdata->inhdr || bdata->used != bdata->hdr.msg.len) { + assert(conn->in == NULL); + conn->in = bdata; + } else if (delay_request(conn, bdata, process_delayed_message, + conn, true)) + barf("Unable to delay the request"); } for (data = sc->data + sc->data_in_len;