Message ID | 1469497794-16976-3-git-send-email-zhangchen.fnst@cn.fujitsu.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On 2016年07月26日 09:49, Zhang Chen wrote: > COLO-base used by colo-compare and filter-rewriter. s/used/is used/ > this can share common data structure like:net packet, s/this/This/ and ':' looks unnecessary. > and share other functions. s/share// > > Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> > Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com> > Signed-off-by: Wen Congyang <wency@cn.fujitsu.com> > --- > net/Makefile.objs | 1 + > net/colo-base.c | 74 +++++++++++++++++++++++++++++++++ > net/colo-base.h | 38 +++++++++++++++++ > net/colo-compare.c | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++++- > trace-events | 3 ++ > 5 files changed, 233 insertions(+), 2 deletions(-) > create mode 100644 net/colo-base.c > create mode 100644 net/colo-base.h > > diff --git a/net/Makefile.objs b/net/Makefile.objs > index ba92f73..119589f 100644 > --- a/net/Makefile.objs > +++ b/net/Makefile.objs > @@ -17,3 +17,4 @@ common-obj-y += filter.o > common-obj-y += filter-buffer.o > common-obj-y += filter-mirror.o > common-obj-y += colo-compare.o > +common-obj-y += colo-base.o > diff --git a/net/colo-base.c b/net/colo-base.c > new file mode 100644 > index 0000000..f5d5de9 > --- /dev/null > +++ b/net/colo-base.c I was thinking maybe just use colo.c is better here. > @@ -0,0 +1,74 @@ > +/* > + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) > + * (a.k.a. Fault Tolerance or Continuous Replication) > + * > + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. > + * Copyright (c) 2016 FUJITSU LIMITED > + * Copyright (c) 2016 Intel Corporation > + * > + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> > + * > + * This work is licensed under the terms of the GNU GPL, version 2 or > + * later. See the COPYING file in the top-level directory. > + */ > + > +#include "qemu/osdep.h" > +#include "qemu/error-report.h" > +#include "net/colo-base.h" > + > +int parse_packet_early(Packet *pkt) > +{ > + int network_length; > + uint8_t *data = pkt->data; > + uint16_t l3_proto; > + ssize_t l2hdr_len = eth_get_l2_hdr_length(data); > + > + if (pkt->size < ETH_HLEN) { > + error_report("pkt->size < ETH_HLEN"); > + return 1; > + } > + pkt->network_layer = data + ETH_HLEN; > + l3_proto = eth_get_l3_proto(data, l2hdr_len); > + if (l3_proto != ETH_P_IP) { > + return 1; > + } > + > + network_length = pkt->ip->ip_hl * 4; > + if (pkt->size < ETH_HLEN + network_length) { > + error_report("pkt->size < network_layer + network_length"); > + return 1; > + } > + pkt->transport_layer = pkt->network_layer + network_length; > + if (!pkt->transport_layer) { Looks like at least pkt->network_layer can't be zero, so I wonder how this condition were met. > + error_report("pkt->transport_layer is valid"); > + return 1; > + } > + > + return 0; > +} > + > +Packet *packet_new(const void *data, int size) > +{ > + Packet *pkt = g_slice_new(Packet); > + > + pkt->data = g_memdup(data, size); > + pkt->size = size; > + > + return pkt; > +} > + > +void packet_destroy(void *opaque, void *user_data) What's the reason of using void * instead of Packet * directly? And user_data were not used in this function. > +{ > + Packet *pkt = opaque; > + > + g_free(pkt->data); > + g_slice_free(Packet, pkt); > +} > + > +/* > + * Clear hashtable, stop this hash growing really huge > + */ > +void connection_hashtable_reset(GHashTable *connection_track_table) > +{ > + g_hash_table_remove_all(connection_track_table); > +} > diff --git a/net/colo-base.h b/net/colo-base.h > new file mode 100644 > index 0000000..48835e7 > --- /dev/null > +++ b/net/colo-base.h > @@ -0,0 +1,38 @@ > +/* > + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) > + * (a.k.a. Fault Tolerance or Continuous Replication) > + * > + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. > + * Copyright (c) 2016 FUJITSU LIMITED > + * Copyright (c) 2016 Intel Corporation > + * > + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> > + * > + * This work is licensed under the terms of the GNU GPL, version 2 or > + * later. See the COPYING file in the top-level directory. > + */ > + > +#ifndef QEMU_COLO_BASE_H > +#define QEMU_COLO_BASE_H > + > +#include "slirp/slirp.h" > +#include "qemu/jhash.h" > + > +#define HASHTABLE_MAX_SIZE 16384 > + > +typedef struct Packet { > + void *data; > + union { > + uint8_t *network_layer; I think it would be better to use "network_header" here. > + struct ip *ip; > + }; > + uint8_t *transport_layer; And "transport_header" here. And why not union tcp header here? I'm asking since ip header were unioned above. > + int size; > +} Packet; > + > +int parse_packet_early(Packet *pkt); > +void connection_hashtable_reset(GHashTable *connection_track_table); > +Packet *packet_new(const void *data, int size); > +void packet_destroy(void *opaque, void *user_data); > + > +#endif /* QEMU_COLO_BASE_H */ > diff --git a/net/colo-compare.c b/net/colo-compare.c > index 0402958..7c52cc8 100644 > --- a/net/colo-compare.c > +++ b/net/colo-compare.c > @@ -27,13 +27,38 @@ > #include "sysemu/char.h" > #include "qemu/sockets.h" > #include "qapi-visit.h" > +#include "net/colo-base.h" > +#include "trace.h" > > #define TYPE_COLO_COMPARE "colo-compare" > #define COLO_COMPARE(obj) \ > OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) > > #define COMPARE_READ_LEN_MAX NET_BUFSIZE > +#define MAX_QUEUE_SIZE 1024 > > +/* > + + CompareState ++ > + | | > + +---------------+ +---------------+ +---------------+ > + |conn list +--->conn +--------->conn | > + +---------------+ +---------------+ +---------------+ > + | | | | | | > + +---------------+ +---v----+ +---v----+ +---v----+ +---v----+ > + |primary | |secondary |primary | |secondary > + |packet | |packet + |packet | |packet + > + +--------+ +--------+ +--------+ +--------+ > + | | | | > + +---v----+ +---v----+ +---v----+ +---v----+ > + |primary | |secondary |primary | |secondary > + |packet | |packet + |packet | |packet + > + +--------+ +--------+ +--------+ +--------+ > + | | | | > + +---v----+ +---v----+ +---v----+ +---v----+ > + |primary | |secondary |primary | |secondary > + |packet | |packet + |packet | |packet + > + +--------+ +--------+ +--------+ +--------+ > +*/ > typedef struct CompareState { > Object parent; > > @@ -46,12 +71,89 @@ typedef struct CompareState { > QTAILQ_ENTRY(CompareState) next; > SocketReadState pri_rs; > SocketReadState sec_rs; > + > + /* hashtable to save connection */ > + GHashTable *connection_track_table; > + /* to save unprocessed_connections */ > + GQueue unprocessed_connections; > + /* proxy current hash size */ > + uint32_t hashtable_size; > } CompareState; > > typedef struct CompareClass { > ObjectClass parent_class; > } CompareClass; > > +enum { > + PRIMARY_IN = 0, > + SECONDARY_IN, > +}; > + > +static int compare_chr_send(CharDriverState *out, > + const uint8_t *buf, > + uint32_t size); > + > +/* > + * Return 0 on success, if return -1 means the pkt > + * is unsupported(arp and ipv6) and will be sent later > + */ > +static int packet_enqueue(CompareState *s, int mode) > +{ > + Packet *pkt = NULL; > + > + if (mode == PRIMARY_IN) { > + pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len); > + } else { > + pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len); > + } > + > + if (parse_packet_early(pkt)) { > + packet_destroy(pkt, NULL); > + pkt = NULL; > + return -1; > + } > + /* TODO: get connection key from pkt */ > + > + /* > + * TODO: use connection key get conn from > + * connection_track_table > + */ > + > + /* > + * TODO: insert pkt to it's conn->primary_list > + * or conn->secondary_list > + */ > + > + return 0; > +} > + > +static int compare_chr_send(CharDriverState *out, > + const uint8_t *buf, > + uint32_t size) > +{ > + int ret = 0; > + uint32_t len = htonl(size); > + > + if (!size) { > + return 0; > + } > + > + ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len)); > + if (ret != sizeof(len)) { > + goto err; > + } > + > + ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size); > + if (ret != size) { > + goto err; > + } > + > + return 0; > + > +err: > + return ret < 0 ? ret : -EIO; > +} > + > static char *compare_get_pri_indev(Object *obj, Error **errp) > { > CompareState *s = COLO_COMPARE(obj); > @@ -99,12 +201,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp) > > static void compare_pri_rs_finalize(SocketReadState *pri_rs) > { > - /* if packet_enqueue pri pkt failed we will send unsupported packet */ > + CompareState *s = container_of(pri_rs, CompareState, pri_rs); > + > + if (packet_enqueue(s, PRIMARY_IN)) { > + trace_colo_compare_main("primary: unsupported packet in"); > + compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len); > + } > } > > static void compare_sec_rs_finalize(SocketReadState *sec_rs) > { > - /* if packet_enqueue sec pkt failed we will notify trace */ > + CompareState *s = container_of(sec_rs, CompareState, sec_rs); > + > + if (packet_enqueue(s, SECONDARY_IN)) { > + trace_colo_compare_main("secondary: unsupported packet in"); > + } > } > > /* > @@ -156,6 +267,10 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) > net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize); > net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize); > > + s->hashtable_size = 0; > + > + /* use g_hash_table_new_full() to new a hashtable */ > + > return; > } > > diff --git a/trace-events b/trace-events > index ca7211b..703de1a 100644 > --- a/trace-events > +++ b/trace-events > @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d" > aspeed_vic_update_irq(int flags) "Raising IRQ: %d" > aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32 > aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32 > + > +# net/colo-compare.c > +colo_compare_main(const char *chr) ": %s"
On 08/02/2016 02:38 PM, Jason Wang wrote: > > > On 2016年07月26日 09:49, Zhang Chen wrote: >> COLO-base used by colo-compare and filter-rewriter. > > s/used/is used/ > OK~ >> this can share common data structure like:net packet, > > s/this/This/ and ':' looks unnecessary. OK~ >> and share other functions. > > s/share// > OK~ >> >> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> >> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com> >> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com> >> --- >> net/Makefile.objs | 1 + >> net/colo-base.c | 74 +++++++++++++++++++++++++++++++++ >> net/colo-base.h | 38 +++++++++++++++++ >> net/colo-compare.c | 119 >> ++++++++++++++++++++++++++++++++++++++++++++++++++++- >> trace-events | 3 ++ >> 5 files changed, 233 insertions(+), 2 deletions(-) >> create mode 100644 net/colo-base.c >> create mode 100644 net/colo-base.h >> >> diff --git a/net/Makefile.objs b/net/Makefile.objs >> index ba92f73..119589f 100644 >> --- a/net/Makefile.objs >> +++ b/net/Makefile.objs >> @@ -17,3 +17,4 @@ common-obj-y += filter.o >> common-obj-y += filter-buffer.o >> common-obj-y += filter-mirror.o >> common-obj-y += colo-compare.o >> +common-obj-y += colo-base.o >> diff --git a/net/colo-base.c b/net/colo-base.c >> new file mode 100644 >> index 0000000..f5d5de9 >> --- /dev/null >> +++ b/net/colo-base.c > > I was thinking maybe just use colo.c is better here. > If you think that OK, I will change the name in next version, But it haven't COLO function in this .c, just some packet and connection data structure or common function. >> @@ -0,0 +1,74 @@ >> +/* >> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service >> (COLO) >> + * (a.k.a. Fault Tolerance or Continuous Replication) >> + * >> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. >> + * Copyright (c) 2016 FUJITSU LIMITED >> + * Copyright (c) 2016 Intel Corporation >> + * >> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> >> + * >> + * This work is licensed under the terms of the GNU GPL, version 2 or >> + * later. See the COPYING file in the top-level directory. >> + */ >> + >> +#include "qemu/osdep.h" >> +#include "qemu/error-report.h" >> +#include "net/colo-base.h" >> + >> +int parse_packet_early(Packet *pkt) >> +{ >> + int network_length; >> + uint8_t *data = pkt->data; >> + uint16_t l3_proto; >> + ssize_t l2hdr_len = eth_get_l2_hdr_length(data); >> + >> + if (pkt->size < ETH_HLEN) { >> + error_report("pkt->size < ETH_HLEN"); >> + return 1; >> + } >> + pkt->network_layer = data + ETH_HLEN; >> + l3_proto = eth_get_l3_proto(data, l2hdr_len); >> + if (l3_proto != ETH_P_IP) { >> + return 1; >> + } >> + >> + network_length = pkt->ip->ip_hl * 4; >> + if (pkt->size < ETH_HLEN + network_length) { >> + error_report("pkt->size < network_layer + network_length"); >> + return 1; >> + } >> + pkt->transport_layer = pkt->network_layer + network_length; >> + if (!pkt->transport_layer) { > > Looks like at least pkt->network_layer can't be zero, so I wonder how > this condition were met. I will remove this check in next version. > >> + error_report("pkt->transport_layer is valid"); >> + return 1; >> + } >> + >> + return 0; >> +} >> + >> +Packet *packet_new(const void *data, int size) >> +{ >> + Packet *pkt = g_slice_new(Packet); >> + >> + pkt->data = g_memdup(data, size); >> + pkt->size = size; >> + >> + return pkt; >> +} >> + >> +void packet_destroy(void *opaque, void *user_data) > > What's the reason of using void * instead of Packet * directly? And > user_data were not used in this function. > If we change void *opaque to Packet *pkt make it have this warning... net/colo-base.c:138: warning: passing argument 2 of 'g_queue_foreach' from incompatible pointer type g_queue_foreach(&conn->primary_list, packet_destroy, NULL); >> +{ >> + Packet *pkt = opaque; >> + >> + g_free(pkt->data); >> + g_slice_free(Packet, pkt); >> +} >> + >> +/* >> + * Clear hashtable, stop this hash growing really huge >> + */ >> +void connection_hashtable_reset(GHashTable *connection_track_table) >> +{ >> + g_hash_table_remove_all(connection_track_table); >> +} >> diff --git a/net/colo-base.h b/net/colo-base.h >> new file mode 100644 >> index 0000000..48835e7 >> --- /dev/null >> +++ b/net/colo-base.h >> @@ -0,0 +1,38 @@ >> +/* >> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service >> (COLO) >> + * (a.k.a. Fault Tolerance or Continuous Replication) >> + * >> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. >> + * Copyright (c) 2016 FUJITSU LIMITED >> + * Copyright (c) 2016 Intel Corporation >> + * >> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> >> + * >> + * This work is licensed under the terms of the GNU GPL, version 2 or >> + * later. See the COPYING file in the top-level directory. >> + */ >> + >> +#ifndef QEMU_COLO_BASE_H >> +#define QEMU_COLO_BASE_H >> + >> +#include "slirp/slirp.h" >> +#include "qemu/jhash.h" >> + >> +#define HASHTABLE_MAX_SIZE 16384 >> + >> +typedef struct Packet { >> + void *data; >> + union { >> + uint8_t *network_layer; > > I think it would be better to use "network_header" here. OK,will fix in next version. > >> + struct ip *ip; >> + }; >> + uint8_t *transport_layer; > > And "transport_header" here. OK. > > And why not union tcp header here? I'm asking since ip header were > unioned above. I don't understand what you mean, can you give a example? Thanks Zhang Chen > >> + int size; >> +} Packet; >> + >> +int parse_packet_early(Packet *pkt); >> +void connection_hashtable_reset(GHashTable *connection_track_table); >> +Packet *packet_new(const void *data, int size); >> +void packet_destroy(void *opaque, void *user_data); >> + >> +#endif /* QEMU_COLO_BASE_H */ >> diff --git a/net/colo-compare.c b/net/colo-compare.c >> index 0402958..7c52cc8 100644 >> --- a/net/colo-compare.c >> +++ b/net/colo-compare.c >> @@ -27,13 +27,38 @@ >> #include "sysemu/char.h" >> #include "qemu/sockets.h" >> #include "qapi-visit.h" >> +#include "net/colo-base.h" >> +#include "trace.h" >> #define TYPE_COLO_COMPARE "colo-compare" >> #define COLO_COMPARE(obj) \ >> OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) >> #define COMPARE_READ_LEN_MAX NET_BUFSIZE >> +#define MAX_QUEUE_SIZE 1024 >> +/* >> + + CompareState ++ >> + | | >> + +---------------+ +---------------+ +---------------+ >> + |conn list +--->conn +--------->conn | >> + +---------------+ +---------------+ +---------------+ >> + | | | | | | >> + +---------------+ +---v----+ +---v----+ +---v----+ +---v----+ >> + |primary | |secondary |primary | |secondary >> + |packet | |packet + |packet | |packet + >> + +--------+ +--------+ +--------+ +--------+ >> + | | | | >> + +---v----+ +---v----+ +---v----+ +---v----+ >> + |primary | |secondary |primary | |secondary >> + |packet | |packet + |packet | |packet + >> + +--------+ +--------+ +--------+ +--------+ >> + | | | | >> + +---v----+ +---v----+ +---v----+ +---v----+ >> + |primary | |secondary |primary | |secondary >> + |packet | |packet + |packet | |packet + >> + +--------+ +--------+ +--------+ +--------+ >> +*/ >> typedef struct CompareState { >> Object parent; >> @@ -46,12 +71,89 @@ typedef struct CompareState { >> QTAILQ_ENTRY(CompareState) next; >> SocketReadState pri_rs; >> SocketReadState sec_rs; >> + >> + /* hashtable to save connection */ >> + GHashTable *connection_track_table; >> + /* to save unprocessed_connections */ >> + GQueue unprocessed_connections; >> + /* proxy current hash size */ >> + uint32_t hashtable_size; >> } CompareState; >> typedef struct CompareClass { >> ObjectClass parent_class; >> } CompareClass; >> +enum { >> + PRIMARY_IN = 0, >> + SECONDARY_IN, >> +}; >> + >> +static int compare_chr_send(CharDriverState *out, >> + const uint8_t *buf, >> + uint32_t size); >> + >> +/* >> + * Return 0 on success, if return -1 means the pkt >> + * is unsupported(arp and ipv6) and will be sent later >> + */ >> +static int packet_enqueue(CompareState *s, int mode) >> +{ >> + Packet *pkt = NULL; >> + >> + if (mode == PRIMARY_IN) { >> + pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len); >> + } else { >> + pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len); >> + } >> + >> + if (parse_packet_early(pkt)) { >> + packet_destroy(pkt, NULL); >> + pkt = NULL; >> + return -1; >> + } >> + /* TODO: get connection key from pkt */ >> + >> + /* >> + * TODO: use connection key get conn from >> + * connection_track_table >> + */ >> + >> + /* >> + * TODO: insert pkt to it's conn->primary_list >> + * or conn->secondary_list >> + */ >> + >> + return 0; >> +} >> + >> +static int compare_chr_send(CharDriverState *out, >> + const uint8_t *buf, >> + uint32_t size) >> +{ >> + int ret = 0; >> + uint32_t len = htonl(size); >> + >> + if (!size) { >> + return 0; >> + } >> + >> + ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len)); >> + if (ret != sizeof(len)) { >> + goto err; >> + } >> + >> + ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size); >> + if (ret != size) { >> + goto err; >> + } >> + >> + return 0; >> + >> +err: >> + return ret < 0 ? ret : -EIO; >> +} >> + >> static char *compare_get_pri_indev(Object *obj, Error **errp) >> { >> CompareState *s = COLO_COMPARE(obj); >> @@ -99,12 +201,21 @@ static void compare_set_outdev(Object *obj, >> const char *value, Error **errp) >> static void compare_pri_rs_finalize(SocketReadState *pri_rs) >> { >> - /* if packet_enqueue pri pkt failed we will send unsupported >> packet */ >> + CompareState *s = container_of(pri_rs, CompareState, pri_rs); >> + >> + if (packet_enqueue(s, PRIMARY_IN)) { >> + trace_colo_compare_main("primary: unsupported packet in"); >> + compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len); >> + } >> } >> static void compare_sec_rs_finalize(SocketReadState *sec_rs) >> { >> - /* if packet_enqueue sec pkt failed we will notify trace */ >> + CompareState *s = container_of(sec_rs, CompareState, sec_rs); >> + >> + if (packet_enqueue(s, SECONDARY_IN)) { >> + trace_colo_compare_main("secondary: unsupported packet in"); >> + } >> } >> /* >> @@ -156,6 +267,10 @@ static void colo_compare_complete(UserCreatable >> *uc, Error **errp) >> net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize); >> net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize); >> + s->hashtable_size = 0; >> + >> + /* use g_hash_table_new_full() to new a hashtable */ >> + >> return; >> } >> diff --git a/trace-events b/trace-events >> index ca7211b..703de1a 100644 >> --- a/trace-events >> +++ b/trace-events >> @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d" >> aspeed_vic_update_irq(int flags) "Raising IRQ: %d" >> aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) >> "From 0x%" PRIx64 " of size %u: 0x%" PRIx32 >> aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To >> 0x%" PRIx64 " of size %u: 0x%" PRIx32 >> + >> +# net/colo-compare.c >> +colo_compare_main(const char *chr) ": %s" > > > > . >
diff --git a/net/Makefile.objs b/net/Makefile.objs index ba92f73..119589f 100644 --- a/net/Makefile.objs +++ b/net/Makefile.objs @@ -17,3 +17,4 @@ common-obj-y += filter.o common-obj-y += filter-buffer.o common-obj-y += filter-mirror.o common-obj-y += colo-compare.o +common-obj-y += colo-base.o diff --git a/net/colo-base.c b/net/colo-base.c new file mode 100644 index 0000000..f5d5de9 --- /dev/null +++ b/net/colo-base.c @@ -0,0 +1,74 @@ +/* + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) + * (a.k.a. Fault Tolerance or Continuous Replication) + * + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. + * Copyright (c) 2016 FUJITSU LIMITED + * Copyright (c) 2016 Intel Corporation + * + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or + * later. See the COPYING file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/error-report.h" +#include "net/colo-base.h" + +int parse_packet_early(Packet *pkt) +{ + int network_length; + uint8_t *data = pkt->data; + uint16_t l3_proto; + ssize_t l2hdr_len = eth_get_l2_hdr_length(data); + + if (pkt->size < ETH_HLEN) { + error_report("pkt->size < ETH_HLEN"); + return 1; + } + pkt->network_layer = data + ETH_HLEN; + l3_proto = eth_get_l3_proto(data, l2hdr_len); + if (l3_proto != ETH_P_IP) { + return 1; + } + + network_length = pkt->ip->ip_hl * 4; + if (pkt->size < ETH_HLEN + network_length) { + error_report("pkt->size < network_layer + network_length"); + return 1; + } + pkt->transport_layer = pkt->network_layer + network_length; + if (!pkt->transport_layer) { + error_report("pkt->transport_layer is valid"); + return 1; + } + + return 0; +} + +Packet *packet_new(const void *data, int size) +{ + Packet *pkt = g_slice_new(Packet); + + pkt->data = g_memdup(data, size); + pkt->size = size; + + return pkt; +} + +void packet_destroy(void *opaque, void *user_data) +{ + Packet *pkt = opaque; + + g_free(pkt->data); + g_slice_free(Packet, pkt); +} + +/* + * Clear hashtable, stop this hash growing really huge + */ +void connection_hashtable_reset(GHashTable *connection_track_table) +{ + g_hash_table_remove_all(connection_track_table); +} diff --git a/net/colo-base.h b/net/colo-base.h new file mode 100644 index 0000000..48835e7 --- /dev/null +++ b/net/colo-base.h @@ -0,0 +1,38 @@ +/* + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) + * (a.k.a. Fault Tolerance or Continuous Replication) + * + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. + * Copyright (c) 2016 FUJITSU LIMITED + * Copyright (c) 2016 Intel Corporation + * + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or + * later. See the COPYING file in the top-level directory. + */ + +#ifndef QEMU_COLO_BASE_H +#define QEMU_COLO_BASE_H + +#include "slirp/slirp.h" +#include "qemu/jhash.h" + +#define HASHTABLE_MAX_SIZE 16384 + +typedef struct Packet { + void *data; + union { + uint8_t *network_layer; + struct ip *ip; + }; + uint8_t *transport_layer; + int size; +} Packet; + +int parse_packet_early(Packet *pkt); +void connection_hashtable_reset(GHashTable *connection_track_table); +Packet *packet_new(const void *data, int size); +void packet_destroy(void *opaque, void *user_data); + +#endif /* QEMU_COLO_BASE_H */ diff --git a/net/colo-compare.c b/net/colo-compare.c index 0402958..7c52cc8 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -27,13 +27,38 @@ #include "sysemu/char.h" #include "qemu/sockets.h" #include "qapi-visit.h" +#include "net/colo-base.h" +#include "trace.h" #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) #define COMPARE_READ_LEN_MAX NET_BUFSIZE +#define MAX_QUEUE_SIZE 1024 +/* + + CompareState ++ + | | + +---------------+ +---------------+ +---------------+ + |conn list +--->conn +--------->conn | + +---------------+ +---------------+ +---------------+ + | | | | | | + +---------------+ +---v----+ +---v----+ +---v----+ +---v----+ + |primary | |secondary |primary | |secondary + |packet | |packet + |packet | |packet + + +--------+ +--------+ +--------+ +--------+ + | | | | + +---v----+ +---v----+ +---v----+ +---v----+ + |primary | |secondary |primary | |secondary + |packet | |packet + |packet | |packet + + +--------+ +--------+ +--------+ +--------+ + | | | | + +---v----+ +---v----+ +---v----+ +---v----+ + |primary | |secondary |primary | |secondary + |packet | |packet + |packet | |packet + + +--------+ +--------+ +--------+ +--------+ +*/ typedef struct CompareState { Object parent; @@ -46,12 +71,89 @@ typedef struct CompareState { QTAILQ_ENTRY(CompareState) next; SocketReadState pri_rs; SocketReadState sec_rs; + + /* hashtable to save connection */ + GHashTable *connection_track_table; + /* to save unprocessed_connections */ + GQueue unprocessed_connections; + /* proxy current hash size */ + uint32_t hashtable_size; } CompareState; typedef struct CompareClass { ObjectClass parent_class; } CompareClass; +enum { + PRIMARY_IN = 0, + SECONDARY_IN, +}; + +static int compare_chr_send(CharDriverState *out, + const uint8_t *buf, + uint32_t size); + +/* + * Return 0 on success, if return -1 means the pkt + * is unsupported(arp and ipv6) and will be sent later + */ +static int packet_enqueue(CompareState *s, int mode) +{ + Packet *pkt = NULL; + + if (mode == PRIMARY_IN) { + pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len); + } else { + pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len); + } + + if (parse_packet_early(pkt)) { + packet_destroy(pkt, NULL); + pkt = NULL; + return -1; + } + /* TODO: get connection key from pkt */ + + /* + * TODO: use connection key get conn from + * connection_track_table + */ + + /* + * TODO: insert pkt to it's conn->primary_list + * or conn->secondary_list + */ + + return 0; +} + +static int compare_chr_send(CharDriverState *out, + const uint8_t *buf, + uint32_t size) +{ + int ret = 0; + uint32_t len = htonl(size); + + if (!size) { + return 0; + } + + ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len)); + if (ret != sizeof(len)) { + goto err; + } + + ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size); + if (ret != size) { + goto err; + } + + return 0; + +err: + return ret < 0 ? ret : -EIO; +} + static char *compare_get_pri_indev(Object *obj, Error **errp) { CompareState *s = COLO_COMPARE(obj); @@ -99,12 +201,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp) static void compare_pri_rs_finalize(SocketReadState *pri_rs) { - /* if packet_enqueue pri pkt failed we will send unsupported packet */ + CompareState *s = container_of(pri_rs, CompareState, pri_rs); + + if (packet_enqueue(s, PRIMARY_IN)) { + trace_colo_compare_main("primary: unsupported packet in"); + compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len); + } } static void compare_sec_rs_finalize(SocketReadState *sec_rs) { - /* if packet_enqueue sec pkt failed we will notify trace */ + CompareState *s = container_of(sec_rs, CompareState, sec_rs); + + if (packet_enqueue(s, SECONDARY_IN)) { + trace_colo_compare_main("secondary: unsupported packet in"); + } } /* @@ -156,6 +267,10 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize); net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize); + s->hashtable_size = 0; + + /* use g_hash_table_new_full() to new a hashtable */ + return; } diff --git a/trace-events b/trace-events index ca7211b..703de1a 100644 --- a/trace-events +++ b/trace-events @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d" aspeed_vic_update_irq(int flags) "Raising IRQ: %d" aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32 aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32 + +# net/colo-compare.c +colo_compare_main(const char *chr) ": %s"