@@ -36,8 +36,9 @@ void json_message_parser_init(JSONMessageParser *parser,
Error *err),
void *opaque, va_list *ap);
-void json_message_parser_feed(JSONMessageParser *parser,
- const char *buffer, size_t size);
+size_t json_message_parser_feed(JSONMessageParser *parser,
+ const char *buffer, size_t size,
+ bool track_qmp);
void json_message_parser_flush(JSONMessageParser *parser);
@@ -367,8 +367,22 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
{
MonitorQMP *mon = opaque;
-
- json_message_parser_feed(&mon->parser, (const char *) buf, size);
+ char *cursor = (char *) buf;
+ size_t len;
+
+ while (size > 0) {
+ len = json_message_parser_feed(&mon->parser, (const char *) cursor,
+ size, true);
+ cursor += len;
+ size -= len;
+
+ if (size > 0) {
+ /* Let the dispatcher process the QMP command */
+ while (qatomic_mb_read(&mon->common.suspend_cnt)) {
+ g_usleep(20);
+ }
+ }
+ }
}
static QDict *qmp_greeting(MonitorQMP *mon)
@@ -605,7 +605,7 @@ static gboolean channel_event_cb(GIOCondition condition, gpointer data)
case G_IO_STATUS_NORMAL:
buf[count] = 0;
g_debug("read data, count: %d, data: %s", (int)count, buf);
- json_message_parser_feed(&s->parser, (char *)buf, (int)count);
+ json_message_parser_feed(&s->parser, (char *)buf, (int)count, false);
break;
case G_IO_STATUS_EOF:
g_debug("received EOF");
@@ -280,10 +280,11 @@ void json_lexer_init(JSONLexer *lexer, bool enable_interpolation)
lexer->x = lexer->y = 0;
}
-static void json_lexer_feed_char(JSONLexer *lexer, char ch, bool flush)
+static JSONTokenType json_lexer_feed_char(JSONLexer *lexer, char ch, bool flush)
{
int new_state;
bool char_consumed = false;
+ JSONTokenType ret;
lexer->x++;
if (ch == '\n') {
@@ -310,16 +311,16 @@ static void json_lexer_feed_char(JSONLexer *lexer, char ch, bool flush)
case JSON_FLOAT:
case JSON_KEYWORD:
case JSON_STRING:
- json_message_process_token(lexer, lexer->token, new_state,
- lexer->x, lexer->y);
+ ret = json_message_process_token(lexer, lexer->token, new_state,
+ lexer->x, lexer->y);
/* fall through */
case IN_START:
g_string_truncate(lexer->token, 0);
new_state = lexer->start_state;
break;
case JSON_ERROR:
- json_message_process_token(lexer, lexer->token, JSON_ERROR,
- lexer->x, lexer->y);
+ ret = json_message_process_token(lexer, lexer->token, JSON_ERROR,
+ lexer->x, lexer->y);
new_state = IN_RECOVERY;
/* fall through */
case IN_RECOVERY:
@@ -335,20 +336,31 @@ static void json_lexer_feed_char(JSONLexer *lexer, char ch, bool flush)
* this is a security consideration.
*/
if (lexer->token->len > MAX_TOKEN_SIZE) {
- json_message_process_token(lexer, lexer->token, lexer->state,
- lexer->x, lexer->y);
+ ret = json_message_process_token(lexer, lexer->token, lexer->state,
+ lexer->x, lexer->y);
g_string_truncate(lexer->token, 0);
lexer->state = lexer->start_state;
}
+ return ret;
}
-void json_lexer_feed(JSONLexer *lexer, const char *buffer, size_t size)
+/*
+ * Return the number of characters fed until the end of a QMP command or
+ * the buffer size if not any or else not tracked.
+ */
+size_t json_lexer_feed(JSONLexer *lexer, const char *buffer, size_t size,
+ bool track)
{
size_t i;
+ JSONTokenType type = JSON_ERROR;
for (i = 0; i < size; i++) {
- json_lexer_feed_char(lexer, buffer[i], false);
+ if ((type == JSON_QMP_CMD_END) && track) {
+ break;
+ }
+ type = json_lexer_feed_char(lexer, buffer[i], false);
}
+ return i;
}
void json_lexer_flush(JSONLexer *lexer)
@@ -31,6 +31,7 @@ typedef enum json_token_type {
JSON_KEYWORD,
JSON_STRING,
JSON_INTERP,
+ JSON_QMP_CMD_END,
JSON_END_OF_INPUT,
JSON_MAX = JSON_END_OF_INPUT
} JSONTokenType;
@@ -39,13 +40,14 @@ typedef struct JSONToken JSONToken;
/* json-lexer.c */
void json_lexer_init(JSONLexer *lexer, bool enable_interpolation);
-void json_lexer_feed(JSONLexer *lexer, const char *buffer, size_t size);
+size_t json_lexer_feed(JSONLexer *lexer, const char *buffer, size_t size,
+ bool track);
void json_lexer_flush(JSONLexer *lexer);
void json_lexer_destroy(JSONLexer *lexer);
/* json-streamer.c */
-void json_message_process_token(JSONLexer *lexer, GString *input,
- JSONTokenType type, int x, int y);
+JSONTokenType json_message_process_token(JSONLexer *lexer, GString *input,
+ JSONTokenType type, int x, int y);
/* json-parser.c */
JSONToken *json_token(JSONTokenType type, int x, int y, GString *tokstr);
@@ -28,8 +28,8 @@ static void json_message_free_tokens(JSONMessageParser *parser)
}
}
-void json_message_process_token(JSONLexer *lexer, GString *input,
- JSONTokenType type, int x, int y)
+JSONTokenType json_message_process_token(JSONLexer *lexer, GString *input,
+ JSONTokenType type, int x, int y)
{
JSONMessageParser *parser = container_of(lexer, JSONMessageParser, lexer);
QObject *json = NULL;
@@ -54,7 +54,7 @@ void json_message_process_token(JSONLexer *lexer, GString *input,
goto out_emit;
case JSON_END_OF_INPUT:
if (g_queue_is_empty(&parser->tokens)) {
- return;
+ return type;
}
json = json_parser_parse(&parser->tokens, parser->ap, &err);
goto out_emit;
@@ -86,7 +86,7 @@ void json_message_process_token(JSONLexer *lexer, GString *input,
if ((parser->brace_count > 0 || parser->bracket_count > 0)
&& parser->brace_count >= 0 && parser->bracket_count >= 0) {
- return;
+ return type;
}
json = json_parser_parse(&parser->tokens, parser->ap, &err);
@@ -97,6 +97,7 @@ out_emit:
json_message_free_tokens(parser);
parser->token_size = 0;
parser->emit(parser->opaque, json, err);
+ return JSON_QMP_CMD_END;
}
void json_message_parser_init(JSONMessageParser *parser,
@@ -115,10 +116,10 @@ void json_message_parser_init(JSONMessageParser *parser,
json_lexer_init(&parser->lexer, !!ap);
}
-void json_message_parser_feed(JSONMessageParser *parser,
- const char *buffer, size_t size)
+size_t json_message_parser_feed(JSONMessageParser *parser,
+ const char *buffer, size_t size, bool track_qmp)
{
- json_lexer_feed(&parser->lexer, buffer, size);
+ return json_lexer_feed(&parser->lexer, buffer, size, track_qmp);
}
void json_message_parser_flush(JSONMessageParser *parser)
@@ -66,7 +66,7 @@ static QObject *qobject_from_jsonv(const char *string, va_list *ap,
JSONParsingState state = {};
json_message_parser_init(&state.parser, consume_json, &state, ap);
- json_message_parser_feed(&state.parser, string, strlen(string));
+ json_message_parser_feed(&state.parser, string, strlen(string), false);
json_message_parser_flush(&state.parser);
json_message_parser_destroy(&state.parser);
@@ -611,7 +611,7 @@ QDict *qmp_fd_receive(int fd)
if (log) {
len = write(2, &c, 1);
}
- json_message_parser_feed(&qmp.parser, &c, 1);
+ json_message_parser_feed(&qmp.parser, &c, 1, false);
}
json_message_parser_destroy(&qmp.parser);
We are going to allow the QMP monitor reading data from input channel more than one byte at once to increase the performance. With the OOB compatibility disabled, the monitor queues one QMP command at most. It was done for the backward compatibility as stated in the comment before pushing a command into the queue. To keep that concept functional, the monitor should track the end of a single QMP command. It allows the dispatcher handling the command and send a response to client in time. Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com> --- include/qapi/qmp/json-parser.h | 5 +++-- monitor/qmp.c | 18 ++++++++++++++++-- qga/main.c | 2 +- qobject/json-lexer.c | 30 +++++++++++++++++++++--------- qobject/json-parser-int.h | 8 +++++--- qobject/json-streamer.c | 15 ++++++++------- qobject/qjson.c | 2 +- tests/qtest/libqtest.c | 2 +- 8 files changed, 56 insertions(+), 26 deletions(-)