diff mbox

[V12,06/10] colo-compare: introduce packet comparison thread

Message ID 1471421428-26379-7-git-send-email-zhangchen.fnst@cn.fujitsu.com (mailing list archive)
State New, archived
Headers show

Commit Message

Zhang Chen Aug. 17, 2016, 8:10 a.m. UTC
If the primary packet is the same as the secondary packet,
we send the primary packet and drop the secondary
packet; otherwise we notify the COLO frame to do a checkpoint.
If a primary packet arrives but the secondary packet does not,
then after REGULAR_PACKET_CHECK_MS milliseconds we mark
the primary packet as old_packet, then do a checkpoint.

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/colo-compare.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/colo.c         |   1 +
 net/colo.h         |   3 +
 trace-events       |   2 +
 4 files changed, 222 insertions(+)

Comments

Jason Wang Aug. 31, 2016, 9:13 a.m. UTC | #1
On 2016年08月17日 16:10, Zhang Chen wrote:
> If primary packet is same with secondary packet,
> we will send primary packet and drop secondary
> packet, otherwise notify COLO frame to do checkpoint.
> If primary packet comes but secondary packet does not,
> after REGULAR_PACKET_CHECK_MS milliseconds we set
> the primary packet as old_packet,then do a checkpoint.
>
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>   net/colo-compare.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>   net/colo.c         |   1 +
>   net/colo.h         |   3 +
>   trace-events       |   2 +
>   4 files changed, 222 insertions(+)
>
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index bab215b..b90cf1f 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -36,6 +36,8 @@
>   
>   #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>   #define MAX_QUEUE_SIZE 1024
> +/* TODO: Should be configurable */
> +#define REGULAR_PACKET_CHECK_MS 3000
>   
>   /*
>     + CompareState ++
> @@ -79,6 +81,10 @@ typedef struct CompareState {
>       GQueue conn_list;
>       /* hashtable to save connection */
>       GHashTable *connection_track_table;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    /* Timer used on the primary to find packets that are never matched */
> +    QEMUTimer *timer;
>   } CompareState;
>   
>   typedef struct CompareClass {
> @@ -152,6 +158,113 @@ static int packet_enqueue(CompareState *s, int mode)
>       return 0;
>   }
>   
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
> +                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
> +                               inet_ntoa(spkt->ip->ip_src),
> +                               inet_ntoa(spkt->ip->ip_dst));
> +
> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
> +{
> +    trace_colo_compare_main("compare all");
> +    return colo_packet_compare(ppkt, spkt);
> +}
> +
> +static void colo_old_packet_check_one(void *opaque_packet,
> +                                      void *opaque_found)
> +{
> +    int64_t now;
> +    bool *found_old = (bool *)opaque_found;
> +    Packet *ppkt = (Packet *)opaque_packet;
> +
> +    if (*found_old) {
> +        /* Someone found an old packet earlier in the queue */
> +        return;
> +    }
> +
> +    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
> +    if ((now - ppkt->creation_ms) > REGULAR_PACKET_CHECK_MS) {
> +        trace_colo_old_packet_check_found(ppkt->creation_ms);
> +        *found_old = true;
> +    }
> +}
> +
> +static void colo_old_packet_check_one_conn(void *opaque,
> +                                           void *user_data)
> +{
> +    bool found_old = false;
> +    Connection *conn = opaque;
> +
> +    g_queue_foreach(&conn->primary_list, colo_old_packet_check_one,
> +                    &found_old);

As I mentioned in the last version, can we avoid iterating over all packets by 
using g_queue_find_custom() here?

> +    if (found_old) {
> +        /* do checkpoint will flush old packet */
> +        /* TODO: colo_notify_checkpoint();*/
> +    }
> +}
> +
> +/*
> + * Look for old packets that the secondary hasn't matched,
> + * if we have some then we have to checkpoint to wake
> + * the secondary up.
> + */
> +static void colo_old_packet_check(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    g_queue_foreach(&s->conn_list, colo_old_packet_check_one_conn, NULL);
> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * for compare connection
> + */
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    CompareState *s = user_data;
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    int ret;
> +
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_tail(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare_all);
> +
> +        if (result) {
> +            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            trace_colo_compare_main("packet same and release packet");
> +            g_queue_remove(&conn->secondary_list, result->data);
> +            packet_destroy(pkt, NULL);
> +        } else {

It would be better to add a comment here explaining the case where the 
secondary packet arrives a little late.

> +            trace_colo_compare_main("packet different");
> +            g_queue_push_tail(&conn->primary_list, pkt);
> +            /* TODO: colo_notify_checkpoint();*/
> +            break;
> +        }
> +    }
> +}
> +
>   static int compare_chr_send(CharDriverState *out,
>                               const uint8_t *buf,
>                               uint32_t size)
> @@ -179,6 +292,65 @@ err:
>       return ret < 0 ? ret : -EIO;
>   }
>   
> +static int compare_chr_can_read(void *opaque)
> +{
> +    return COMPARE_READ_LEN_MAX;
> +}
> +
> +/*
> + * called from the main thread on the primary for packets
> + * arriving over the socket from the primary.
> + */
> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = net_fill_rstate(&s->pri_rs, buf, size);
> +    if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> +        error_report("colo-compare primary_in error");
> +    }
> +}
> +
> +/*
> + * called from the main thread on the primary for packets
> + * arriving over the socket from the secondary.
> + */
> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = net_fill_rstate(&s->sec_rs, buf, size);
> +    if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> +        error_report("colo-compare secondary_in error");
> +    }
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    GMainContext *worker_context;
> +    GMainLoop *compare_loop;
> +    CompareState *s = opaque;
> +
> +    worker_context = g_main_context_new();
> +
> +    qemu_chr_add_handlers_full(s->chr_pri_in, compare_chr_can_read,
> +                          compare_pri_chr_in, NULL, s, worker_context);
> +    qemu_chr_add_handlers_full(s->chr_sec_in, compare_chr_can_read,
> +                          compare_sec_chr_in, NULL, s, worker_context);
> +
> +    compare_loop = g_main_loop_new(worker_context, FALSE);
> +
> +    g_main_loop_run(compare_loop);
> +
> +    g_main_loop_unref(compare_loop);
> +    g_main_context_unref(worker_context);
> +    return NULL;
> +}
> +
>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(obj);
> @@ -231,6 +403,9 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>       if (packet_enqueue(s, PRIMARY_IN)) {
>           trace_colo_compare_main("primary: unsupported packet in");
>           compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
> +    } else {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>       }
>   }
>   
> @@ -240,6 +415,9 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>   
>       if (packet_enqueue(s, SECONDARY_IN)) {
>           trace_colo_compare_main("secondary: unsupported packet in");
> +    } else {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>       }
>   }
>   
> @@ -266,6 +444,20 @@ static int compare_chardev_opts(void *opaque,
>   }
>   
>   /*
> + * Check old packet regularly so it can watch for any packets
> + * that the secondary hasn't produced equivalents of.
> + */
> +static void check_old_packet_regular(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
> +              REGULAR_PACKET_CHECK_MS);
> +    /* if have old packet we will notify checkpoint */
> +    colo_old_packet_check(s);
> +}
> +
> +/*
>    * called from the main thread on the primary
>    * to setup colo-compare.
>    */
> @@ -273,6 +465,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(uc);
>       CompareChardevProps props;
> +    char thread_name[64];
> +    static int compare_id;
>   
>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>           error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -356,6 +550,18 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>                                                         g_free,
>                                                         connection_destroy);
>   
> +    sprintf(thread_name, "colo-compare %d", compare_id);
> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
> +    /* A regular timer to kick any packets that the secondary doesn't match */
> +    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
> +                            check_old_packet_regular, s);
> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
> +                        REGULAR_PACKET_CHECK_MS);

