@@ -29,17 +29,24 @@
#include "qemu/sockets.h"
#include "qapi-visit.h"
#include "net/colo.h"
+#include "net/colo-compare.h"
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
+static QTAILQ_HEAD(, CompareState) net_compares =
+ QTAILQ_HEAD_INITIALIZER(net_compares);
+
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
#define MAX_QUEUE_SIZE 1024
/* TODO: Should be configurable */
#define REGULAR_PACKET_CHECK_MS 3000
+static QemuMutex event_mtx = { .lock = PTHREAD_MUTEX_INITIALIZER };
+static QemuCond event_complete_cond = { .cond = PTHREAD_COND_INITIALIZER };
+static int event_unhandled_count;
/*
+ CompareState ++
| |
@@ -87,6 +94,10 @@ typedef struct CompareState {
GMainContext *worker_context;
GMainLoop *compare_loop;
+ /* Used for COLO to notify compare to do something */
+ FilterNotifier *notifier;
+
+ QTAILQ_ENTRY(CompareState) next;
} CompareState;
typedef struct CompareClass {
@@ -417,6 +428,11 @@ static void colo_compare_connection(void *opaque, void *user_data)
while (!g_queue_is_empty(&conn->primary_list) &&
!g_queue_is_empty(&conn->secondary_list)) {
pkt = g_queue_pop_tail(&conn->primary_list);
+ if (!pkt) {
+ error_report("colo-compare pop pkt failed");
+ return;
+ }
+
switch (conn->ip_proto) {
case IPPROTO_TCP:
result = g_queue_find_custom(&conn->secondary_list,
@@ -538,6 +554,53 @@ static gboolean check_old_packet_regular(void *opaque)
return TRUE;
}
+/* Public API, Used for COLO frame to notify compare event */
+void colo_notify_compares_event(void *opaque, int event, Error **errp)
+{
+ CompareState *s;
+ int ret;
+
+ qemu_mutex_lock(&event_mtx);
+ QTAILQ_FOREACH(s, &net_compares, next) {
+ ret = filter_notifier_set(s->notifier, event);
+ if (ret < 0) {
+ error_setg_errno(errp, -ret, "Failed to write value to eventfd");
+ goto fail;
+ }
+ event_unhandled_count++;
+ }
+ /* Wait all compare threads to finish handling this event */
+ while (event_unhandled_count > 0) {
+ qemu_cond_wait(&event_complete_cond, &event_mtx);
+ }
+
+fail:
+ qemu_mutex_unlock(&event_mtx);
+}
+
+static void colo_flush_packets(void *opaque, void *user_data);
+
+static void colo_compare_handle_event(void *opaque, int event)
+{
+ FilterNotifier *notify = opaque;
+ CompareState *s = notify->opaque;
+
+ switch (event) {
+ case COLO_CHECKPOINT:
+ g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+ break;
+ case COLO_FAILOVER:
+ break;
+ default:
+ break;
+ }
+ qemu_mutex_lock(&event_mtx);
+ assert(event_unhandled_count > 0);
+ event_unhandled_count--;
+ qemu_cond_broadcast(&event_complete_cond);
+ qemu_mutex_unlock(&event_mtx);
+}
+
static void *colo_compare_thread(void *opaque)
{
CompareState *s = opaque;
@@ -558,10 +621,15 @@ static void *colo_compare_thread(void *opaque)
(GSourceFunc)check_old_packet_regular, s, NULL);
g_source_attach(timeout_source, s->worker_context);
+ s->notifier = filter_notifier_new(colo_compare_handle_event, s, NULL);
+ g_source_attach(&s->notifier->source, s->worker_context);
+
qemu_sem_post(&s->thread_ready);
g_main_loop_run(s->compare_loop);
+ g_source_destroy(&s->notifier->source);
+ g_source_unref(&s->notifier->source);
g_source_destroy(timeout_source);
g_source_unref(timeout_source);
@@ -706,6 +774,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
+ QTAILQ_INSERT_TAIL(&net_compares, s, next);
+
g_queue_init(&s->conn_list);
s->connection_track_table = g_hash_table_new_full(connection_key_hash,
@@ -765,6 +835,7 @@ static void colo_compare_init(Object *obj)
static void colo_compare_finalize(Object *obj)
{
CompareState *s = COLO_COMPARE(obj);
+ CompareState *tmp = NULL;
qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
s->worker_context, true);
@@ -777,6 +848,13 @@ static void colo_compare_finalize(Object *obj)
}
qemu_thread_join(&s->thread);
+ QTAILQ_FOREACH(tmp, &net_compares, next) {
+ if (!strcmp(tmp->outdev, s->outdev)) {
+ QTAILQ_REMOVE(&net_compares, s, next);
+ break;
+ }
+ }
+
/* Release all unhandled packets after compare thead exited */
g_queue_foreach(&s->conn_list, colo_flush_packets, s);
new file mode 100644
@@ -0,0 +1,6 @@
+#ifndef QEMU_COLO_COMPARE_H
+#define QEMU_COLO_COMPARE_H
+
+void colo_notify_compares_event(void *opaque, int event, Error **errp);
+
+#endif /* QEMU_COLO_COMPARE_H */