@@ -32,6 +32,9 @@
#include "migration/migration.h"
#include "util.h"
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
+
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@@ -77,6 +80,20 @@ static int event_unhandled_count;
* |packet | |packet + |packet | |packet +
* +--------+ +--------+ +--------+ +--------+
*/
+
+typedef struct SendCo {
+ Coroutine *co;
+ GQueue send_list;
+ bool done;
+ int ret;
+} SendCo;
+
+typedef struct SendEntry {
+ uint32_t size;
+ uint32_t vnet_hdr_len;
+ uint8_t buf[];
+} SendEntry;
+
typedef struct CompareState {
Object parent;
@@ -91,6 +108,7 @@ typedef struct CompareState {
SocketReadState pri_rs;
SocketReadState sec_rs;
SocketReadState notify_rs;
+ SendCo sendco;
bool vnet_hdr;
uint32_t compare_timeout;
uint32_t expired_scan_cycle;
@@ -126,8 +144,11 @@ enum {
static int compare_chr_send(CompareState *s,
const uint8_t *buf,
uint32_t size,
- uint32_t vnet_hdr_len,
- bool notify_remote_frame);
+ uint32_t vnet_hdr_len);
+
+static int notify_chr_send(CompareState *s,
+ const uint8_t *buf,
+ uint32_t size);
static bool packet_matches_str(const char *str,
const uint8_t *buf,
@@ -145,7 +166,7 @@ static void notify_remote_frame(CompareState *s)
char msg[] = "DO_CHECKPOINT";
int ret = 0;
- ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+ ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
if (ret < 0) {
error_report("Notify Xen COLO-frame failed");
}
@@ -271,8 +292,7 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
ret = compare_chr_send(s,
pkt->data,
pkt->size,
- pkt->vnet_hdr_len,
- false);
+ pkt->vnet_hdr_len);
if (ret < 0) {
error_report("colo send primary packet failed");
}
@@ -699,63 +719,123 @@ static void colo_compare_connection(void *opaque, void *user_data)
}
}
-static int compare_chr_send(CompareState *s,
- const uint8_t *buf,
- uint32_t size,
- uint32_t vnet_hdr_len,
- bool notify_remote_frame)
+static void coroutine_fn _compare_chr_send(void *opaque)
{
+ CompareState *s = opaque;
+ SendCo *sendco = &s->sendco;
int ret = 0;
- uint32_t len = htonl(size);
- if (!size) {
- return 0;
- }
+ while (!g_queue_is_empty(&sendco->send_list)) {
+ SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+ uint32_t len = htonl(entry->size);
- if (notify_remote_frame) {
- ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
- (uint8_t *)&len,
- sizeof(len));
- } else {
ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
- }
- if (ret != sizeof(len)) {
- goto err;
- }
+ if (ret != sizeof(len)) {
+ g_free(entry);
+ goto err;
+ }
- if (s->vnet_hdr) {
- /*
- * We send vnet header len make other module(like filter-redirector)
- * know how to parse net packet correctly.
- */
- len = htonl(vnet_hdr_len);
+ if (s->vnet_hdr) {
+ /*
+ * We send vnet header len make other module(like filter-redirector)
+ * know how to parse net packet correctly.
+ */
+ len = htonl(entry->vnet_hdr_len);
- if (!notify_remote_frame) {
ret = qemu_chr_fe_write_all(&s->chr_out,
(uint8_t *)&len,
sizeof(len));
+
+ if (ret != sizeof(len)) {
+ g_free(entry);
+ goto err;
+ }
}
- if (ret != sizeof(len)) {
+ ret = qemu_chr_fe_write_all(&s->chr_out,
+ (uint8_t *)entry->buf,
+ entry->size);
+
+ if (ret != entry->size) {
+ g_free(entry);
goto err;
}
+
+ g_free(entry);
}
- if (notify_remote_frame) {
- ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
- (uint8_t *)buf,
- size);
- } else {
- ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
+ sendco->ret = 0;
+ goto out;
+
+err:
+ while (!g_queue_is_empty(&sendco->send_list)) {
+ SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+ g_free(entry);
}
+ sendco->ret = ret < 0 ? ret : -EIO;
+out:
+ sendco->co = NULL;
+ sendco->done = true;
+ aio_wait_kick();
+}
+
+static int compare_chr_send(CompareState *s,
+ const uint8_t *buf,
+ uint32_t size,
+ uint32_t vnet_hdr_len)
+{
+ SendCo *sendco = &s->sendco;
+ SendEntry *entry;
+
+ if (!size) {
+ return 0;
+ }
+
+ entry = g_malloc(sizeof(SendEntry) + size);
+ entry->size = size;
+ entry->vnet_hdr_len = vnet_hdr_len;
+ memcpy(entry->buf, buf, size);
+ g_queue_push_head(&sendco->send_list, entry);
+
+ if (sendco->done) {
+ sendco->co = qemu_coroutine_create(_compare_chr_send, s);
+ sendco->done = false;
+ qemu_coroutine_enter(sendco->co);
+ if (sendco->done) {
+ /* report early errors */
+ return sendco->ret;
+ }
+ }
+
+ /* assume success */
+ return 0;
+}
+
+static int notify_chr_send(CompareState *s,
+ const uint8_t *buf,
+ uint32_t size)
+{
+ int ret = 0;
+ uint32_t len = htonl(size);
+
+ ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+ (uint8_t *)&len,
+ sizeof(len));
+
+ if (ret != sizeof(len)) {
+ goto err;
+ }
+
+ ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+ (uint8_t *)buf,
+ size);
if (ret != size) {
goto err;
}
return 0;
-
err:
return ret < 0 ? ret : -EIO;
}
@@ -1062,8 +1142,7 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
compare_chr_send(s,
pri_rs->buf,
pri_rs->packet_len,
- pri_rs->vnet_hdr_len,
- false);
+ pri_rs->vnet_hdr_len);
} else {
/* compare packet in the specified connection */
colo_compare_connection(conn, s);
@@ -1093,7 +1172,7 @@ static void compare_notify_rs_finalize(SocketReadState *notify_rs)
if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
notify_rs->buf,
notify_rs->packet_len)) {
- ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+ ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
if (ret < 0) {
error_report("Notify Xen COLO-frame INIT failed");
}
@@ -1199,6 +1278,9 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
QTAILQ_INSERT_TAIL(&net_compares, s, next);
+ s->sendco.done = true;
+ g_queue_init(&s->sendco.send_list);
+
g_queue_init(&s->conn_list);
qemu_mutex_init(&event_mtx);
@@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque, void *user_data)
compare_chr_send(s,
pkt->data,
pkt->size,
- pkt->vnet_hdr_len,
- false);
+ pkt->vnet_hdr_len);
packet_destroy(pkt, NULL);
}
while (!g_queue_is_empty(&conn->secondary_list)) {
@@ -1281,6 +1362,11 @@ static void colo_compare_finalize(Object *obj)
CompareState *s = COLO_COMPARE(obj);
CompareState *tmp = NULL;
+ AioContext *ctx = iothread_get_aio_context(s->iothread);
+ aio_context_acquire(ctx);
+ AIO_WAIT_WHILE(ctx, !s->sendco.done);
+ aio_context_release(ctx);
+
qemu_chr_fe_deinit(&s->chr_pri_in, false);
qemu_chr_fe_deinit(&s->chr_sec_in, false);
qemu_chr_fe_deinit(&s->chr_out, false);
@@ -1305,6 +1391,7 @@ static void colo_compare_finalize(Object *obj)
g_queue_foreach(&s->conn_list, colo_flush_packets, s);
g_queue_clear(&s->conn_list);
+ g_queue_clear(&s->sendco.send_list);
if (s->connection_track_table) {
g_hash_table_destroy(s->connection_track_table);
The chr_out chardev is connected to a filter-redirector running in the main loop. qemu_chr_fe_write_all might block here in compare_chr_send if the (socket-)buffer is full. If another filter-redirector in the main loop want's to send data to chr_pri_in it might also block if the buffer is full. This leads to a deadlock because both event loops get blocked. Fix this by converting compare_chr_send to a coroutine and putting the packets in a send queue. Also create a new function notify_chr_send, since that should be independend. Signed-off-by: Lukas Straub <lukasstraub2@web.de> --- net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 130 insertions(+), 43 deletions(-)