I still think we need to make sure the timer is processed in the colo 
thread, since check_old_packet_regular may iterate conn_list, which may 
be modified by the colo thread at the same time.

> +
>       return;
>   }
>   
> @@ -397,6 +603,16 @@ static void colo_compare_finalize(Object *obj)
>   
>       g_queue_free(&s->conn_list);
>   
> +    if (s->thread.thread) {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
> +        qemu_thread_join(&s->thread);
> +    }
> +
> +    if (s->timer) {
> +        timer_del(s->timer);
> +    }
> +
>       g_free(s->pri_indev);
>       g_free(s->sec_indev);
>       g_free(s->outdev);
> diff --git a/net/colo.c b/net/colo.c
> index bc86553..da4b771 100644
> --- a/net/colo.c
> +++ b/net/colo.c
> @@ -128,6 +128,7 @@ Packet *packet_new(const void *data, int size)
>   
>       pkt->data = g_memdup(data, size);
>       pkt->size = size;
> +    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>   
>       return pkt;
>   }
> diff --git a/net/colo.h b/net/colo.h
> index 9cbc14e..6b395a3 100644
> --- a/net/colo.h
> +++ b/net/colo.h
> @@ -17,6 +17,7 @@
>   
>   #include "slirp/slirp.h"
>   #include "qemu/jhash.h"
> +#include "qemu/timer.h"
>   
>   #define HASHTABLE_MAX_SIZE 16384
>   
> @@ -28,6 +29,8 @@ typedef struct Packet {
>       };
>       uint8_t *transport_header;
>       int size;
> +    /* Time of packet creation, in wall clock ms */
> +    int64_t creation_ms;
>   } Packet;
>   
>   typedef struct ConnectionKey {
> diff --git a/trace-events b/trace-events
> index 703de1a..1537e91 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>   
>   # net/colo-compare.c
>   colo_compare_main(const char *chr) ": %s"
> +colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
> +colo_old_packet_check_found(int64_t old_time) "%" PRId64
Zhang Chen Sept. 1, 2016, 4:50 a.m. UTC | #2
On 08/31/2016 05:13 PM, Jason Wang wrote:
>
>
> On 2016年08月17日 16:10, Zhang Chen wrote:
>> If primary packet is same with secondary packet,
>> we will send primary packet and drop secondary
>> packet, otherwise notify COLO frame to do checkpoint.
>> If primary packet comes but secondary packet does not,
>> after REGULAR_PACKET_CHECK_MS milliseconds we set
>> the primary packet as old_packet,then do a checkpoint.
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/colo-compare.c | 216 
>> +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   net/colo.c         |   1 +
>>   net/colo.h         |   3 +
>>   trace-events       |   2 +
>>   4 files changed, 222 insertions(+)
>>
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index bab215b..b90cf1f 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -36,6 +36,8 @@
>>     #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>>   #define MAX_QUEUE_SIZE 1024
>> +/* TODO: Should be configurable */
>> +#define REGULAR_PACKET_CHECK_MS 3000
>>     /*
>>     + CompareState ++
>> @@ -79,6 +81,10 @@ typedef struct CompareState {
>>       GQueue conn_list;
>>       /* hashtable to save connection */
>>       GHashTable *connection_track_table;
>> +    /* compare thread, a thread for each NIC */
>> +    QemuThread thread;
>> +    /* Timer used on the primary to find packets that are never 
>> matched */
>> +    QEMUTimer *timer;
>>   } CompareState;
>>     typedef struct CompareClass {
>> @@ -152,6 +158,113 @@ static int packet_enqueue(CompareState *s, int 
>> mode)
>>       return 0;
>>   }
>>   +/*
>> + * The IP packets sent by primary and secondary
>> + * will be compared in here
>> + * TODO support ip fragment, Out-Of-Order
>> + * return:    0  means packet same
>> + *            > 0 || < 0 means packet different
>> + */
>> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>> +{
>> +    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
>> + inet_ntoa(ppkt->ip->ip_dst), spkt->size,
>> + inet_ntoa(spkt->ip->ip_src),
>> + inet_ntoa(spkt->ip->ip_dst));
>> +
>> +    if (ppkt->size == spkt->size) {
>> +        return memcmp(ppkt->data, spkt->data, spkt->size);
>> +    } else {
>> +        return -1;
>> +    }
>> +}
>> +
>> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
>> +{
>> +    trace_colo_compare_main("compare all");
>> +    return colo_packet_compare(ppkt, spkt);
>> +}
>> +
>> +static void colo_old_packet_check_one(void *opaque_packet,
>> +                                      void *opaque_found)
>> +{
>> +    int64_t now;
>> +    bool *found_old = (bool *)opaque_found;
>> +    Packet *ppkt = (Packet *)opaque_packet;
>> +
>> +    if (*found_old) {
>> +        /* Someone found an old packet earlier in the queue */
>> +        return;
>> +    }
>> +
>> +    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>> +    if ((now - ppkt->creation_ms) > REGULAR_PACKET_CHECK_MS) {
>> + trace_colo_old_packet_check_found(ppkt->creation_ms);
>> +        *found_old = true;
>> +    }
>> +}
>> +
>> +static void colo_old_packet_check_one_conn(void *opaque,
>> +                                           void *user_data)
>> +{
>> +    bool found_old = false;
>> +    Connection *conn = opaque;
>> +
>> +    g_queue_foreach(&conn->primary_list, colo_old_packet_check_one,
>> +                    &found_old);
>
> As I mentioned in last version, can we avoid iterating all packets by 
> using g_queue_find_custom() here?

OK~~ I got it.

>
>> +    if (found_old) {
>> +        /* do checkpoint will flush old packet */
>> +        /* TODO: colo_notify_checkpoint();*/
>> +    }
>> +}
>> +
>> +/*
>> + * Look for old packets that the secondary hasn't matched,
>> + * if we have some then we have to checkpoint to wake
>> + * the secondary up.
>> + */
>> +static void colo_old_packet_check(void *opaque)
>> +{
>> +    CompareState *s = opaque;
>> +
>> +    g_queue_foreach(&s->conn_list, colo_old_packet_check_one_conn, 
>> NULL);
>> +}
>> +
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare connection
>> + */
>> +static void colo_compare_connection(void *opaque, void *user_data)
>> +{
>> +    CompareState *s = user_data;
>> +    Connection *conn = opaque;
>> +    Packet *pkt = NULL;
>> +    GList *result = NULL;
>> +    int ret;
>> +
>> +    while (!g_queue_is_empty(&conn->primary_list) &&
>> +           !g_queue_is_empty(&conn->secondary_list)) {
>> +        pkt = g_queue_pop_tail(&conn->primary_list);
>> +        result = g_queue_find_custom(&conn->secondary_list,
>> +                              pkt, 
>> (GCompareFunc)colo_packet_compare_all);
>> +
>> +        if (result) {
>> +            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
>> +            if (ret < 0) {
>> +                error_report("colo_send_primary_packet failed");
>> +            }
>> +            trace_colo_compare_main("packet same and release packet");
>> +            g_queue_remove(&conn->secondary_list, result->data);
>> +            packet_destroy(pkt, NULL);
>> +        } else {
>
> Better add a comment to explain the case when secondary packet comes a 
> little bit late here.

OK~~ I will add comments in next version.

>
>> + trace_colo_compare_main("packet different");
>> +            g_queue_push_tail(&conn->primary_list, pkt);
>> +            /* TODO: colo_notify_checkpoint();*/
>> +            break;
>> +        }
>> +    }
>> +}
>> +
>>   static int compare_chr_send(CharDriverState *out,
>>                               const uint8_t *buf,
>>                               uint32_t size)
>> @@ -179,6 +292,65 @@ err:
>>       return ret < 0 ? ret : -EIO;
>>   }
>>   +static int compare_chr_can_read(void *opaque)
>> +{
>> +    return COMPARE_READ_LEN_MAX;
>> +}
>> +
>> +/*
>> + * called from the main thread on the primary for packets
>> + * arriving over the socket from the primary.
>> + */
>> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int 
>> size)
>> +{
>> +    CompareState *s = COLO_COMPARE(opaque);
>> +    int ret;
>> +
>> +    ret = net_fill_rstate(&s->pri_rs, buf, size);
>> +    if (ret == -1) {
>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> +        error_report("colo-compare primary_in error");
>> +    }
>> +}
>> +
>> +/*
>> + * called from the main thread on the primary for packets
>> + * arriving over the socket from the secondary.
>> + */
>> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int 
>> size)
>> +{
>> +    CompareState *s = COLO_COMPARE(opaque);
>> +    int ret;
>> +
>> +    ret = net_fill_rstate(&s->sec_rs, buf, size);
>> +    if (ret == -1) {
>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> +        error_report("colo-compare secondary_in error");
>> +    }
>> +}
>> +
>> +static void *colo_compare_thread(void *opaque)
>> +{
>> +    GMainContext *worker_context;
>> +    GMainLoop *compare_loop;
>> +    CompareState *s = opaque;
>> +
>> +    worker_context = g_main_context_new();
>> +
>> +    qemu_chr_add_handlers_full(s->chr_pri_in, compare_chr_can_read,
>> +                          compare_pri_chr_in, NULL, s, worker_context);
>> +    qemu_chr_add_handlers_full(s->chr_sec_in, compare_chr_can_read,
>> +                          compare_sec_chr_in, NULL, s, worker_context);
>> +
>> +    compare_loop = g_main_loop_new(worker_context, FALSE);
>> +
>> +    g_main_loop_run(compare_loop);
>> +
>> +    g_main_loop_unref(compare_loop);
>> +    g_main_context_unref(worker_context);
>> +    return NULL;
>> +}
>> +
>>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(obj);
>> @@ -231,6 +403,9 @@ static void 
>> compare_pri_rs_finalize(SocketReadState *pri_rs)
>>       if (packet_enqueue(s, PRIMARY_IN)) {
>>           trace_colo_compare_main("primary: unsupported packet in");
>>           compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
>> +    } else {
>> +        /* compare connection */
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>>       }
>>   }
>>   @@ -240,6 +415,9 @@ static void 
>> compare_sec_rs_finalize(SocketReadState *sec_rs)
>>         if (packet_enqueue(s, SECONDARY_IN)) {
>>           trace_colo_compare_main("secondary: unsupported packet in");
>> +    } else {
>> +        /* compare connection */
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>>       }
>>   }
>>   @@ -266,6 +444,20 @@ static int compare_chardev_opts(void *opaque,
>>   }
>>     /*
>> + * Check old packet regularly so it can watch for any packets
>> + * that the secondary hasn't produced equivalents of.
>> + */
>> +static void check_old_packet_regular(void *opaque)
>> +{
>> +    CompareState *s = opaque;
>> +
>> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
>> +              REGULAR_PACKET_CHECK_MS);
>> +    /* if have old packet we will notify checkpoint */
>> +    colo_old_packet_check(s);
>> +}
>> +
>> +/*
>>    * called from the main thread on the primary
>>    * to setup colo-compare.
>>    */
>> @@ -273,6 +465,8 @@ static void colo_compare_complete(UserCreatable 
>> *uc, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(uc);
>>       CompareChardevProps props;
>> +    char thread_name[64];
>> +    static int compare_id;
>>         if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>           error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -356,6 +550,18 @@ static void colo_compare_complete(UserCreatable 
>> *uc, Error **errp)
>>                                                         g_free,
>> connection_destroy);
>>   +    sprintf(thread_name, "colo-compare %d", compare_id);
>> +    qemu_thread_create(&s->thread, thread_name,
>> +                       colo_compare_thread, s,
>> +                       QEMU_THREAD_JOINABLE);
>> +    compare_id++;
>> +
>> +    /* A regular timer to kick any packets that the secondary 
>> doesn't match */
>> +    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest 
>> runs */
>> +                            check_old_packet_regular, s);
>> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
>> +                        REGULAR_PACKET_CHECK_MS);
>
> I still think we need to make sure the timer were processed in colo 
> thread. Since check_old_packet_regular may iterate conn_list which may 
> be modified by colo thread at the same time.

