Message ID | 20191219160756.22389-3-yury-kotov@yandex-team.ru (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | Speed up QMP stream reading | expand |
Long delay, compliments of the season. My apologies! Yury Kotov <yury-kotov@yandex-team.ru> writes: > The monitor_qmp_can_read (as a callback of qemu_chr_fe_set_handlers) > should return size of buffer which monitor_qmp_read can process. > Currently, monitor_can_read returns 1, because it guarantees that > only one QMP command can be handled at a time. > Thus, for each QMP command, len(QMD) iterations of the main loop > are required to handle a command. > > This patch adds an input buffer to speed up reading and to keep > the guarantee of executing one command at a time. > > Signed-off-by: Yury Kotov <yury-kotov@yandex-team.ru> > --- > monitor/monitor-internal.h | 11 +++++++++++ > monitor/monitor.c | 27 +++++++++++++++++++++++++++ > monitor/qmp.c | 17 +++++++++++++++-- > 3 files changed, 53 insertions(+), 2 deletions(-) > > diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h > index c0ba29abf1..22983b9dda 100644 > --- a/monitor/monitor-internal.h > +++ b/monitor/monitor-internal.h > @@ -32,6 +32,8 @@ > #include "qemu/readline.h" > #include "sysemu/iothread.h" > > +#define MON_INPUT_BUFFER_SIZE 1024 > + > /* > * Supported types: > * > @@ -93,6 +95,11 @@ struct Monitor { > gchar *mon_cpu_path; > QTAILQ_ENTRY(Monitor) entry; > > + /* Must be accessed only by monitor's iothread */ Uh, a QMP monitor need not use an iothread! More on this below at [*]. > + char inbuf[MON_INPUT_BUFFER_SIZE]; This is the only use of macro MON_INPUT_BUFFER_SIZE. Let's scratch the macro. > + int inbuf_pos; > + int inbuf_len; Not immediately obvious: this is a ring buffer. Stating it in the comment would help. @input_pos is the ring buffer's read index, @inbuf_len is the amount of data. I prefer storing the write index instead of the amount of data myself. Matter of taste, okay as it is. I wonder how many open-coded ring buffers we have in the tree... Why is the ring buffer in struct Monitor? Isn't it just for QMP monitors? We should explain somewhere in the code *why* we buffer input. > + > /* > * The per-monitor lock. We can't access guest memory when holding > * the lock. > @@ -169,9 +176,13 @@ void monitor_data_destroy(Monitor *mon); > void monitor_list_append(Monitor *mon); > void monitor_fdsets_cleanup(void); > > +void monitor_inbuf_write(Monitor *mon, const char *buf, int size); > +int monitor_inbuf_read(Monitor *mon, char *buf, int size); > + > void qmp_send_response(MonitorQMP *mon, const QDict *rsp); > void monitor_data_destroy_qmp(MonitorQMP *mon); > void monitor_qmp_bh_dispatcher(void *data); > +void monitor_qmp_handle_inbuf(Monitor *mon); > > int get_monitor_def(int64_t *pval, const char *name); > void help_cmd(Monitor *mon, const char *name); > diff --git a/monitor/monitor.c b/monitor/monitor.c > index d25cc8ea4a..9eb258ac2f 100644 > --- a/monitor/monitor.c > +++ b/monitor/monitor.c > @@ -440,6 +440,29 @@ static gboolean qapi_event_throttle_equal(const void *a, const void *b) > return TRUE; > } > > +void monitor_inbuf_write(Monitor *mon, const char *buf, int size) > +{ > + int pos = mon->inbuf_pos + mon->inbuf_len; > + > + assert(size <= sizeof(mon->inbuf) - mon->inbuf_len); > + while (size-- > 0) { > + mon->inbuf[pos++ % sizeof(mon->inbuf)] = *buf++; > + mon->inbuf_len++; > + } > +} > + > +int monitor_inbuf_read(Monitor *mon, char *buf, int size) > +{ > + int read_bytes = 0; > + > + while (read_bytes < size && mon->inbuf_len > 0) { > + buf[read_bytes++] = mon->inbuf[mon->inbuf_pos++]; > + mon->inbuf_pos %= sizeof(mon->inbuf); > + mon->inbuf_len--; > + } > + return read_bytes; > +} > + If efficiency was a concern, we'd want to use memcpy(). Okay as it is. > int monitor_suspend(Monitor *mon) > { > if (monitor_is_hmp_non_interactive(mon)) { > @@ -465,6 +488,10 @@ static void monitor_accept_input(void *opaque) > Monitor *mon = opaque; > > qemu_chr_fe_accept_input(&mon->chr); > + > + if (mon->is_qmp) { > + monitor_qmp_handle_inbuf(mon); > + } > } Hmm, do we need to adjust when we call qemu_chr_fe_accept_input()? I'll discuss this below at [**]. > > void monitor_resume(Monitor *mon) > diff --git a/monitor/qmp.c b/monitor/qmp.c > index 37884c6c43..9d2634eeb3 100644 > --- a/monitor/qmp.c > +++ b/monitor/qmp.c > @@ -315,14 +315,27 @@ static int monitor_qmp_can_read(void *opaque) > { > Monitor *mon = opaque; > > - return !atomic_mb_read(&mon->suspend_cnt); > + return sizeof(mon->inbuf) - mon->inbuf_len; Can read as much as the ring buffer has space. Good. > +} > + > +void monitor_qmp_handle_inbuf(Monitor *mon) > +{ > + MonitorQMP *mon_qmp = container_of(mon, MonitorQMP, common); > + char ch; > + > + /* Handle only one byte at a time, because monitor may become suspened */ Typo, it's "suspended". I'm afraid the comment doesn't really explain much. It can serve as reminder when you already know what the problem is. When you don't, you're still lost. > + while (!atomic_mb_read(&mon->suspend_cnt) && > + monitor_inbuf_read(mon, &ch, 1)) { > + json_message_parser_feed(&mon_qmp->parser, &ch, 1); > + } > } > > static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) > { > MonitorQMP *mon = opaque; > > - json_message_parser_feed(&mon->parser, (const char *) buf, size); > + monitor_inbuf_write(&mon->common, (const char *)buf, size); > + monitor_qmp_handle_inbuf(&mon->common); > } > > static QDict *qmp_greeting(MonitorQMP *mon) Now let's revisit the two issues I postponed. [*] Ring buffer access rules to ensure thread safety You state it "must be accessed only by monitor's iothread". Quick recap of monitor iothread use: * There is one shared monitor I/O thread @mon_iothread. * HMP monitors do not use it. * A QMP monitor uses it when its character device has QEMU_CHAR_FEATURE_GCONTEXT. A QMP monitor that doesn't use the iothread obviously cannot satisfy "must be accessed only by monitor's iothread". Does this mean trouble? Ways to access the ring buffer: (1) monitor_inbuf_write() called from monitor_qmp_read(). (2) monitor_inbuf_read() called from monitor_qmp_handle_inbuf() called from monitor_qmp_read() (3) monitor_inbuf_read() called from monitor_qmp_handle_inbuf() called from monitor_accept_input() If the monitor uses @mon_iothread, it arranges for its front end char handlers monitor_qmp_can_read(), monitor_qmp_read() and monitor_qmp_read to run in @mon_iothread. If the monitor does not use @mon_iothread, they run in the main thread instead. Therefore, any QMP monitor's (1) and (2) either run only in @mon_iothread, or only in the main thread. (3) runs in a bottom half scheduled by monitor_resume() to execute in @mon_iothread's AIO context when the monitor uses @mon_iothread, else in the main loop's AIO context. Looks safe to me. The comment needs fixing, though. [**] When to call qemu_chr_fe_accept_input() Before this patch, the monitor can read input only while it's not suspended. It calls qemu_chr_fe_accept_input() soon after it comes out of suspended state: monitor_resume() arranges for it to run in the appropriate AIO context. After this patch, the monitor can read input while the ring buffer has space. The patch does not adjust when qemu_chr_fe_accept_input() is called. Is this okay? Can we ever come out of suspended state with a full ring buffer? If yes, we run qemu_chr_fe_accept_input() even though we can't accept any. Is that bad? Can the ring buffer fill up without the monitor becoming suspended? If yes, we don't run qemu_chr_fe_accept_input() when we can accept input again. Is that bad?
diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h index c0ba29abf1..22983b9dda 100644 --- a/monitor/monitor-internal.h +++ b/monitor/monitor-internal.h @@ -32,6 +32,8 @@ #include "qemu/readline.h" #include "sysemu/iothread.h" +#define MON_INPUT_BUFFER_SIZE 1024 + /* * Supported types: * @@ -93,6 +95,11 @@ struct Monitor { gchar *mon_cpu_path; QTAILQ_ENTRY(Monitor) entry; + /* Must be accessed only by monitor's iothread */ + char inbuf[MON_INPUT_BUFFER_SIZE]; + int inbuf_pos; + int inbuf_len; + /* * The per-monitor lock. We can't access guest memory when holding * the lock. @@ -169,9 +176,13 @@ void monitor_data_destroy(Monitor *mon); void monitor_list_append(Monitor *mon); void monitor_fdsets_cleanup(void); +void monitor_inbuf_write(Monitor *mon, const char *buf, int size); +int monitor_inbuf_read(Monitor *mon, char *buf, int size); + void qmp_send_response(MonitorQMP *mon, const QDict *rsp); void monitor_data_destroy_qmp(MonitorQMP *mon); void monitor_qmp_bh_dispatcher(void *data); +void monitor_qmp_handle_inbuf(Monitor *mon); int get_monitor_def(int64_t *pval, const char *name); void help_cmd(Monitor *mon, const char *name); diff --git a/monitor/monitor.c b/monitor/monitor.c index d25cc8ea4a..9eb258ac2f 100644 --- a/monitor/monitor.c +++ b/monitor/monitor.c @@ -440,6 +440,29 @@ static gboolean qapi_event_throttle_equal(const void *a, const void *b) return TRUE; } +void monitor_inbuf_write(Monitor *mon, const char *buf, int size) +{ + int pos = mon->inbuf_pos + mon->inbuf_len; + + assert(size <= sizeof(mon->inbuf) - mon->inbuf_len); + while (size-- > 0) { + mon->inbuf[pos++ % sizeof(mon->inbuf)] = *buf++; + mon->inbuf_len++; + } +} + +int monitor_inbuf_read(Monitor *mon, char *buf, int size) +{ + int read_bytes = 0; + + while (read_bytes < size && mon->inbuf_len > 0) { + buf[read_bytes++] = mon->inbuf[mon->inbuf_pos++]; + mon->inbuf_pos %= sizeof(mon->inbuf); + mon->inbuf_len--; + } + return read_bytes; +} + int monitor_suspend(Monitor *mon) { if (monitor_is_hmp_non_interactive(mon)) { @@ -465,6 +488,10 @@ static void monitor_accept_input(void *opaque) Monitor *mon = opaque; qemu_chr_fe_accept_input(&mon->chr); + + if (mon->is_qmp) { + monitor_qmp_handle_inbuf(mon); + } } void monitor_resume(Monitor *mon) diff --git a/monitor/qmp.c b/monitor/qmp.c index 37884c6c43..9d2634eeb3 100644 --- a/monitor/qmp.c +++ b/monitor/qmp.c @@ -315,14 +315,27 @@ static int monitor_qmp_can_read(void *opaque) { Monitor *mon = opaque; - return !atomic_mb_read(&mon->suspend_cnt); + return sizeof(mon->inbuf) - mon->inbuf_len; +} + +void monitor_qmp_handle_inbuf(Monitor *mon) +{ + MonitorQMP *mon_qmp = container_of(mon, MonitorQMP, common); + char ch; + + /* Handle only one byte at a time, because monitor may become suspened */ + while (!atomic_mb_read(&mon->suspend_cnt) && + monitor_inbuf_read(mon, &ch, 1)) { + json_message_parser_feed(&mon_qmp->parser, &ch, 1); + } } static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) { MonitorQMP *mon = opaque; - json_message_parser_feed(&mon->parser, (const char *) buf, size); + monitor_inbuf_write(&mon->common, (const char *)buf, size); + monitor_qmp_handle_inbuf(&mon->common); } static QDict *qmp_greeting(MonitorQMP *mon)
The monitor_qmp_can_read (as a callback of qemu_chr_fe_set_handlers) should return size of buffer which monitor_qmp_read can process. Currently, monitor_can_read returns 1, because it guarantees that only one QMP command can be handled at a time. Thus, for each QMP command, len(QMD) iterations of the main loop are required to handle a command. This patch adds an input buffer to speed up reading and to keep the guarantee of executing one command at a time. Signed-off-by: Yury Kotov <yury-kotov@yandex-team.ru> --- monitor/monitor-internal.h | 11 +++++++++++ monitor/monitor.c | 27 +++++++++++++++++++++++++++ monitor/qmp.c | 17 +++++++++++++++-- 3 files changed, 53 insertions(+), 2 deletions(-)