Message ID | 20210916032641.1413293-4-fallentree@fb.com (mailing list archive) |
---|---|
State | Changes Requested |
Delegated to: | BPF |
Headers | show |
Series | selftests/bpf: Add parallelism to test_progs | expand |
On Wed, Sep 15, 2021 at 8:26 PM Yucong Sun <fallentree@fb.com> wrote: > > From: Yucong Sun <sunyucong@gmail.com> > > This patch modify some tests to provide serial_test_name() instead of > test_name() to indicate it must run on worker 0. On encountering these tests, > all other threads will wait on a conditional variable, which worker 0 will > signal once the tests has finished running. > > Additionally, before running the test, thread 0 also check and wait until all > other threads has finished their current work, to make sure the pinned test > really are the only test running in the system. > > After this change, all tests should pass in '-j' mode. > > Signed-off-by: Yucong Sun <sunyucong@gmail.com> > --- I don't like this approach; it's over-complicated. I see two ways to do this more simply: 1) Let the main process run all the serial tests before even instantiating workers, then run parallel tests with workers. The benefit of this is that we can structure the code to have a main testing loop that in sequential mode will run both parallel and serial tests, while in parallel mode will run only serial tests. 2) Teach worker 0 to run all serial tests one by one on start, before any parallel tests are run. Then we need to teach other workers to wait for it or teach the main process to wait for worker 0 to finish before dispatching work to worker 1+. I think this is more convoluted and complicated. I certainly prefer #1. An additional benefit is that the worker's and server's code would need to work consistently with all the data structures (preserving error logs until the end), etc. So it's a good test and forcing function to unify parallel and sequential modes. WDYT? 
> .../selftests/bpf/prog_tests/bpf_obj_id.c | 2 +- > .../bpf/prog_tests/select_reuseport.c | 2 +- > .../testing/selftests/bpf/prog_tests/timer.c | 2 +- > .../selftests/bpf/prog_tests/xdp_bonding.c | 2 +- > .../selftests/bpf/prog_tests/xdp_link.c | 2 +- > tools/testing/selftests/bpf/test_progs.c | 112 ++++++++++++++---- > 6 files changed, 95 insertions(+), 27 deletions(-) > [...] > @@ -954,15 +969,42 @@ void *dispatch_thread(void *ctx) > > test = &prog_test_defs[current_test_idx]; > test_to_run = current_test_idx; > - current_test_idx++; > > - pthread_mutex_unlock(&current_test_lock); > - } > + test = &prog_test_defs[test_to_run]; that's the same test as current_test_idx above?... > > - if (!test->should_run) { > - continue; > - } > + if (!test->should_run) { > + current_test_idx++; > + pthread_mutex_unlock(&current_test_lock); > + goto next; > + } > + > + if (is_serial_test(current_test_idx)) { > + if (data->worker_id != 0) { > + if (env.debug) > + fprintf(stderr, "[%d]: Waiting for thread 0 to finish serialized test: %d.\n", > + data->worker_id, current_test_idx + 1); > + /* wait for worker 0 to pick this job up and finish */ > + pthread_cond_wait(&wait_for_worker0, &current_test_lock); > + pthread_mutex_unlock(&current_test_lock); > + goto next; > + } else { > + /* wait until all other worker has parked */ > + for (int i = 1; i < env.workers; i++) { > + if (env.worker_current_test[i] != -1) { > + if (env.debug) > + fprintf(stderr, "[%d]: Waiting for other threads to finish current test...\n", data->worker_id); > + pthread_mutex_unlock(&current_test_lock); > + usleep(1 * 1000 * 1000); hm... I wonder if this contributes to those 20 seconds run time even for very fast tests... > + goto next; > + } > + } > + } > + } else { > + current_test_idx++; > + } [...] 
> + while (!all_finished) { > + all_finished = true; > + for (int i = 0; i < env.workers; i++) { > + if (!dispatcher_threads[i]) > + continue; > + > + if (pthread_tryjoin_np(dispatcher_threads[i], NULL) == EBUSY) { > + all_finished = false; > + if (!env.debug) continue; > + if (env.worker_current_test[i] == -1) > + fprintf(stderr, "Still waiting for thread %d (blocked by thread 0).\n", i); > + else > + fprintf(stderr, "Still waiting for thread %d (test #%d:%s).\n", > + i, env.worker_current_test[i] + 1, > + get_test_name(env.worker_current_test[i])); > + } else { > + dispatcher_threads[i] = 0; > + } > } > + usleep(10 * 1000 * 1000); and here you have 10 seconds just waiting doing nothing... > } > + > free(dispatcher_threads); > free(env.worker_current_test); > free(data); > @@ -1326,6 +1388,12 @@ int main(int argc, char **argv) > test->should_run = true; > else > test->should_run = false; > + > + if (test->run_test == NULL && test->run_serial_test == NULL) { > + fprintf(stderr, "Test %d:%s must have either test_%s() or serial_test_%sl() defined.\n", but not both, so let's check !!test->run_test == !!test->run_serial_test to make sure that only one is specified > + test->test_num, test->test_name, test->test_name, test->test_name); > + exit(EXIT_ERR_SETUP_INFRA); > + } > } > > /* ignore workers if we are just listing */ > -- > 2.30.2 >
diff --git a/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c b/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c index 284d5921c345..eb8eeebe6935 100644 --- a/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c +++ b/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c @@ -3,7 +3,7 @@ #define nr_iters 2 -void test_bpf_obj_id(void) +void serial_test_bpf_obj_id(void) { const __u64 array_magic_value = 0xfaceb00c; const __u32 array_key = 0; diff --git a/tools/testing/selftests/bpf/prog_tests/select_reuseport.c b/tools/testing/selftests/bpf/prog_tests/select_reuseport.c index 4efd337d6a3c..b5a0b7ed4310 100644 --- a/tools/testing/selftests/bpf/prog_tests/select_reuseport.c +++ b/tools/testing/selftests/bpf/prog_tests/select_reuseport.c @@ -858,7 +858,7 @@ void test_map_type(enum bpf_map_type mt) cleanup(); } -void test_select_reuseport(void) +void serial_test_select_reuseport(void) { saved_tcp_fo = read_int_sysctl(TCP_FO_SYSCTL); if (saved_tcp_fo < 0) diff --git a/tools/testing/selftests/bpf/prog_tests/timer.c b/tools/testing/selftests/bpf/prog_tests/timer.c index 25f40e1b9967..bbd074d407fb 100644 --- a/tools/testing/selftests/bpf/prog_tests/timer.c +++ b/tools/testing/selftests/bpf/prog_tests/timer.c @@ -39,7 +39,7 @@ static int timer(struct timer *timer_skel) return 0; } -void test_timer(void) +void serial_test_timer(void) { struct timer *timer_skel = NULL; int err; diff --git a/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c b/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c index 370d220288a6..bb6e0d0c5f79 100644 --- a/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c +++ b/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c @@ -468,7 +468,7 @@ static struct bond_test_case bond_test_cases[] = { { "xdp_bonding_xor_layer34", BOND_MODE_XOR, BOND_XMIT_POLICY_LAYER34, }, }; -void test_xdp_bonding(void) +void serial_test_xdp_bonding(void) { libbpf_print_fn_t old_print_fn; struct skeletons skeletons = {}; diff --git 
a/tools/testing/selftests/bpf/prog_tests/xdp_link.c b/tools/testing/selftests/bpf/prog_tests/xdp_link.c index 46eed0a33c23..983ab0b47d30 100644 --- a/tools/testing/selftests/bpf/prog_tests/xdp_link.c +++ b/tools/testing/selftests/bpf/prog_tests/xdp_link.c @@ -6,7 +6,7 @@ #define IFINDEX_LO 1 -void test_xdp_link(void) +void serial_test_xdp_link(void) { __u32 duration = 0, id1, id2, id0 = 0, prog_fd1, prog_fd2, err; DECLARE_LIBBPF_OPTS(bpf_xdp_set_link_opts, opts, .old_fd = -1); diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c index 77ed9204cc4a..c980ed766947 100644 --- a/tools/testing/selftests/bpf/test_progs.c +++ b/tools/testing/selftests/bpf/test_progs.c @@ -50,6 +50,7 @@ struct prog_test_def { const char *test_name; int test_num; void (*run_test)(void); + void (*run_serial_test)(void); bool force_log; int error_cnt; int skip_cnt; @@ -457,14 +458,17 @@ static int load_bpf_testmod(void) } /* extern declarations for test funcs */ -#define DEFINE_TEST(name) extern void test_##name(void); +#define DEFINE_TEST(name) \ + extern void test_##name(void) __weak; \ + extern void serial_test_##name(void) __weak; #include <prog_tests/tests.h> #undef DEFINE_TEST static struct prog_test_def prog_test_defs[] = { -#define DEFINE_TEST(name) { \ - .test_name = #name, \ - .run_test = &test_##name, \ +#define DEFINE_TEST(name) { \ + .test_name = #name, \ + .run_test = &test_##name, \ + .run_serial_test = &serial_test_##name, \ }, #include <prog_tests/tests.h> #undef DEFINE_TEST @@ -835,6 +839,7 @@ void sigint_handler(int signum) { static int current_test_idx = 0; static pthread_mutex_t current_test_lock; static pthread_mutex_t stdout_output_lock; +static pthread_cond_t wait_for_worker0 = PTHREAD_COND_INITIALIZER; struct test_result { int error_cnt; @@ -901,7 +906,10 @@ static void run_one_test(int test_num) { env.test = test; - test->run_test(); + if (test->run_test) + test->run_test(); + else if (test->run_serial_test) + 
test->run_serial_test(); /* ensure last sub-test is finalized properly */ if (test->subtest_name) @@ -925,6 +933,11 @@ static const char *get_test_name(int idx) return test->test_name; } +static inline bool is_serial_test(int idx) +{ + return prog_test_defs[idx].run_serial_test != NULL; +} + struct dispatch_data { int worker_id; int sock_fd; @@ -943,6 +956,8 @@ void *dispatch_thread(void *ctx) struct prog_test_def *test; struct test_result *result; + env.worker_current_test[data->worker_id] = -1; + /* grab a test */ { pthread_mutex_lock(&current_test_lock); @@ -954,15 +969,42 @@ void *dispatch_thread(void *ctx) test = &prog_test_defs[current_test_idx]; test_to_run = current_test_idx; - current_test_idx++; - pthread_mutex_unlock(&current_test_lock); - } + test = &prog_test_defs[test_to_run]; - if (!test->should_run) { - continue; - } + if (!test->should_run) { + current_test_idx++; + pthread_mutex_unlock(&current_test_lock); + goto next; + } + + if (is_serial_test(current_test_idx)) { + if (data->worker_id != 0) { + if (env.debug) + fprintf(stderr, "[%d]: Waiting for thread 0 to finish serialized test: %d.\n", + data->worker_id, current_test_idx + 1); + /* wait for worker 0 to pick this job up and finish */ + pthread_cond_wait(&wait_for_worker0, &current_test_lock); + pthread_mutex_unlock(&current_test_lock); + goto next; + } else { + /* wait until all other worker has parked */ + for (int i = 1; i < env.workers; i++) { + if (env.worker_current_test[i] != -1) { + if (env.debug) + fprintf(stderr, "[%d]: Waiting for other threads to finish current test...\n", data->worker_id); + pthread_mutex_unlock(&current_test_lock); + usleep(1 * 1000 * 1000); + goto next; + } + } + } + } else { + current_test_idx++; + } + pthread_mutex_unlock(&current_test_lock); + } /* run test through worker */ { @@ -1035,6 +1077,14 @@ void *dispatch_thread(void *ctx) } } /* wait for test done */ + + /* unblock all other dispatcher threads */ + if (is_serial_test(test_to_run) && data->worker_id == 0) { + current_test_idx++; + 
pthread_cond_broadcast(&wait_for_worker0); + } +next: + continue; } /* while (true) */ error: if (env.debug) @@ -1060,16 +1110,19 @@ static int server_main(void) { pthread_t *dispatcher_threads; struct dispatch_data *data; + int all_finished = false; dispatcher_threads = calloc(sizeof(pthread_t), env.workers); data = calloc(sizeof(struct dispatch_data), env.workers); env.worker_current_test = calloc(sizeof(int), env.workers); + for (int i = 0; i < env.workers; i++) { int rc; data[i].worker_id = i; data[i].sock_fd = env.worker_socks[i]; + env.worker_current_test[i] = -1; rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]); if (rc < 0) { perror("Failed to launch dispatcher thread"); @@ -1078,19 +1131,28 @@ static int server_main(void) } /* wait for all dispatcher to finish */ - for (int i = 0; i < env.workers; i++) { - while (true) { - struct timespec timeout = { - .tv_sec = time(NULL) + 5, - .tv_nsec = 0 - }; - if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT) - break; - if (env.debug) - fprintf(stderr, "Still waiting for thread %d (test %d).\n", - i, env.worker_current_test[i] + 1); + while (!all_finished) { + all_finished = true; + for (int i = 0; i < env.workers; i++) { + if (!dispatcher_threads[i]) + continue; + + if (pthread_tryjoin_np(dispatcher_threads[i], NULL) == EBUSY) { + all_finished = false; + if (!env.debug) continue; + if (env.worker_current_test[i] == -1) + fprintf(stderr, "Still waiting for thread %d (blocked by thread 0).\n", i); + else + fprintf(stderr, "Still waiting for thread %d (test #%d:%s).\n", + i, env.worker_current_test[i] + 1, + get_test_name(env.worker_current_test[i])); + } else { + dispatcher_threads[i] = 0; + } } + usleep(10 * 1000 * 1000); } + free(dispatcher_threads); free(env.worker_current_test); free(data); @@ -1326,6 +1388,12 @@ int main(int argc, char **argv) test->should_run = true; else test->should_run = false; + + if (test->run_test == NULL && test->run_serial_test == 
NULL) { + fprintf(stderr, "Test %d:%s must have either test_%s() or serial_test_%sl() defined.\n", + test->test_num, test->test_name, test->test_name, test->test_name); + exit(EXIT_ERR_SETUP_INFRA); + } } /* ignore workers if we are just listing */