Makes sense, but here we only read the conn_list; maybe we should add 
a lock for it?
We don't have an easy way to make the timer's handler run in the colo 
thread, so the handler runs in the main loop. Maybe we can do this job 
later.

Thanks
Zhang Chen

>
>> +
>>       return;
>>   }
>>   @@ -397,6 +603,16 @@ static void colo_compare_finalize(Object *obj)
>>         g_queue_free(&s->conn_list);
>>   +    if (s->thread.thread) {
>> +        /* compare connection */
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>> +        qemu_thread_join(&s->thread);
>> +    }
>> +
>> +    if (s->timer) {
>> +        timer_del(s->timer);
>> +    }
>> +
>>       g_free(s->pri_indev);
>>       g_free(s->sec_indev);
>>       g_free(s->outdev);
>> diff --git a/net/colo.c b/net/colo.c
>> index bc86553..da4b771 100644
>> --- a/net/colo.c
>> +++ b/net/colo.c
>> @@ -128,6 +128,7 @@ Packet *packet_new(const void *data, int size)
>>         pkt->data = g_memdup(data, size);
>>       pkt->size = size;
>> +    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>>         return pkt;
>>   }
>> diff --git a/net/colo.h b/net/colo.h
>> index 9cbc14e..6b395a3 100644
>> --- a/net/colo.h
>> +++ b/net/colo.h
>> @@ -17,6 +17,7 @@
>>     #include "slirp/slirp.h"
>>   #include "qemu/jhash.h"
>> +#include "qemu/timer.h"
>>     #define HASHTABLE_MAX_SIZE 16384
>>   @@ -28,6 +29,8 @@ typedef struct Packet {
>>       };
>>       uint8_t *transport_header;
>>       int size;
>> +    /* Time of packet creation, in wall clock ms */
>> +    int64_t creation_ms;
>>   } Packet;
>>     typedef struct ConnectionKey {
>> diff --git a/trace-events b/trace-events
>> index 703de1a..1537e91 100644
>> --- a/trace-events
>> +++ b/trace-events
>> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned 
>> size, uint32_t data) "To 0x%" PRIx64
>>     # net/colo-compare.c
>>   colo_compare_main(const char *chr) ": %s"
>> +colo_compare_ip_info(int psize, const char *sta, const char *stb, 
>> int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src 
>> = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
>> +colo_old_packet_check_found(int64_t old_time) "%" PRId64
>
>
>
> .
>
Jason Wang Sept. 1, 2016, 7:38 a.m. UTC | #3
On 2016年09月01日 12:50, Zhang Chen wrote:
>>>   + sprintf(thread_name, "colo-compare %d", compare_id);
>>> +    qemu_thread_create(&s->thread, thread_name,
>>> +                       colo_compare_thread, s,
>>> +                       QEMU_THREAD_JOINABLE);
>>> +    compare_id++;
>>> +
>>> +    /* A regular timer to kick any packets that the secondary 
>>> doesn't match */
>>> +    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest 
>>> runs */
>>> +                            check_old_packet_regular, s);
>>> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
>>> +                        REGULAR_PACKET_CHECK_MS);
>>
>> I still think we need to make sure the timer were processed in colo 
>> thread. Since check_old_packet_regular may iterate conn_list which 
>> may be modified by colo thread at the same time.
>
> Make sense, but in here we just read the conn_list, maybe we should 
> add a lock for it?
> Because of we don't have a easy way to make timer's handler run in 
> colo thread,
> the handler run in main-loop. Maybe this job we can do it later.
>
> Thanks
> Zhang Chen 

