Message ID | 1460977906-25218-4-git-send-email-zhangchen.fnst@cn.fujitsu.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On 04/18/2016 07:11 PM, Zhang Chen wrote: > if packets are same, we send primary packet and drop secondary > packet, otherwise notify COLO do checkpoint. > > Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> > Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com> > Signed-off-by: Wen Congyang <wency@cn.fujitsu.com> > --- > net/colo-compare.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++ > trace-events | 2 + > 2 files changed, 128 insertions(+) > > diff --git a/net/colo-compare.c b/net/colo-compare.c > index dc57eac..4b5a2d4 100644 > --- a/net/colo-compare.c > +++ b/net/colo-compare.c > @@ -26,6 +26,7 @@ > #include "qemu/jhash.h" > #include "net/eth.h" > > +#define DEBUG_TCP_COMPARE 1 > #define TYPE_COLO_COMPARE "colo-compare" > #define COLO_COMPARE(obj) \ > OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) > @@ -90,6 +91,13 @@ typedef struct CompareState { > GQueue unprocessed_connections; > /* proxy current hash size */ > uint32_t hashtable_size; > + > + /* notify compare thread */ > + QemuEvent event; > + /* compare thread, a thread for each NIC */ > + QemuThread thread; > + int thread_status; > + > } CompareState; > > typedef struct CompareClass { > @@ -132,6 +140,15 @@ enum { > SECONDARY_IN, > }; > > +enum { > + /* compare thread isn't started */ > + COMPARE_THREAD_NONE, > + /* compare thread is running */ > + COMPARE_THREAD_RUNNING, > + /* compare thread exit */ > + COMPARE_THREAD_EXIT, > +}; > + > static void packet_destroy(void *opaque, void *user_data); > static int compare_chr_send(CharDriverState *out, > const uint8_t *buf, > @@ -336,6 +353,94 @@ static void packet_destroy(void *opaque, void *user_data) > g_slice_free(Packet, pkt); > } > > +static inline void colo_dump_packet(Packet *pkt) > +{ > + int i; > + for (i = 0; i < pkt->size; i++) { > + printf("%02x ", ((uint8_t *)pkt->data)[i]); > + } > + printf("\n"); Can we use something like qemu_hexdump() here? 
> +} > + > +/* > + * The IP packets sent by primary and secondary > + * will be compared in here > + * TODO support ip fragment, Out-Of-Order > + * return: 0 means packet same > + * > 0 || < 0 means packet different > + */ > +static int colo_packet_compare(Packet *ppkt, Packet *spkt) > +{ > + trace_colo_compare_with_int("ppkt size", ppkt->size); > + trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src)); > + trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst)); > + trace_colo_compare_with_int("spkt size", spkt->size); > + trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src)); > + trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst)); Can we use a single tracepoint here instead? > + > + if (ppkt->size == spkt->size) { > + return memcmp(ppkt->data, spkt->data, spkt->size); > + } else { > + return -1; > + } > +} > + > +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt) > +{ > + trace_colo_compare_main("compare all"); > + return colo_packet_compare(ppkt, spkt); Why need this? 
> +} > + > +/* > + * called from the compare thread on the primary > + * for compare connection > + */ > +static void colo_compare_connection(void *opaque, void *user_data) > +{ > + Connection *conn = opaque; > + Packet *pkt = NULL; > + GList *result = NULL; > + int ret; > + > + qemu_mutex_lock(&conn->list_lock); > + while (!g_queue_is_empty(&conn->primary_list) && > + !g_queue_is_empty(&conn->secondary_list)) { > + pkt = g_queue_pop_head(&conn->primary_list); > + result = g_queue_find_custom(&conn->secondary_list, > + pkt, (GCompareFunc)colo_packet_compare_all); > + > + if (result) { > + ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size); > + if (ret < 0) { > + error_report("colo_send_primary_packet failed"); > + } > + trace_colo_compare_main("packet same and release packet"); > + g_queue_remove(&conn->secondary_list, result->data); > + } else { > + trace_colo_compare_main("packet different"); > + g_queue_push_head(&conn->primary_list, pkt); Is this possible that the packet from secondary has not been arrived on time? If yes, do we still need to notify the checkpoint here? 
> + /* TODO: colo_notify_checkpoint();*/ > + break; > + } > + } > + qemu_mutex_unlock(&conn->list_lock); > +} > + > +static void *colo_compare_thread(void *opaque) > +{ > + CompareState *s = opaque; > + > + while (s->thread_status == COMPARE_THREAD_RUNNING) { > + qemu_event_wait(&s->event); > + qemu_event_reset(&s->event); > + qemu_mutex_lock(&s->conn_list_lock); > + g_queue_foreach(&s->conn_list, colo_compare_connection, NULL); > + qemu_mutex_unlock(&s->conn_list_lock); > + } > + > + return NULL; > +} > + > static int compare_chr_send(CharDriverState *out, > const uint8_t *buf, > uint32_t size) > @@ -440,6 +545,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) > if (packet_enqueue(s, PRIMARY_IN)) { > trace_colo_compare_main("primary: unsupported packet in"); > compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len); > + } else { > + qemu_event_set(&s->event); > } > } else if (ret == -1) { > qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL); > @@ -461,6 +568,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) > trace_colo_compare_main("secondary: unsupported packet in"); > /* should we send sec arp pkt? 
*/ > compare_chr_send(s->chr_out, s->sec_rs.buf, s->sec_rs.packet_len); > + } else { > + qemu_event_set(&s->event); > } > } else if (ret == -1) { > qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL); > @@ -519,6 +628,8 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp) > static void colo_compare_complete(UserCreatable *uc, Error **errp) > { > CompareState *s = COLO_COMPARE(uc); > + char thread_name[64]; > + static int compare_id; > > if (!s->pri_indev || !s->sec_indev || !s->outdev) { > error_setg(errp, "colo compare needs 'primary_in' ," > @@ -564,6 +675,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) > QTAILQ_INSERT_TAIL(&net_compares, s, next); > > g_queue_init(&s->conn_list); > + qemu_event_init(&s->event, false); > qemu_mutex_init(&s->conn_list_lock); > s->hashtable_size = 0; > > @@ -572,6 +684,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) > g_free, > connection_destroy); > > + s->thread_status = COMPARE_THREAD_RUNNING; > + sprintf(thread_name, "compare %d", compare_id); > + qemu_thread_create(&s->thread, thread_name, > + colo_compare_thread, s, > + QEMU_THREAD_JOINABLE); > + compare_id++; > + > return; > } > > @@ -607,6 +726,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data) > QTAILQ_REMOVE(&net_compares, s, next); > } > qemu_mutex_destroy(&s->conn_list_lock); > + > + if (s->thread.thread) { > + s->thread_status = COMPARE_THREAD_EXIT; > + qemu_event_set(&s->event); > + qemu_thread_join(&s->thread); > + } > + qemu_event_destroy(&s->event); > } > > static void colo_compare_init(Object *obj) > diff --git a/trace-events b/trace-events > index 8862288..978c47f 100644 > --- a/trace-events > +++ b/trace-events > @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 > > # net/colo-compare.c > colo_compare_main(const char *chr) "chr: %s" > +colo_compare_with_int(const char *sta, int size) ": %s = %d" > 
+colo_compare_with_char(const char *sta, const char *stb) ": %s = %s"
On 04/28/2016 03:58 PM, Jason Wang wrote: > > On 04/18/2016 07:11 PM, Zhang Chen wrote: >> if packets are same, we send primary packet and drop secondary >> packet, otherwise notify COLO do checkpoint. >> >> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> >> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com> >> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com> >> --- >> net/colo-compare.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++ >> trace-events | 2 + >> 2 files changed, 128 insertions(+) >> >> diff --git a/net/colo-compare.c b/net/colo-compare.c >> index dc57eac..4b5a2d4 100644 >> --- a/net/colo-compare.c >> +++ b/net/colo-compare.c >> @@ -26,6 +26,7 @@ >> #include "qemu/jhash.h" >> #include "net/eth.h" >> >> +#define DEBUG_TCP_COMPARE 1 >> #define TYPE_COLO_COMPARE "colo-compare" >> #define COLO_COMPARE(obj) \ >> OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) >> @@ -90,6 +91,13 @@ typedef struct CompareState { >> GQueue unprocessed_connections; >> /* proxy current hash size */ >> uint32_t hashtable_size; >> + >> + /* notify compare thread */ >> + QemuEvent event; >> + /* compare thread, a thread for each NIC */ >> + QemuThread thread; >> + int thread_status; >> + >> } CompareState; >> >> typedef struct CompareClass { >> @@ -132,6 +140,15 @@ enum { >> SECONDARY_IN, >> }; >> >> +enum { >> + /* compare thread isn't started */ >> + COMPARE_THREAD_NONE, >> + /* compare thread is running */ >> + COMPARE_THREAD_RUNNING, >> + /* compare thread exit */ >> + COMPARE_THREAD_EXIT, >> +}; >> + >> static void packet_destroy(void *opaque, void *user_data); >> static int compare_chr_send(CharDriverState *out, >> const uint8_t *buf, >> @@ -336,6 +353,94 @@ static void packet_destroy(void *opaque, void *user_data) >> g_slice_free(Packet, pkt); >> } >> >> +static inline void colo_dump_packet(Packet *pkt) >> +{ >> + int i; >> + for (i = 0; i < pkt->size; i++) { >> + printf("%02x ", ((uint8_t *)pkt->data)[i]); >> + } >> + printf("\n"); > Can we 
use something like qemu_hexdump() here? Thanks, I will change it to use qemu_hexdump(). > >> +} >> + >> +/* >> + * The IP packets sent by primary and secondary >> + * will be compared in here >> + * TODO support ip fragment, Out-Of-Order >> + * return: 0 means packet same >> + * > 0 || < 0 means packet different >> + */ >> +static int colo_packet_compare(Packet *ppkt, Packet *spkt) >> +{ >> + trace_colo_compare_with_int("ppkt size", ppkt->size); >> + trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src)); >> + trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst)); >> + trace_colo_compare_with_int("spkt size", spkt->size); >> + trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src)); >> + trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst)); > Can we use a single tracepoint here instead? Yes, I will fix this in the next version. > >> + >> + if (ppkt->size == spkt->size) { >> + return memcmp(ppkt->data, spkt->data, spkt->size); >> + } else { >> + return -1; >> + } >> +} >> + >> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt) >> +{ >> + trace_colo_compare_main("compare all"); >> + return colo_packet_compare(ppkt, spkt); > Why need this?
It is just a temporary name; I will change it in patch 4/4. > >> +} >> + >> +/* >> + * called from the compare thread on the primary >> + * for compare connection >> + */ >> +static void colo_compare_connection(void *opaque, void *user_data) >> +{ >> + Connection *conn = opaque; >> + Packet *pkt = NULL; >> + GList *result = NULL; >> + int ret; >> + >> + qemu_mutex_lock(&conn->list_lock); >> + while (!g_queue_is_empty(&conn->primary_list) && >> + !g_queue_is_empty(&conn->secondary_list)) { >> + pkt = g_queue_pop_head(&conn->primary_list); >> + result = g_queue_find_custom(&conn->secondary_list, >> + pkt, (GCompareFunc)colo_packet_compare_all); >> + >> + if (result) { >> + ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size); >> + if (ret < 0) { >> + error_report("colo_send_primary_packet failed"); >> + } >> + trace_colo_compare_main("packet same and release packet"); >> + g_queue_remove(&conn->secondary_list, result->data); >> + } else { >> + trace_colo_compare_main("packet different"); >> + g_queue_push_head(&conn->primary_list, pkt); > Is this possible that the packet from secondary has not been arrived on time? If yes, do we still need to notify the checkpoint here? Yes, the secondary's packet may not have arrived yet. In that case we hold the primary packet until the next periodic checkpoint flushes it. In addition, I am considering adding a timer that flushes packets that have been held too long (a timeout of about 200ms?), as Dave's branch does.
Thanks zhangchen > >> + /* TODO: colo_notify_checkpoint();*/ >> + break; >> + } >> + } >> + qemu_mutex_unlock(&conn->list_lock); >> +} >> + >> +static void *colo_compare_thread(void *opaque) >> +{ >> + CompareState *s = opaque; >> + >> + while (s->thread_status == COMPARE_THREAD_RUNNING) { >> + qemu_event_wait(&s->event); >> + qemu_event_reset(&s->event); >> + qemu_mutex_lock(&s->conn_list_lock); >> + g_queue_foreach(&s->conn_list, colo_compare_connection, NULL); >> + qemu_mutex_unlock(&s->conn_list_lock); >> + } >> + >> + return NULL; >> +} >> + >> static int compare_chr_send(CharDriverState *out, >> const uint8_t *buf, >> uint32_t size) >> @@ -440,6 +545,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) >> if (packet_enqueue(s, PRIMARY_IN)) { >> trace_colo_compare_main("primary: unsupported packet in"); >> compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len); >> + } else { >> + qemu_event_set(&s->event); >> } >> } else if (ret == -1) { >> qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL); >> @@ -461,6 +568,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) >> trace_colo_compare_main("secondary: unsupported packet in"); >> /* should we send sec arp pkt? 
*/ >> compare_chr_send(s->chr_out, s->sec_rs.buf, s->sec_rs.packet_len); >> + } else { >> + qemu_event_set(&s->event); >> } >> } else if (ret == -1) { >> qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL); >> @@ -519,6 +628,8 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp) >> static void colo_compare_complete(UserCreatable *uc, Error **errp) >> { >> CompareState *s = COLO_COMPARE(uc); >> + char thread_name[64]; >> + static int compare_id; >> >> if (!s->pri_indev || !s->sec_indev || !s->outdev) { >> error_setg(errp, "colo compare needs 'primary_in' ," >> @@ -564,6 +675,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) >> QTAILQ_INSERT_TAIL(&net_compares, s, next); >> >> g_queue_init(&s->conn_list); >> + qemu_event_init(&s->event, false); >> qemu_mutex_init(&s->conn_list_lock); >> s->hashtable_size = 0; >> >> @@ -572,6 +684,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) >> g_free, >> connection_destroy); >> >> + s->thread_status = COMPARE_THREAD_RUNNING; >> + sprintf(thread_name, "compare %d", compare_id); >> + qemu_thread_create(&s->thread, thread_name, >> + colo_compare_thread, s, >> + QEMU_THREAD_JOINABLE); >> + compare_id++; >> + >> return; >> } >> >> @@ -607,6 +726,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data) >> QTAILQ_REMOVE(&net_compares, s, next); >> } >> qemu_mutex_destroy(&s->conn_list_lock); >> + >> + if (s->thread.thread) { >> + s->thread_status = COMPARE_THREAD_EXIT; >> + qemu_event_set(&s->event); >> + qemu_thread_join(&s->thread); >> + } >> + qemu_event_destroy(&s->event); >> } >> >> static void colo_compare_init(Object *obj) >> diff --git a/trace-events b/trace-events >> index 8862288..978c47f 100644 >> --- a/trace-events >> +++ b/trace-events >> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 >> >> # net/colo-compare.c >> colo_compare_main(const char *chr) "chr: %s" >> 
+colo_compare_with_int(const char *sta, int size) ": %s = %d" >> +colo_compare_with_char(const char *sta, const char *stb) ": %s = %s" > > > . >
On 04/28/2016 06:31 PM, Zhang Chen wrote: >>> +/* >>> + * called from the compare thread on the primary >>> + * for compare connection >>> + */ >>> +static void colo_compare_connection(void *opaque, void *user_data) >>> +{ >>> + Connection *conn = opaque; >>> + Packet *pkt = NULL; >>> + GList *result = NULL; >>> + int ret; >>> + >>> + qemu_mutex_lock(&conn->list_lock); >>> + while (!g_queue_is_empty(&conn->primary_list) && >>> + !g_queue_is_empty(&conn->secondary_list)) { >>> + pkt = g_queue_pop_head(&conn->primary_list); >>> + result = g_queue_find_custom(&conn->secondary_list, >>> + pkt, >>> (GCompareFunc)colo_packet_compare_all); >>> + >>> + if (result) { >>> + ret = compare_chr_send(pkt->s->chr_out, pkt->data, >>> pkt->size); >>> + if (ret < 0) { >>> + error_report("colo_send_primary_packet failed"); >>> + } >>> + trace_colo_compare_main("packet same and release packet"); >>> + g_queue_remove(&conn->secondary_list, result->data); >>> + } else { >>> + trace_colo_compare_main("packet different"); >>> + g_queue_push_head(&conn->primary_list, pkt); >> Is this possible that the packet from secondary has not been arrived on >> time? If yes, do we still need to notify the checkpoint here? > > Yes,the packet of secondary may not arrived. > we will hold primary packet to next periodic checkpoint > to flush it. and more, I consider to set a timer > to flush timeout(200ms???) packet like Dave's branch. > > > Thanks > zhangchen I was wondering maybe you can merge or unify all other changes from Dave's branch?
On 04/29/2016 10:07 AM, Jason Wang wrote: > > On 04/28/2016 06:31 PM, Zhang Chen wrote: >>>> +/* >>>> + * called from the compare thread on the primary >>>> + * for compare connection >>>> + */ >>>> +static void colo_compare_connection(void *opaque, void *user_data) >>>> +{ >>>> + Connection *conn = opaque; >>>> + Packet *pkt = NULL; >>>> + GList *result = NULL; >>>> + int ret; >>>> + >>>> + qemu_mutex_lock(&conn->list_lock); >>>> + while (!g_queue_is_empty(&conn->primary_list) && >>>> + !g_queue_is_empty(&conn->secondary_list)) { >>>> + pkt = g_queue_pop_head(&conn->primary_list); >>>> + result = g_queue_find_custom(&conn->secondary_list, >>>> + pkt, >>>> (GCompareFunc)colo_packet_compare_all); >>>> + >>>> + if (result) { >>>> + ret = compare_chr_send(pkt->s->chr_out, pkt->data, >>>> pkt->size); >>>> + if (ret < 0) { >>>> + error_report("colo_send_primary_packet failed"); >>>> + } >>>> + trace_colo_compare_main("packet same and release packet"); >>>> + g_queue_remove(&conn->secondary_list, result->data); >>>> + } else { >>>> + trace_colo_compare_main("packet different"); >>>> + g_queue_push_head(&conn->primary_list, pkt); >>> Is this possible that the packet from secondary has not been arrived on >>> time? If yes, do we still need to notify the checkpoint here? >> Yes,the packet of secondary may not arrived. >> we will hold primary packet to next periodic checkpoint >> to flush it. and more, I consider to set a timer >> to flush timeout(200ms???) packet like Dave's branch. >> >> >> Thanks >> zhangchen > I was wondering maybe you can merge or unify all other changes from > Dave's branch? > Yes, I will unify some codes from Dave's colo-proxy branch. Thanks Zhang Chen > . >
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote: > > > On 04/29/2016 10:07 AM, Jason Wang wrote: > > > >On 04/28/2016 06:31 PM, Zhang Chen wrote: > >>>>+/* > >>>>+ * called from the compare thread on the primary > >>>>+ * for compare connection > >>>>+ */ > >>>>+static void colo_compare_connection(void *opaque, void *user_data) > >>>>+{ > >>>>+ Connection *conn = opaque; > >>>>+ Packet *pkt = NULL; > >>>>+ GList *result = NULL; > >>>>+ int ret; > >>>>+ > >>>>+ qemu_mutex_lock(&conn->list_lock); > >>>>+ while (!g_queue_is_empty(&conn->primary_list) && > >>>>+ !g_queue_is_empty(&conn->secondary_list)) { > >>>>+ pkt = g_queue_pop_head(&conn->primary_list); > >>>>+ result = g_queue_find_custom(&conn->secondary_list, > >>>>+ pkt, > >>>>(GCompareFunc)colo_packet_compare_all); > >>>>+ > >>>>+ if (result) { > >>>>+ ret = compare_chr_send(pkt->s->chr_out, pkt->data, > >>>>pkt->size); > >>>>+ if (ret < 0) { > >>>>+ error_report("colo_send_primary_packet failed"); > >>>>+ } > >>>>+ trace_colo_compare_main("packet same and release packet"); > >>>>+ g_queue_remove(&conn->secondary_list, result->data); > >>>>+ } else { > >>>>+ trace_colo_compare_main("packet different"); > >>>>+ g_queue_push_head(&conn->primary_list, pkt); > >>>Is this possible that the packet from secondary has not been arrived on > >>>time? If yes, do we still need to notify the checkpoint here? > >>Yes,the packet of secondary may not arrived. > >>we will hold primary packet to next periodic checkpoint > >>to flush it. and more, I consider to set a timer > >>to flush timeout(200ms???) packet like Dave's branch. > >> > >> > >>Thanks > >>zhangchen > >I was wondering maybe you can merge or unify all other changes from > >Dave's branch? > > > > Yes, I will unify some codes from Dave's colo-proxy branch. Of course always check what I've written; some of that branch was quite hacky itself so don't just assume it's good! Dave > > Thanks > Zhang Chen > > >. > > > > -- > Thanks > zhangchen > > > -- Dr. 
David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff --git a/net/colo-compare.c b/net/colo-compare.c index dc57eac..4b5a2d4 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -26,6 +26,7 @@ #include "qemu/jhash.h" #include "net/eth.h" +#define DEBUG_TCP_COMPARE 1 #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -90,6 +91,13 @@ typedef struct CompareState { GQueue unprocessed_connections; /* proxy current hash size */ uint32_t hashtable_size; + + /* notify compare thread */ + QemuEvent event; + /* compare thread, a thread for each NIC */ + QemuThread thread; + int thread_status; + } CompareState; typedef struct CompareClass { @@ -132,6 +140,15 @@ enum { SECONDARY_IN, }; +enum { + /* compare thread isn't started */ + COMPARE_THREAD_NONE, + /* compare thread is running */ + COMPARE_THREAD_RUNNING, + /* compare thread exit */ + COMPARE_THREAD_EXIT, +}; + static void packet_destroy(void *opaque, void *user_data); static int compare_chr_send(CharDriverState *out, const uint8_t *buf, @@ -336,6 +353,94 @@ static void packet_destroy(void *opaque, void *user_data) g_slice_free(Packet, pkt); } +static inline void colo_dump_packet(Packet *pkt) +{ + int i; + for (i = 0; i < pkt->size; i++) { + printf("%02x ", ((uint8_t *)pkt->data)[i]); + } + printf("\n"); +} + +/* + * The IP packets sent by primary and secondary + * will be compared in here + * TODO support ip fragment, Out-Of-Order + * return: 0 means packet same + * > 0 || < 0 means packet different + */ +static int colo_packet_compare(Packet *ppkt, Packet *spkt) +{ + trace_colo_compare_with_int("ppkt size", ppkt->size); + trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src)); + trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst)); + trace_colo_compare_with_int("spkt size", spkt->size); + trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src)); + trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst)); + + if 
(ppkt->size == spkt->size) { + return memcmp(ppkt->data, spkt->data, spkt->size); + } else { + return -1; + } +} + +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt) +{ + trace_colo_compare_main("compare all"); + return colo_packet_compare(ppkt, spkt); +} + +/* + * called from the compare thread on the primary + * for compare connection + */ +static void colo_compare_connection(void *opaque, void *user_data) +{ + Connection *conn = opaque; + Packet *pkt = NULL; + GList *result = NULL; + int ret; + + qemu_mutex_lock(&conn->list_lock); + while (!g_queue_is_empty(&conn->primary_list) && + !g_queue_is_empty(&conn->secondary_list)) { + pkt = g_queue_pop_head(&conn->primary_list); + result = g_queue_find_custom(&conn->secondary_list, + pkt, (GCompareFunc)colo_packet_compare_all); + + if (result) { + ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size); + if (ret < 0) { + error_report("colo_send_primary_packet failed"); + } + trace_colo_compare_main("packet same and release packet"); + g_queue_remove(&conn->secondary_list, result->data); + } else { + trace_colo_compare_main("packet different"); + g_queue_push_head(&conn->primary_list, pkt); + /* TODO: colo_notify_checkpoint();*/ + break; + } + } + qemu_mutex_unlock(&conn->list_lock); +} + +static void *colo_compare_thread(void *opaque) +{ + CompareState *s = opaque; + + while (s->thread_status == COMPARE_THREAD_RUNNING) { + qemu_event_wait(&s->event); + qemu_event_reset(&s->event); + qemu_mutex_lock(&s->conn_list_lock); + g_queue_foreach(&s->conn_list, colo_compare_connection, NULL); + qemu_mutex_unlock(&s->conn_list_lock); + } + + return NULL; +} + static int compare_chr_send(CharDriverState *out, const uint8_t *buf, uint32_t size) @@ -440,6 +545,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) if (packet_enqueue(s, PRIMARY_IN)) { trace_colo_compare_main("primary: unsupported packet in"); compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len); + } else { + 
qemu_event_set(&s->event); } } else if (ret == -1) { qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL); @@ -461,6 +568,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) trace_colo_compare_main("secondary: unsupported packet in"); /* should we send sec arp pkt? */ compare_chr_send(s->chr_out, s->sec_rs.buf, s->sec_rs.packet_len); + } else { + qemu_event_set(&s->event); } } else if (ret == -1) { qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL); @@ -519,6 +628,8 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp) static void colo_compare_complete(UserCreatable *uc, Error **errp) { CompareState *s = COLO_COMPARE(uc); + char thread_name[64]; + static int compare_id; if (!s->pri_indev || !s->sec_indev || !s->outdev) { error_setg(errp, "colo compare needs 'primary_in' ," @@ -564,6 +675,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) QTAILQ_INSERT_TAIL(&net_compares, s, next); g_queue_init(&s->conn_list); + qemu_event_init(&s->event, false); qemu_mutex_init(&s->conn_list_lock); s->hashtable_size = 0; @@ -572,6 +684,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) g_free, connection_destroy); + s->thread_status = COMPARE_THREAD_RUNNING; + sprintf(thread_name, "compare %d", compare_id); + qemu_thread_create(&s->thread, thread_name, + colo_compare_thread, s, + QEMU_THREAD_JOINABLE); + compare_id++; + return; } @@ -607,6 +726,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data) QTAILQ_REMOVE(&net_compares, s, next); } qemu_mutex_destroy(&s->conn_list_lock); + + if (s->thread.thread) { + s->thread_status = COMPARE_THREAD_EXIT; + qemu_event_set(&s->event); + qemu_thread_join(&s->thread); + } + qemu_event_destroy(&s->event); } static void colo_compare_init(Object *obj) diff --git a/trace-events b/trace-events index 8862288..978c47f 100644 --- a/trace-events +++ b/trace-events @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, 
unsigned size, uint32_t data) "To 0x%" PRIx64 # net/colo-compare.c colo_compare_main(const char *chr) "chr: %s" +colo_compare_with_int(const char *sta, int size) ": %s = %d" +colo_compare_with_char(const char *sta, const char *stb) ": %s = %s"