@@ -352,6 +352,10 @@ int tracecmd_msg_recv_trace_resp(struct tracecmd_msg_handle *msg_handle,
int *nr_cpus, int *page_size,
unsigned int **ports);
+int tracecmd_msg_rcv_time_sync(struct tracecmd_msg_handle *msg_handle);
+int tracecmd_msg_snd_time_sync(struct tracecmd_msg_handle *msg_handle,
+ char *clock_str, long long *toffset);
+
int tracecmd_msg_wait_close(struct tracecmd_msg_handle *msg_handle);
/* --- Plugin handling --- */
@@ -225,6 +225,11 @@ void show_instance_file(struct buffer_instance *instance, const char *name);
int count_cpus(void);
+struct buffer_instance *clock_synch_enable(int fd, char *clock,
+ int *tx_event, int *rx_event);
+void clock_synch_disable(int fd, struct buffer_instance *instance);
+struct tep_handle *clock_synch_get_tep(struct buffer_instance *instance);
+
/* No longer in event-utils.h */
void __noreturn die(const char *fmt, ...); /* Can be overriden */
void *malloc_or_die(unsigned int size); /* Can be overridden */
@@ -404,6 +404,9 @@ static int communicate_with_client(struct tracecmd_msg_handle *msg_handle)
msg_handle->version = V3_PROTOCOL;
+ /* time sync with the v3 client */
+ tracecmd_msg_rcv_time_sync(msg_handle);
+
/* read the CPU count, the page size, and options */
if ((pagesize = tracecmd_msg_initial_setting(msg_handle)) < 0)
goto out;
@@ -18,9 +18,11 @@
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
+#include <time.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <linux/types.h>
+#include <linux/vm_sockets.h>
#include "trace-cmd-local.h"
#include "trace-local.h"
@@ -28,6 +30,8 @@
typedef __u32 u32;
typedef __be32 be32;
+typedef __u64 u64;
+typedef __s64 s64;
static inline void dprint(const char *fmt, ...)
{
@@ -50,11 +54,19 @@ static inline void dprint(const char *fmt, ...)
unsigned int page_size;
+/* Try a few times to get an accurate time sync */
+static int tsync_tries = 5000;
+
struct tracecmd_msg_server {
struct tracecmd_msg_handle handle;
int done;
};
+struct clock_synch_event {
+ int id;
+ unsigned long long ts;
+};
+
static struct tracecmd_msg_server *
make_server(struct tracecmd_msg_handle *msg_handle)
{
@@ -90,6 +102,16 @@ struct tracecmd_msg_trace_resp {
be32 page_size;
} __attribute__((packed));
+struct tracecmd_msg_time_sync_init {
+ char clock[32];
+ s64 toffset;
+} __attribute__((packed));
+
+struct tracecmd_msg_time_sync {
+ u64 tlocal_rx;
+ u64 tlocal_tx;
+} __attribute__((packed));
+
struct tracecmd_msg_header {
be32 size;
be32 cmd;
@@ -97,13 +119,15 @@ struct tracecmd_msg_header {
} __attribute__((packed));
#define MSG_MAP \
- C(CLOSE, 0, 0), \
- C(TINIT, 1, sizeof(struct tracecmd_msg_tinit)), \
- C(RINIT, 2, sizeof(struct tracecmd_msg_rinit)), \
- C(SEND_DATA, 3, 0), \
- C(FIN_DATA, 4, 0), \
- C(TRACE_REQ, 5, sizeof(struct tracecmd_msg_trace_req)), \
- C(TRACE_RESP, 6, sizeof(struct tracecmd_msg_trace_resp)),
+ C(CLOSE, 0, 0), \
+ C(TINIT, 1, sizeof(struct tracecmd_msg_tinit)), \
+ C(RINIT, 2, sizeof(struct tracecmd_msg_rinit)), \
+ C(SEND_DATA, 3, 0), \
+ C(FIN_DATA, 4, 0), \
+ C(TRACE_REQ, 5, sizeof(struct tracecmd_msg_trace_req)),\
+ C(TRACE_RESP, 6, sizeof(struct tracecmd_msg_trace_resp)),\
+ C(TIME_SYNC_INIT, 7, sizeof(struct tracecmd_msg_time_sync_init)),\
+ C(TIME_SYNC, 8, sizeof(struct tracecmd_msg_time_sync)),
#undef C
#define C(a,b,c) MSG_##a = b
@@ -133,10 +157,12 @@ static const char *cmd_to_name(int cmd)
struct tracecmd_msg {
struct tracecmd_msg_header hdr;
union {
- struct tracecmd_msg_tinit tinit;
- struct tracecmd_msg_rinit rinit;
- struct tracecmd_msg_trace_req trace_req;
- struct tracecmd_msg_trace_resp trace_resp;
+ struct tracecmd_msg_tinit tinit;
+ struct tracecmd_msg_rinit rinit;
+ struct tracecmd_msg_trace_req trace_req;
+ struct tracecmd_msg_trace_resp trace_resp;
+ struct tracecmd_msg_time_sync_init time_sync_init;
+ struct tracecmd_msg_time_sync time_sync;
};
union {
struct tracecmd_msg_opt *opt;
@@ -825,6 +851,269 @@ out:
return ret;
}
+static int
+find_events_in_page(struct tep_handle *pevent, void *page,
+ int size, struct clock_synch_event *events)
+{
+ struct tep_event *event = NULL;
+ struct tep_record *last_record = NULL;
+ struct tep_record *record;
+ int id, i, j, cnt = 0;
+
+ if (size <= 0)
+ return 0;
+ for (i = 0; events[i].id; i++) {
+ if (!events[i].ts)
+ cnt++;
+ }
+
+ while (cnt) {
+ event = NULL;
+ record = tracecmd_read_page_record(pevent, page, size,
+ last_record);
+ if (!record)
+ break;
+ free_record(last_record);
+ id = tep_data_type(pevent, record);
+ for (i = 0; events[i].id; i++)
+ if (!events[i].ts && events[i].id == id) {
+ event = tep_data_event_from_type(pevent, id);
+ break;
+ }
+ if (event) {
+ for (j = 0; j < i; j++)
+ if (events[j].ts && events[j].ts > record->ts)
+ break;
+ if (j == i) {
+ events[i].ts = record->ts;
+ cnt--;
+ }
+ }
+ last_record = record;
+ }
+ free_record(last_record);
+
+ return cnt;
+}
+
+static int clock_synch_find_events(struct tep_handle *tep,
+ struct buffer_instance *instance,
+ struct clock_synch_event *events)
+{
+ struct dirent *dent;
+ int ts = 0;
+ void *page;
+ char *path;
+ char *file;
+ DIR *dir;
+ int len;
+ int fd;
+ int r;
+
+ page_size = getpagesize();
+
+ path = get_instance_file(instance, "per_cpu");
+ if (!path)
+ return ts;
+
+ dir = opendir(path);
+ if (!dir)
+ goto out;
+
+ len = strlen(path);
+ file = malloc(len + strlen("trace_pipe_raw") + 32);
+ page = malloc(page_size);
+ if (!file || !page)
+ die("Failed to allocate time_stamp info");
+
+ while ((dent = readdir(dir))) {
+
+ const char *name = dent->d_name;
+
+ if (strncmp(name, "cpu", 3) != 0)
+ continue;
+ sprintf(file, "%s/%s/trace_pipe_raw", path, name);
+ fd = open(file, O_RDONLY | O_NONBLOCK);
+ if (fd < 0)
+ continue;
+ do {
+ r = read(fd, page, page_size);
+ if (r > 0) {
+ ts = find_events_in_page(tep, page, r, events);
+ if (!ts)
+ break;
+ }
+ } while (r > 0);
+ close(fd);
+ }
+
+ free(file);
+ free(page);
+ closedir(dir);
+
+ out:
+ tracecmd_put_tracing_file(path);
+ return ts;
+}
+
+extern struct tep_handle *clock_synch_get_tep(struct buffer_instance *instance);
+
+int tracecmd_msg_rcv_time_sync(struct tracecmd_msg_handle *msg_handle)
+{
+ struct buffer_instance *vinst;
+ struct tracecmd_msg msg;
+ struct tep_handle *tep;
+ int ret;
+ char *clock;
+ struct clock_synch_event events[3];
+
+ ret = tracecmd_msg_recv(msg_handle->fd, &msg);
+ if (ret < 0 || ntohl(msg.hdr.cmd) != MSG_TIME_SYNC_INIT)
+ return 0;
+ if (!msg.time_sync_init.clock[0])
+ return 0;
+
+ clock = strdup(msg.time_sync_init.clock);
+ events[2].id = 0;
+ vinst = clock_synch_enable(msg_handle->fd, clock,
+ &events[1].id, &events[0].id);
+ tep = clock_synch_get_tep(vinst);
+ tracecmd_msg_init(MSG_TIME_SYNC_INIT, &msg);
+ tracecmd_msg_send(msg_handle->fd, &msg);
+
+ do {
+ events[0].ts = 0;
+ events[1].ts = 0;
+ ret = tracecmd_msg_recv(msg_handle->fd, &msg);
+ if (ret < 0 || ntohl(msg.hdr.cmd) != MSG_TIME_SYNC)
+ break;
+
+ tracecmd_msg_send(msg_handle->fd, &msg);
+ clock_synch_find_events(tep, vinst, events);
+ tracecmd_msg_init(MSG_TIME_SYNC, &msg);
+ msg.time_sync.tlocal_tx = htonll(events[1].ts);
+ msg.time_sync.tlocal_rx = htonll(events[0].ts);
+ tracecmd_msg_send(msg_handle->fd, &msg);
+ } while (true);
+ clock_synch_disable(msg_handle->fd, vinst);
+ if (ntohl(msg.hdr.cmd) == MSG_TIME_SYNC_INIT)
+ printf("\n\rtimeoffset %lld\n\r", msg.time_sync_init.toffset);
+ msg_free(&msg);
+ tep_free(tep);
+ free(clock);
+ return ret;
+}
+
+int tracecmd_msg_snd_time_sync(struct tracecmd_msg_handle *msg_handle,
+ char *clock_str, long long *toffset)
+{
+ static struct buffer_instance *vinst;
+ struct tep_handle *tep;
+ unsigned long long path;
+ unsigned long long min_l = 0;
+ long long ms_diff, sm_diff;
+ long long m_t1, m_t4;
+ long long s_t2, s_t3;
+ long long ptp_off, t_offs;
+ int pos = 0;
+ int neg = 0;
+ long long ptp_n = 0;
+ long long ptp_p = 0;
+ int ok_loop, sync_loop = tsync_tries;
+ struct tracecmd_msg msg_req;
+ struct tracecmd_msg msg_resp;
+ int ret = 0;
+ char *clock;
+ struct clock_synch_event events[4];
+
+ clock = clock_str;
+ if (!clock)
+ clock = "local";
+
+ tracecmd_msg_init(MSG_TIME_SYNC_INIT, &msg_req);
+ if (toffset == NULL) {
+ msg_req.time_sync_init.clock[0] = 0;
+ tracecmd_msg_send(msg_handle->fd, &msg_req);
+ return 0;
+ }
+ strncpy(msg_req.time_sync_init.clock, clock, 16);
+ tracecmd_msg_send(msg_handle->fd, &msg_req);
+ ret = tracecmd_msg_recv(msg_handle->fd, &msg_resp);
+ if (ret < 0 || ntohl(msg_resp.hdr.cmd) != MSG_TIME_SYNC_INIT)
+ return 0;
+ events[3].id = 0;
+ vinst = clock_synch_enable(msg_handle->fd, clock_str,
+ &events[0].id, &events[1].id);
+ events[2].ts = events[1].id;
+ tep = clock_synch_get_tep(vinst);
+ *toffset = 0;
+ ok_loop = 0;
+ do {
+ memset(&msg_resp, 0, sizeof(msg_resp));
+ events[0].ts = 0;
+ events[1].ts = 0;
+ events[2].ts = 0;
+ tracecmd_msg_init(MSG_TIME_SYNC, &msg_req);
+
+ tracecmd_msg_send(msg_handle->fd, &msg_req);
+ ret = tracecmd_msg_recv(msg_handle->fd, &msg_resp);
+ if (ret < 0 || ntohl(msg_resp.hdr.cmd) != MSG_TIME_SYNC)
+ break;
+ ret = tracecmd_msg_recv(msg_handle->fd, &msg_resp);
+ if (ret < 0 || ntohl(msg_resp.hdr.cmd) != MSG_TIME_SYNC)
+ break;
+ clock_synch_find_events(tep, vinst, events);
+ m_t1 = events[0].ts;
+ m_t4 = events[1].ts;
+ s_t2 = htonll(msg_resp.time_sync.tlocal_rx);
+ s_t3 = ntohll(msg_resp.time_sync.tlocal_tx);
+ if (!m_t1 || !m_t4 || !s_t2 || !s_t3)
+ continue;
+ ok_loop++;
+ path = m_t4 - m_t1;
+ ms_diff = s_t2 - m_t1;
+ sm_diff = m_t4 - s_t3;
+ if (!min_l || min_l > path) {
+ min_l = path;
+
+ if (ms_diff >= sm_diff) {
+ ptp_off = (ms_diff - sm_diff)/2;
+ } else {
+ ptp_off = (sm_diff - ms_diff)/2;
+ ptp_off = -ptp_off;
+ }
+ }
+ if (ms_diff >= sm_diff) {
+ ptp_p += (ms_diff - sm_diff)/2;
+ pos++;
+ } else {
+ ptp_n += (sm_diff - ms_diff)/2;
+ neg++;
+ }
+ } while (--sync_loop);
+
+ clock_synch_disable(msg_handle->fd, vinst);
+
+ if (pos)
+ t_offs = ptp_p/pos;
+ else if (neg)
+ t_offs = -(ptp_n/neg);
+
+ tracecmd_msg_init(MSG_TIME_SYNC_INIT, &msg_req);
+ msg_req.time_sync_init.clock[0] = 0;
+ msg_req.time_sync_init.toffset = ptp_off;
+ tracecmd_msg_send(msg_handle->fd, &msg_req);
+
+ msg_free(&msg_req);
+ msg_free(&msg_resp);
+#if 0
+ *toffset = t_offs;
+#else
+ *toffset = ptp_off;
+#endif
+ return ret;
+}
+
static int make_trace_resp(struct tracecmd_msg *msg,
int page_size, int nr_cpus, unsigned int *ports)
{
@@ -3159,7 +3159,36 @@ static void check_protocol_version(struct tracecmd_msg_handle *msg_handle)
}
}
-static struct tracecmd_msg_handle *setup_network(struct buffer_instance *instance)
+static void sync_time_with_listener_v3(struct tracecmd_msg_handle *msg_handle,
+ struct common_record_context *ctx)
+{
+ long long toffset = 0;
+
+ if (ctx->data_flags & DATA_FL_DATE ||
+ ctx->data_flags & DATA_FL_OFFSET) {
+ tracecmd_msg_snd_time_sync(msg_handle, ctx->clock, NULL);
+ return;
+ }
+
+ tracecmd_msg_snd_time_sync(msg_handle, ctx->clock, &toffset);
+
+ free(ctx->date2ts);
+ /* 20 digits + \0 */
+ ctx->date2ts = malloc(21);
+ if (ctx->date2ts) {
+ if (toffset < 0) {
+ snprintf(ctx->date2ts, 19, "0x%llx", -toffset);
+ ctx->data_flags |= DATA_FL_OFFSET_N;
+ } else {
+ snprintf(ctx->date2ts, 19, "0x%llx", toffset);
+ ctx->data_flags |= DATA_FL_OFFSET;
+ }
+ }
+
+}
+
+static struct tracecmd_msg_handle *setup_network(struct buffer_instance *instance,
+ struct common_record_context *ctx)
{
struct tracecmd_msg_handle *msg_handle = NULL;
struct addrinfo hints;
@@ -3296,12 +3325,13 @@ static void add_options(struct tracecmd_output *handle,
}
static struct tracecmd_msg_handle *
-setup_connection(struct buffer_instance *instance, struct common_record_context *ctx)
+setup_connection(struct buffer_instance *instance,
+ struct common_record_context *ctx)
{
struct tracecmd_msg_handle *msg_handle;
struct tracecmd_output *network_handle;
- msg_handle = setup_network(instance);
+ msg_handle = setup_network(instance, ctx);
/* Now create the handle through this socket */
if (msg_handle->version == V3_PROTOCOL) {
@@ -3328,7 +3358,8 @@ static void finish_network(struct tracecmd_msg_handle *msg_handle)
free(host);
}
-static void connect_to_agent(struct buffer_instance *instance)
+static void connect_to_agent(struct buffer_instance *instance,
+ struct common_record_context *ctx)
{
struct tracecmd_msg_handle *msg_handle;
int sd, nr_cpus, page_size;
@@ -3354,9 +3385,12 @@ static void connect_to_agent(struct buffer_instance *instance)
/* the msg_handle now points to the guest fd */
instance->msg_handle = msg_handle;
+
+ tracecmd_msg_rcv_time_sync(msg_handle);
}
-static void setup_guest(struct buffer_instance *instance)
+static void setup_guest(struct buffer_instance *instance,
+ struct common_record_context *ctx)
{
struct tracecmd_msg_handle *msg_handle = instance->msg_handle;
char *file;
@@ -3378,10 +3412,13 @@ static void setup_guest(struct buffer_instance *instance)
close(fd);
}
-static void setup_agent(struct buffer_instance *instance, struct common_record_context *ctx)
+static void setup_agent(struct buffer_instance *instance,
+ struct common_record_context *ctx)
{
struct tracecmd_output *network_handle;
+ sync_time_with_listener_v3(instance->msg_handle, ctx);
+
network_handle = tracecmd_create_init_fd_msg(instance->msg_handle,
listed_events);
add_options(network_handle, ctx);
@@ -3401,7 +3438,7 @@ void start_threads(enum trace_type type, struct common_record_context *ctx)
for_all_instances(instance) {
/* Start the connection now to find out how many CPUs we need */
if (instance->flags & BUFFER_FL_GUEST)
- connect_to_agent(instance);
+ connect_to_agent(instance, ctx);
total_cpu_count += instance->cpu_count;
}
@@ -3417,7 +3454,7 @@ void start_threads(enum trace_type type, struct common_record_context *ctx)
if (instance->flags & BUFFER_FL_AGENT) {
setup_agent(instance, ctx);
} else if (instance->flags & BUFFER_FL_GUEST) {
- setup_guest(instance);
+ setup_guest(instance, ctx);
} else if (host) {
instance->msg_handle = setup_connection(instance, ctx);
if (!instance->msg_handle)
@@ -5774,3 +5811,151 @@ int trace_record_agent(struct tracecmd_msg_handle *msg_handle,
free(argv_plus);
return 0;
}
+
+static void get_vsocket_params(int fd, int *lcid,
+ int *lport, int *rcid, int *rport)
+{
+ struct sockaddr_vm addr;
+ socklen_t addr_len = sizeof(addr);
+
+ memset(&addr, 0, sizeof(addr));
+ if (getsockname(fd, (struct sockaddr *)&addr, &addr_len))
+ return;
+ if (addr.svm_family != AF_VSOCK)
+ return;
+ *lport = addr.svm_port;
+ *lcid = addr.svm_cid;
+
+ memset(&addr, 0, sizeof(addr));
+ addr_len = sizeof(addr);
+ if (getpeername(fd, (struct sockaddr *)&addr, &addr_len))
+ return;
+ if (addr.svm_family != AF_VSOCK)
+ return;
+
+ *rport = addr.svm_port;
+ *rcid = addr.svm_cid;
+}
+
+static void set_clock_synch_events(int cfd, struct buffer_instance *instance,
+ bool enable, int *tx_event, int *rx_event)
+{
+ int lcid, lport, rcid, rport;
+ char buffer[255];
+ FILE *fp;
+ char *path;
+ int fd;
+
+ path = get_instance_file(instance, "events/vsock/enable");
+ fd = open(path, O_WRONLY);
+ if (fd < 0)
+ return;
+ if (enable)
+ write(fd, "1", 2);
+ else
+ write(fd, "0", 2);
+ close(fd);
+ tracecmd_put_tracing_file(path);
+
+ if (enable) {
+ get_vsocket_params(cfd, &lcid, &lport, &rcid, &rport);
+
+ snprintf(buffer, 255,
+ "src_cid==%d && src_port==%d && dst_cid==%d && dst_port==%d && len!=0",
+ rcid, rport, lcid, lport);
+ path = get_instance_file(instance,
+ "events/vsock/virtio_transport_recv_pkt/filter");
+ fd = open(path, O_WRONLY);
+ write(fd, buffer, strlen(buffer)+1);
+ close(fd);
+ tracecmd_put_tracing_file(path);
+
+ snprintf(buffer, 255,
+ "src_cid==%d && src_port==%d && dst_cid==%d && dst_port==%d && len!=0",
+ lcid, lport, rcid, rport);
+ path = get_instance_file(instance,
+ "events/vsock/virtio_transport_alloc_pkt/filter");
+ fd = open(path, O_WRONLY);
+ write(fd, buffer, strlen(buffer)+1);
+ close(fd);
+ tracecmd_put_tracing_file(path);
+
+ if (tx_event) {
+ path = get_instance_file(instance,
+ "events/vsock/virtio_transport_alloc_pkt/id");
+ fp = fopen(path, "r");
+ fgets(buffer, 255, (FILE *) fp);
+ *tx_event = atoi(buffer);
+ fclose(fp);
+ tracecmd_put_tracing_file(path);
+ }
+ if (rx_event) {
+ path = get_instance_file(instance,
+ "events/vsock/virtio_transport_recv_pkt/id");
+ fp = fopen(path, "r");
+ fgets(buffer, 255, (FILE *) fp);
+ *rx_event = atoi(buffer);
+ fclose(fp);
+ tracecmd_put_tracing_file(path);
+ }
+ }
+
+ path = get_instance_file(instance, "events/vsock/enable");
+
+ if (enable)
+ write_tracing_on(instance, 1);
+ else
+ write_tracing_on(instance, 0);
+}
+
+static void vsock_trace_reset(struct buffer_instance *vinst)
+{
+ char *path;
+ int fd;
+
+ path = get_instance_file(vinst, "trace");
+ fd = open(path, O_WRONLY);
+ tracecmd_put_tracing_file(path);
+ write(fd, "0", 2);
+ close(fd);
+}
+
+struct tep_handle *clock_synch_get_tep(struct buffer_instance *instance)
+{
+ struct tep_handle *tep = NULL;
+ char *path;
+
+ path = get_instance_dir(instance);
+ tep = tracecmd_local_events_system(path, "vsock");
+ tracecmd_put_tracing_file(path);
+
+ tep_set_file_bigendian(tep, tracecmd_host_bigendian());
+ tep_set_host_bigendian(tep, tracecmd_host_bigendian());
+
+ return tep;
+}
+
+struct buffer_instance *clock_synch_enable(int fd, char *clock,
+ int *tx_event, int *rx_event)
+{
+ char inst_name[256];
+ struct buffer_instance *vinst;
+
+ snprintf(inst_name, 256, "clock_synch-%d", rand()%100);
+ vinst = create_instance(strdup(inst_name));
+ init_instance(vinst);
+ vinst->cpu_count = local_cpu_count;
+ make_one_instances(vinst);
+ vsock_trace_reset(vinst);
+ vinst->clock = strdup(clock);
+ set_clock(vinst);
+ set_clock_synch_events(fd, vinst, true, tx_event, rx_event);
+ return vinst;
+}
+
+void clock_synch_disable(int fd, struct buffer_instance *instance)
+{
+ set_clock_synch_events(fd, instance, false, NULL, NULL);
+ tracecmd_remove_one_instance(instance);
+ /* todo: clean up the instance */
+}