A lock is OK for this series. But we need to add a TODO here, and we need 
something like patch 1 to make sure the timer can be processed somewhere 
other than the main loop in the future.

Thanks
diff mbox

Patch

diff --git a/net/colo-compare.c b/net/colo-compare.c
index bab215b..b90cf1f 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -36,6 +36,8 @@ 
 
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
 #define MAX_QUEUE_SIZE 1024
+/* TODO: Should be configurable */
+#define REGULAR_PACKET_CHECK_MS 3000
 
 /*
   + CompareState ++
@@ -79,6 +81,10 @@  typedef struct CompareState {
     GQueue conn_list;
     /* hashtable to save connection */
     GHashTable *connection_track_table;
+    /* compare thread, a thread for each NIC */
+    QemuThread thread;
+    /* Timer used on the primary to find packets that are never matched */
+    QEMUTimer *timer;
 } CompareState;
 
 typedef struct CompareClass {
@@ -152,6 +158,113 @@  static int packet_enqueue(CompareState *s, int mode)
     return 0;
 }
 
+/*
+ * The IP packets sent by primary and secondary
+ * will be compared in here
+ * TODO support ip fragment, Out-Of-Order
+ * return:    0  means packet same
+ *            > 0 || < 0 means packet different
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
+                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
+                               inet_ntoa(spkt->ip->ip_src),
+                               inet_ntoa(spkt->ip->ip_dst));
+
+    if (ppkt->size == spkt->size) {
+        return memcmp(ppkt->data, spkt->data, spkt->size);
+    } else {
+        return -1;
+    }
+}
+
+static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
+{
+    trace_colo_compare_main("compare all");
+    return colo_packet_compare(ppkt, spkt);
+}
+
+static void colo_old_packet_check_one(void *opaque_packet,
+                                      void *opaque_found)
+{
+    int64_t now;
+    bool *found_old = (bool *)opaque_found;
+    Packet *ppkt = (Packet *)opaque_packet;
+
+    if (*found_old) {
+        /* Someone found an old packet earlier in the queue */
+        return;
+    }
+
+    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
+    if ((now - ppkt->creation_ms) > REGULAR_PACKET_CHECK_MS) {
+        trace_colo_old_packet_check_found(ppkt->creation_ms);
+        *found_old = true;
+    }
+}
+
+static void colo_old_packet_check_one_conn(void *opaque,
+                                           void *user_data)
+{
+    bool found_old = false;
+    Connection *conn = opaque;
+
+    g_queue_foreach(&conn->primary_list, colo_old_packet_check_one,
+                    &found_old);
+    if (found_old) {
+        /* do checkpoint will flush old packet */
+        /* TODO: colo_notify_checkpoint();*/
+    }
+}
+
+/*
+ * Look for old packets that the secondary hasn't matched,
+ * if we have some then we have to checkpoint to wake
+ * the secondary up.
+ */
+static void colo_old_packet_check(void *opaque)
+{
+    CompareState *s = opaque;
+
+    g_queue_foreach(&s->conn_list, colo_old_packet_check_one_conn, NULL);
+}
+
+/*
+ * called from the compare thread on the primary
+ * for compare connection
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    CompareState *s = user_data;
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+    GList *result = NULL;
+    int ret;
+
+    while (!g_queue_is_empty(&conn->primary_list) &&
+           !g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_tail(&conn->primary_list);
+        result = g_queue_find_custom(&conn->secondary_list,
+                              pkt, (GCompareFunc)colo_packet_compare_all);
+
+        if (result) {
+            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
+            if (ret < 0) {
+                error_report("colo_send_primary_packet failed");
+            }
+            trace_colo_compare_main("packet same and release packet");
+            g_queue_remove(&conn->secondary_list, result->data);
+            packet_destroy(pkt, NULL);
+        } else {
+            trace_colo_compare_main("packet different");
+            g_queue_push_tail(&conn->primary_list, pkt);
+            /* TODO: colo_notify_checkpoint();*/
+            break;
+        }
+    }
+}
+
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
                             uint32_t size)
