From patchwork Wed May 25 12:50:52 2016
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Zhang Chen
X-Patchwork-Id: 9135339
From: Zhang Chen
To: qemu devel, Jason Wang
Cc: Li Zhijian, zhanghailiang, "eddie . dong", "Dr . David Alan Gilbert", Zhang Chen
Date: Wed, 25 May 2016 20:50:52 +0800
Message-ID: <1464180653-12637-4-git-send-email-zhangchen.fnst@cn.fujitsu.com>
In-Reply-To: <1464180653-12637-1-git-send-email-zhangchen.fnst@cn.fujitsu.com>
References: <1464180653-12637-1-git-send-email-zhangchen.fnst@cn.fujitsu.com>
Subject: [Qemu-devel] [RFC PATCH V4 3/4] colo-compare: introduce packet comparison thread

If the primary and secondary packets are the same, we send the primary
packet and drop the secondary packet; otherwise we notify COLO to do a
checkpoint.

Signed-off-by: Zhang Chen
Signed-off-by: Li Zhijian
Signed-off-by: Wen Congyang
---
 net/colo-base.c    |   1 +
 net/colo-base.h    |   3 +
 net/colo-compare.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 trace-events       |   2 +
 4 files changed, 178 insertions(+)

diff --git a/net/colo-base.c b/net/colo-base.c
index 6bb80ca..e795b5f 100644
--- a/net/colo-base.c
+++ b/net/colo-base.c
@@ -134,6 +134,7 @@ Packet *packet_new(const void *data, int size)
 
     pkt->data = g_memdup(data, size);
     pkt->size = size;
+    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 
     return pkt;
 }
diff --git a/net/colo-base.h b/net/colo-base.h
index 5d72302..7b0abdf 100644
--- a/net/colo-base.h
+++ b/net/colo-base.h
@@ -18,6 +18,7 @@
 #include "slirp/slirp.h"
 #include "qemu/jhash.h"
 #include "qemu/rcu.h"
+#include "qemu/timer.h"
 
 #define HASHTABLE_MAX_SIZE 16384
 
@@ -47,6 +48,8 @@ typedef struct Packet {
     };
     uint8_t *transport_layer;
     int size;
+    /* Time of packet creation, in wall clock ms */
+    int64_t creation_ms;
 } Packet;
 
 typedef struct ConnectionKey {
diff --git a/net/colo-compare.c b/net/colo-compare.c
index 309d341..f2ad260 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -35,6 +35,8 @@
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
 
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
+/* TODO: Should be configurable */
+#define REGULAR_CHECK_MS 400
 
 static QTAILQ_HEAD(, CompareState) net_compares =
        QTAILQ_HEAD_INITIALIZER(net_compares);
@@ -86,6 +88,14 @@ typedef struct CompareState {
     GQueue unprocessed_connections;
     /* proxy current hash size */
     uint32_t hashtable_size;
+
+    /* notify compare thread */
+    QemuEvent event;
+    /* compare thread, a thread for each NIC */
+    QemuThread thread;
+    int thread_status;
+    /* Timer used on the primary to find packets that are never matched */
+    QEMUTimer *timer;
 } CompareState;
 
 typedef struct CompareClass {
@@ -97,6 +107,15 @@ enum {
     SECONDARY_IN,
 };
 
+enum {
+    /* compare thread isn't started */
+    COMPARE_THREAD_NONE,
+    /* compare thread is running */
+    COMPARE_THREAD_RUNNING,
+    /* compare thread exit */
+    COMPARE_THREAD_EXIT,
+};
+
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
                             uint32_t size);
@@ -145,6 +164,119 @@ static int packet_enqueue(CompareState *s, int mode)
     return 0;
 }
 
+/*
+ * The IP packets sent by primary and secondary
+ * will be compared in here
+ * TODO support ip fragment, Out-Of-Order
+ * return:    0  means packet same
+ *            > 0 || < 0 means packet different
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
+                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
+                               inet_ntoa(spkt->ip->ip_src),
+                               inet_ntoa(spkt->ip->ip_dst));
+
+    if (ppkt->size == spkt->size) {
+        return memcmp(ppkt->data, spkt->data, spkt->size);
+    } else {
+        return -1;
+    }
+}
+
+static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
+{
+    trace_colo_compare_main("compare all");
+    return colo_packet_compare(ppkt, spkt);
+}
+
+static void colo_old_packet_check(void *opaque_packet, void *opaque_found)
+{
+    int64_t now;
+    bool *found_old = (bool *)opaque_found;
+    Packet *ppkt = (Packet *)opaque_packet;
+
+    if (*found_old) {
+        /* Someone found an old packet earlier in the queue */
+        return;
+    }
+
+    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
+    if ((ppkt->creation_ms < now) &&
+        ((now - ppkt->creation_ms) > REGULAR_CHECK_MS)) {
+        trace_colo_old_packet_check_found(ppkt->creation_ms);
+        *found_old = true;
+    }
+}
+
+/*
+ * called from the compare thread on the primary
+ * for compare connection
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    CompareState *s = user_data;
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+    GList *result = NULL;
+    bool found_old;
+    int ret;
+
+    while (!g_queue_is_empty(&conn->primary_list) &&
+           !g_queue_is_empty(&conn->secondary_list)) {
+        qemu_mutex_lock(&conn->list_lock);
+        pkt = g_queue_pop_tail(&conn->primary_list);
+        result = g_queue_find_custom(&conn->secondary_list,
+                              pkt, (GCompareFunc)colo_packet_compare_all);
+        qemu_mutex_unlock(&conn->list_lock);
+
+        if (result) {
+            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
+            if (ret < 0) {
+                error_report("colo_send_primary_packet failed");
+            }
+            trace_colo_compare_main("packet same and release packet");
+            qemu_mutex_lock(&conn->list_lock);
+            g_queue_remove(&conn->secondary_list, result->data);
+            qemu_mutex_unlock(&conn->list_lock);
+        } else {
+            trace_colo_compare_main("packet different");
+            qemu_mutex_lock(&conn->list_lock);
+            g_queue_push_tail(&conn->primary_list, pkt);
+            qemu_mutex_unlock(&conn->list_lock);
+            /* TODO: colo_notify_checkpoint();*/
+            break;
+        }
+    }
+
+    /* Look for old packets that the secondary hasn't matched, if we have some
+     * then we have to checkpoint to wake the secondary up.
+     */
+    qemu_mutex_lock(&conn->list_lock);
+    found_old = false;
+    g_queue_foreach(&conn->primary_list, colo_old_packet_check, &found_old);
+    qemu_mutex_unlock(&conn->list_lock);
+    if (found_old) {
+        /* TODO: colo_notify_checkpoint();*/
+    }
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+    CompareState *s = opaque;
+
+    while (s->thread_status == COMPARE_THREAD_RUNNING) {
+        qemu_event_wait(&s->event);
+        qemu_event_reset(&s->event);
+        rcu_read_lock();
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
+        rcu_read_unlock();
+    }
+
+    return NULL;
+}
+
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
                             uint32_t size)
@@ -261,6 +393,8 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
     if (packet_enqueue(s, PRIMARY_IN)) {
         trace_colo_compare_main("primary: unsupported packet in");
         compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+    } else {
+        qemu_event_set(&s->event);
     }
 }
 
@@ -270,9 +404,23 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 
     if (packet_enqueue(s, SECONDARY_IN)) {
         trace_colo_compare_main("secondary: unsupported packet in");
+    } else {
+        qemu_event_set(&s->event);
     }
 }
 
+/* Prod the compare thread regularly so it can watch for any packets
+ * that the secondary hasn't produced equivalents of.
+ */
+static void colo_compare_regular(void *opaque)
+{
+    CompareState *s = opaque;
+
+    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+                        REGULAR_CHECK_MS);
+    qemu_event_set(&s->event);
+}
+
 /*
  * called from the main thread on the primary
  * to setup colo-compare.
@@ -280,6 +428,8 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
+    char thread_name[64];
+    static int compare_id;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -329,6 +479,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
 
     g_queue_init(&s->conn_list);
     qemu_mutex_init(&s->conn_list_lock);
+    qemu_event_init(&s->event, false);
     s->hashtable_size = 0;
 
     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
@@ -336,6 +487,19 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       g_free,
                                                       connection_destroy);
 
+    s->thread_status = COMPARE_THREAD_RUNNING;
+    sprintf(thread_name, "compare %d", compare_id);
+    qemu_thread_create(&s->thread, thread_name,
+                       colo_compare_thread, s,
+                       QEMU_THREAD_JOINABLE);
+    compare_id++;
+
+    /* A regular timer to kick any packets that the secondary doesn't match */
+    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
+                            colo_compare_regular, s);
+    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+                        REGULAR_CHECK_MS);
+
     return;
 }
 
@@ -381,6 +545,14 @@ static void colo_compare_finalize(Object *obj)
     qemu_mutex_destroy(&s->conn_list_lock);
     g_queue_free(&s->conn_list);
 
+    if (s->thread.thread) {
+        s->thread_status = COMPARE_THREAD_EXIT;
+        qemu_event_set(&s->event);
+        qemu_thread_join(&s->thread);
+    }
+    qemu_event_destroy(&s->event);
+    timer_del(s->timer);
+
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);
diff --git a/trace-events b/trace-events
index 703de1a..1537e91 100644
--- a/trace-events
+++ b/trace-events
@@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
 
 # net/colo-compare.c
 colo_compare_main(const char *chr) ": %s"
+colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
+colo_old_packet_check_found(int64_t old_time) "%" PRId64
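
Illustration only, not part of the patch: a minimal standalone sketch of the two rules the compare thread applies, namely the size-plus-memcmp match used by colo_packet_compare() and the age test used by colo_old_packet_check(). DemoPacket and the demo_* functions are hypothetical stand-ins for QEMU's Packet and its GQueue handling, and the secondary list is modelled as a plain array rather than a GQueue.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for QEMU's Packet; only the fields used below. */
typedef struct {
    const uint8_t *data;
    size_t size;
    int64_t creation_ms;
} DemoPacket;

/* Same rule as colo_packet_compare(): equal size and identical bytes. */
static bool demo_packets_match(const DemoPacket *p, const DemoPacket *s)
{
    assert(p && s);
    if (p->size != s->size) {
        return false;
    }
    /* Avoid calling memcmp() on empty payloads with possibly NULL data. */
    return p->size == 0 || memcmp(p->data, s->data, p->size) == 0;
}

/*
 * Look for a secondary packet that matches the primary packet.
 * Returns the index of the first match, or -1 if there is none
 * (the case in which the patch would request a checkpoint).
 */
static int demo_find_match(const DemoPacket *ppkt,
                           const DemoPacket *sec, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        if (demo_packets_match(ppkt, &sec[i])) {
            return (int)i;
        }
    }
    return -1;
}

/* Same rule as colo_old_packet_check(): strictly older than limit_ms. */
static bool demo_packet_is_old(const DemoPacket *p, int64_t now_ms,
                               int64_t limit_ms)
{
    return p->creation_ms < now_ms && (now_ms - p->creation_ms) > limit_ms;
}
```

Because the age test is a strict greater-than, a packet created at 1000 ms with a 400 ms limit is not flagged at 1400 ms but is flagged at 1401 ms.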