@@ -179,6 +292,65 @@  err:
     return ret < 0 ? ret : -EIO;
 }
 
+static int compare_chr_can_read(void *opaque)
+{
+    return COMPARE_READ_LEN_MAX; /* same answer every call; opaque unused */
+}
+
+/*
+ * Read handler installed on chr_pri_in by colo_compare_thread(): feeds
+ * the received bytes into the primary read state.
+ */
+static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+
+    if (net_fill_rstate(&s->pri_rs, buf, size) != -1) {
+        return;
+    }
+    /* filling the read state failed: drop the handlers on this chardev */
+    qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+    error_report("colo-compare primary_in error");
+}
+
+/*
+ * Read handler installed on chr_sec_in by colo_compare_thread(): feeds
+ * the received bytes into the secondary read state.
+ */
+static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+
+    if (net_fill_rstate(&s->sec_rs, buf, size) != -1) {
+        return;
+    }
+    /* filling the read state failed: drop the handlers on this chardev */
+    qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+    error_report("colo-compare secondary_in error");
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+    CompareState *s = opaque;
+    /* private context: the handlers below are attached to it */
+    GMainContext *worker_context = g_main_context_new();
+    GMainLoop *compare_loop;
+
+    qemu_chr_add_handlers_full(s->chr_pri_in, compare_chr_can_read,
+                          compare_pri_chr_in, NULL, s, worker_context);
+    qemu_chr_add_handlers_full(s->chr_sec_in, compare_chr_can_read,
+                          compare_sec_chr_in, NULL, s, worker_context);
+
+    compare_loop = g_main_loop_new(worker_context, FALSE);
+
+    /* blocks until the loop is quit; nothing in this function quits it */
+    g_main_loop_run(compare_loop);
+
+    g_main_loop_unref(compare_loop);
+    g_main_context_unref(worker_context);
+    return NULL;
+}
+
 static char *compare_get_pri_indev(Object *obj, Error **errp)
 {
     CompareState *s = COLO_COMPARE(obj);
@@ -231,6 +403,9 @@  static void compare_pri_rs_finalize(SocketReadState *pri_rs)
     if (packet_enqueue(s, PRIMARY_IN)) {
         trace_colo_compare_main("primary: unsupported packet in");
         compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+    } else {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
     }
 }
 
@@ -240,6 +415,9 @@  static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 
     if (packet_enqueue(s, SECONDARY_IN)) {
         trace_colo_compare_main("secondary: unsupported packet in");
+    } else {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
     }
 }
 
@@ -266,6 +444,20 @@  static int compare_chardev_opts(void *opaque,
 }
 
 /*
+ * Check old packet regularly so it can watch for any packets
+ * that the secondary hasn't produced equivalents of.
+ */
+static void check_old_packet_regular(void *opaque)
+{
+    CompareState *s = opaque;
+    int64_t next_ms = qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+                      REGULAR_PACKET_CHECK_MS;
+
+    timer_mod(s->timer, next_ms);   /* re-arm before scanning */
+    colo_old_packet_check(s);       /* look for unmatched old packets */
+}
+
+/*
  * called from the main thread on the primary
  * to setup colo-compare.
  */
@@ -273,6 +465,8 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
     CompareChardevProps props;
+    char thread_name[64];
+    static int compare_id;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -356,6 +550,18 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       g_free,
                                                       connection_destroy);
 
+    sprintf(thread_name, "colo-compare %d", compare_id);
+    qemu_thread_create(&s->thread, thread_name,
+                       colo_compare_thread, s,
+                       QEMU_THREAD_JOINABLE);
+    compare_id++;
+
+    /* A regular timer to kick any packets that the secondary doesn't match */
+    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
+                            check_old_packet_regular, s);
+    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+                        REGULAR_PACKET_CHECK_MS);
+
     return;
 }
 
@@ -397,6 +603,16 @@  static void colo_compare_finalize(Object *obj)
 
     g_queue_free(&s->conn_list);
 
+    if (s->thread.thread) {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
+        qemu_thread_join(&s->thread);
+    }
+
+    if (s->timer) {
+        timer_del(s->timer);
+    }
+
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);
diff --git a/net/colo.c b/net/colo.c
index bc86553..da4b771 100644
--- a/net/colo.c
+++ b/net/colo.c
@@ -128,6 +128,7 @@  Packet *packet_new(const void *data, int size)
 
     pkt->data = g_memdup(data, size);
     pkt->size = size;
+    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 
     return pkt;
 }
diff --git a/net/colo.h b/net/colo.h
index 9cbc14e..6b395a3 100644
--- a/net/colo.h
+++ b/net/colo.h
@@ -17,6 +17,7 @@ 
 
 #include "slirp/slirp.h"
 #include "qemu/jhash.h"
+#include "qemu/timer.h"
 
 #define HASHTABLE_MAX_SIZE 16384
 
@@ -28,6 +29,8 @@  typedef struct Packet {
     };
     uint8_t *transport_header;
     int size;
+    /* Time of packet creation, in wall clock ms */
+    int64_t creation_ms;
 } Packet;
 
 typedef struct ConnectionKey {
diff --git a/trace-events b/trace-events
index 703de1a..1537e91 100644
--- a/trace-events
+++ b/trace-events
@@ -1919,3 +1919,5 @@  aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
 
 # net/colo-compare.c
 colo_compare_main(const char *chr) ": %s"
+colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
+colo_old_packet_check_found(int64_t old_time) "%" PRId64