From patchwork Wed Mar 1 16:10:10 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Pavel Begunkov X-Patchwork-Id: 13156211 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id 9CD67C64ED6 for ; Wed, 1 Mar 2023 16:14:00 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S229725AbjCAQN7 (ORCPT ); Wed, 1 Mar 2023 11:13:59 -0500 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:34400 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S229734AbjCAQN6 (ORCPT ); Wed, 1 Mar 2023 11:13:58 -0500 Received: from mail-wr1-x435.google.com (mail-wr1-x435.google.com [IPv6:2a00:1450:4864:20::435]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id 02D642BF33 for ; Wed, 1 Mar 2023 08:13:57 -0800 (PST) Received: by mail-wr1-x435.google.com with SMTP id g3so4929083wri.6 for ; Wed, 01 Mar 2023 08:13:56 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20210112; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:from:to:cc:subject:date :message-id:reply-to; bh=AnHkTJ61Q7Bqw6KixEH1vawdQtVxM2jdLrGN4p7QCGA=; b=GpAorfIqZzCT+xvSpaWcIbeE2H9U3yQ+QfAzUZycttlLNvcZe2giA1mZASOcyQ5tOg ZhJHmcLbgfbLigDX1KZ6ke+DSsKClXGAvMTKQAZ/2N3aFpPne/4QYc1wPUM2CYg9HjuB 9VkorClv4Eq98qx1NB0kwViwpP/6A77STy4HUebezAkfjD+ZJrGFcvnBgyAmSBSwjYIQ rhKAz/yYEId/0mOkFODLnLGtIpt0enT7/ijQlyyG7P4XrHG1/TjBZphTw6xEadrJpp/D rzhMHhNuKX4AHPejNoscxABQGtBa/jLUXqqkCzVuWEOXaG2awmDge2ySmisAAJSmLLQo u6Iw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20210112; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:x-gm-message-state:from:to:cc 
:subject:date:message-id:reply-to; bh=AnHkTJ61Q7Bqw6KixEH1vawdQtVxM2jdLrGN4p7QCGA=; b=fbVqkiPy1G6uC9roC8+4+RdEXQ2lQduFW5vmFUhDk03Umj2Z/GE8idF5If2CfLbCmB a0G+Yi23tfE5czDFl1boz4KC7ekzhAsnsPlvj6eewqyaTdwmC9+sQqvI9YZ1hcWZcXOH Mu+XyTeIsg6IgL58ze9kSjoW6SspGTIbD8h1DRhWGt0/g4z6w4HL2/EV3iRj7Bc5sqqL J90YuMIxEy9Bk3bMd53GaksKqh0c3pUe7Y0I+cgT+W6YdrriksuRh6yyvc2MPFcQF7um TtCPB7z1BSazJXGXWswiwxPA5SM8i0ImUAlve6fFDALytOYAJo/G/vEoA0t+PyWXW6oz TyKg== X-Gm-Message-State: AO0yUKWgu71N+sRSfIO1m6zr25r8lnoxjMWfos389fbMi5UZM0qtDlW1 y7rIDoJronUoVHFB06PivOjTBGDk/Sc= X-Google-Smtp-Source: AK7set9wRg65i9Xnm/IRsWnQL/XC9pgJEm8ZPIRJcT+a1v+65h17fyhC6m1qZcXT5KUNQpdBPUJPaw== X-Received: by 2002:adf:f611:0:b0:2cc:498d:b902 with SMTP id t17-20020adff611000000b002cc498db902mr4965495wrp.59.1677687235378; Wed, 01 Mar 2023 08:13:55 -0800 (PST) Received: from 127.com ([2620:10d:c092:600::2:94bb]) by smtp.gmail.com with ESMTPSA id s2-20020adfeb02000000b002cda9aa1dc1sm2701474wrn.111.2023.03.01.08.13.54 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Wed, 01 Mar 2023 08:13:55 -0800 (PST) From: Pavel Begunkov To: io-uring@vger.kernel.org Cc: Jens Axboe , asml.silence@gmail.com Subject: [PATCH liburing 1/3] examples/send-zc: add affinity / CPU pinning Date: Wed, 1 Mar 2023 16:10:10 +0000 Message-Id: <28b0e6d9c7d0c6adecb17fb72f6dfefb9bb51b47.1677686850.git.asml.silence@gmail.com> X-Mailer: git-send-email 2.39.1 In-Reply-To: References: MIME-Version: 1.0 Precedence: bulk List-ID: X-Mailing-List: io-uring@vger.kernel.org Pass '-C <cpu>' to pin threads and io-wq to the specified CPU. 
Signed-off-by: Pavel Begunkov --- examples/send-zerocopy.c | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c index c9b5506..a86106f 100644 --- a/examples/send-zerocopy.c +++ b/examples/send-zerocopy.c @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -50,6 +51,7 @@ static int cfg_nr_reqs = 8; static bool cfg_fixed_buf = 1; static bool cfg_hugetlb = 0; static bool cfg_defer_taskrun = 0; +static int cfg_cpu = -1; static int cfg_family = PF_UNSPEC; static int cfg_payload_len; @@ -79,6 +81,32 @@ static void t_error(int status, int errnum, const char *format, ...) exit(status); } +static void set_cpu_affinity(void) +{ + cpu_set_t mask; + + if (cfg_cpu == -1) + return; + + CPU_ZERO(&mask); + CPU_SET(cfg_cpu, &mask); + if (sched_setaffinity(0, sizeof(mask), &mask)) + t_error(1, errno, "unable to pin cpu\n"); +} + +static void set_iowq_affinity(struct io_uring *ring) +{ + cpu_set_t mask; + int ret; + + if (cfg_cpu == -1) + return; + + ret = io_uring_register_iowq_aff(ring, 1, &mask); + if (ret) + t_error(1, ret, "unabled to set io-wq affinity\n"); +} + static unsigned long gettimeofday_ms(void) { struct timeval tv; @@ -172,6 +200,9 @@ static void do_tx(int domain, int type, int protocol) if (ret) t_error(1, ret, "io_uring: queue init"); + set_cpu_affinity(); + set_iowq_affinity(&ring); + if (cfg_fixed_files) { ret = io_uring_register_files(&ring, &fd, 1); if (ret < 0) @@ -303,7 +334,7 @@ static void parse_opts(int argc, char **argv) cfg_payload_len = max_payload_len; - while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:d")) != -1) { + while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:")) != -1) { switch (c) { case '4': if (cfg_family != PF_UNSPEC) @@ -344,6 +375,9 @@ static void parse_opts(int argc, char **argv) case 'd': cfg_defer_taskrun = 1; break; + case 'C': + cfg_cpu = strtol(optarg, NULL, 0); + break; } } @@ -363,6 +397,7 @@ int 
main(int argc, char **argv) const char *cfg_test; parse_opts(argc, argv); + set_cpu_affinity(); payload = payload_buf; if (cfg_hugetlb) { From patchwork Wed Mar 1 16:10:11 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Pavel Begunkov X-Patchwork-Id: 13156212 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id C3ED2C7EE2F for ; Wed, 1 Mar 2023 16:14:01 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S229714AbjCAQOB (ORCPT ); Wed, 1 Mar 2023 11:14:01 -0500 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:34410 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S229696AbjCAQN7 (ORCPT ); Wed, 1 Mar 2023 11:13:59 -0500 Received: from mail-wr1-x42e.google.com (mail-wr1-x42e.google.com [IPv6:2a00:1450:4864:20::42e]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id 8310423C66 for ; Wed, 1 Mar 2023 08:13:57 -0800 (PST) Received: by mail-wr1-x42e.google.com with SMTP id h11so2065693wrm.5 for ; Wed, 01 Mar 2023 08:13:57 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20210112; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:from:to:cc:subject:date :message-id:reply-to; bh=uHr5i057W1LM9EO6PQPpMU+CDIWhLlR30DFdy7/Zr8s=; b=VvmJYlSrKI/+78saX3GPIMwXfgm1Aeiqw/+krnxlVTv0DZVhByym4XaV1rTuwyc2am FFI1s3gbh3JjSGdizk9aQVeo0Yqt/+4zRUdAKzhe0q52aE0CUwv/j98aPcxh9iSvj1D6 iJuxVUWz2p5jPQh2Bn6XjJeN7mS7BI6gKhZyBB3gf6wbG4HdZVEi0xQYhKTiQMiYPEVM O1kEtIOFTwdYQrq5OltZ58BthPinIldxPEDiglzYpNYwHKC4HK+IeUcjio9S32mc5y0G X+H7ScizcylyNHKURzE1eNl9jRbNGDusYtLgOtS/83tZPuoPe21cFtjJpSqb8XTU5AfX I88Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20210112; 
h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:x-gm-message-state:from:to:cc :subject:date:message-id:reply-to; bh=uHr5i057W1LM9EO6PQPpMU+CDIWhLlR30DFdy7/Zr8s=; b=cevasl379S1qh4cX0dL9mhl9t8qWa2SwUKUa1zRZ9IdKfnigsrU8l3lKFqQT7HBdTk xzkN/klb372PbdgmkMnCF/rhKK/G9fyE1IjZ9JS944rxTjOoUuQTtzxjysdcJcoh3k4W Y/TMVzaAHBL6bzTgglepGqjl8wIT6CpveevXBJW5VZ8Vc6L3PNpOzrLKSJjvfTt3otPF 1XEXogHQymZXu+WT5VI9G+MP6d59gNPqx8yeplwqEyRKFNH9v9mIJFPmfTeHh2ssd2Uq 49yIG5bxxSWRjkNUDi5of68Vxo2FARjRl7QbSYnmUYLBg9gMTej4ZzcYfAKX7ZdRKORP ZI0Q== X-Gm-Message-State: AO0yUKUXJkEh825IjH2GIKOt2hgQk4SUHdFRUeLaT/KZhLHdsCot/3qW uDgXjOIOQqxEiu83HC00L74SPLIN2zc= X-Google-Smtp-Source: AK7set/wEILgp7IaNOuCS+8yHioYC5qCX2e1KujRzbtylAmLVtLTyNjFacvsPU3ix9/LVqrmyfe72w== X-Received: by 2002:a5d:5967:0:b0:2c7:17db:bf5c with SMTP id e39-20020a5d5967000000b002c717dbbf5cmr5223171wri.25.1677687235897; Wed, 01 Mar 2023 08:13:55 -0800 (PST) Received: from 127.com ([2620:10d:c092:600::2:94bb]) by smtp.gmail.com with ESMTPSA id s2-20020adfeb02000000b002cda9aa1dc1sm2701474wrn.111.2023.03.01.08.13.55 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Wed, 01 Mar 2023 08:13:55 -0800 (PST) From: Pavel Begunkov To: io-uring@vger.kernel.org Cc: Jens Axboe , asml.silence@gmail.com Subject: [PATCH liburing 2/3] examples/send-zc: add multithreading Date: Wed, 1 Mar 2023 16:10:11 +0000 Message-Id: X-Mailer: git-send-email 2.39.1 In-Reply-To: References: MIME-Version: 1.0 Precedence: bulk List-ID: X-Mailing-List: io-uring@vger.kernel.org '-T <nr_threads>' will create the specified number of threads to test in parallel. Each thread will have its own connection. 
Signed-off-by: Pavel Begunkov --- examples/Makefile | 3 + examples/send-zerocopy.c | 116 ++++++++++++++++++++++++++------------- 2 files changed, 81 insertions(+), 38 deletions(-) diff --git a/examples/Makefile b/examples/Makefile index e561e05..20ac53c 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -10,6 +10,9 @@ ifneq ($(MAKECMDGOALS),clean) include ../config-host.mak endif +LDFLAGS ?= +override LDFLAGS += -L../src/ -luring -lpthread + example_srcs := \ io_uring-cp.c \ io_uring-test.c \ diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c index a86106f..c0549a1 100644 --- a/examples/send-zerocopy.c +++ b/examples/send-zerocopy.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -43,6 +44,16 @@ #define ZC_TAG 0xfffffffULL #define MAX_SUBMIT_NR 512 +#define MAX_THREADS 100 + +struct thread_data { + pthread_t thread; + void *ret; + int idx; + unsigned long long packets; + unsigned long long bytes; + struct sockaddr_storage dst_addr; +}; static bool cfg_reg_ringfd = true; static bool cfg_fixed_files = 1; @@ -52,17 +63,21 @@ static bool cfg_fixed_buf = 1; static bool cfg_hugetlb = 0; static bool cfg_defer_taskrun = 0; static int cfg_cpu = -1; +static unsigned cfg_nr_threads = 1; static int cfg_family = PF_UNSPEC; +static int cfg_type = 0; static int cfg_payload_len; static int cfg_port = 8000; static int cfg_runtime_ms = 4200; static socklen_t cfg_alen; -static struct sockaddr_storage cfg_dst_addr; +static char *str_addr = NULL; static char payload_buf[IP_MAXPACKET] __attribute__((aligned(4096))); static char *payload; +static struct thread_data threads[MAX_THREADS]; +static pthread_barrier_t barrier; /* * Implementation of error(3), prints an error message and exits. 
@@ -126,12 +141,13 @@ static void setup_sockaddr(int domain, const char *str_addr, { struct sockaddr_in6 *addr6 = (void *) sockaddr; struct sockaddr_in *addr4 = (void *) sockaddr; + int port = cfg_port; switch (domain) { case PF_INET: memset(addr4, 0, sizeof(*addr4)); addr4->sin_family = AF_INET; - addr4->sin_port = htons(cfg_port); + addr4->sin_port = htons(port); if (str_addr && inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1) t_error(1, 0, "ipv4 parse error: %s", str_addr); @@ -139,7 +155,7 @@ static void setup_sockaddr(int domain, const char *str_addr, case PF_INET6: memset(addr6, 0, sizeof(*addr6)); addr6->sin6_family = AF_INET6; - addr6->sin6_port = htons(cfg_port); + addr6->sin6_port = htons(port); if (str_addr && inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1) t_error(1, 0, "ipv6 parse error: %s", str_addr); @@ -149,21 +165,6 @@ static void setup_sockaddr(int domain, const char *str_addr, } } -static int do_setup_tx(int domain, int type, int protocol) -{ - int fd; - - fd = socket(domain, type, protocol); - if (fd == -1) - t_error(1, errno, "socket t"); - - do_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 1 << 21); - - if (connect(fd, (void *) &cfg_dst_addr, cfg_alen)) - t_error(1, errno, "connect"); - return fd; -} - static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring) { struct io_uring_cqe *cqe; @@ -179,11 +180,9 @@ static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring) return cqe; } -static void do_tx(int domain, int type, int protocol) +static void do_tx(struct thread_data *td, int domain, int type, int protocol) { const int notif_slack = 128; - unsigned long packets = 0; - unsigned long bytes = 0; struct io_uring ring; struct iovec iov; uint64_t tstop; @@ -194,7 +193,14 @@ static void do_tx(int domain, int type, int protocol) if (cfg_defer_taskrun) ring_flags |= IORING_SETUP_DEFER_TASKRUN; - fd = do_setup_tx(domain, type, protocol); + fd = socket(domain, type, protocol); + if (fd == -1) + t_error(1, errno, 
"socket t"); + + do_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 1 << 21); + + if (connect(fd, (void *)&td->dst_addr, cfg_alen)) + t_error(1, errno, "connect, idx %i", td->idx); ret = io_uring_queue_init(512, &ring, ring_flags); if (ret) @@ -221,6 +227,8 @@ static void do_tx(int domain, int type, int protocol) if (ret) t_error(1, ret, "io_uring: buffer registration"); + pthread_barrier_wait(&barrier); + tstop = gettimeofday_ms() + cfg_runtime_ms; do { struct io_uring_sqe *sqe; @@ -272,8 +280,8 @@ static void do_tx(int domain, int type, int protocol) compl_cqes++; if (cqe->res >= 0) { - packets++; - bytes += cqe->res; + td->packets++; + td->bytes += cqe->res; } else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE || cqe->res == -ECONNRESET) { fprintf(stderr, "Connection failure"); @@ -290,11 +298,6 @@ out_fail: if (close(fd)) t_error(1, errno, "close"); - fprintf(stderr, "tx=%lu (MB=%lu), tx/s=%lu (MB/s=%lu)\n", - packets, bytes >> 20, - packets / (cfg_runtime_ms / 1000), - (bytes >> 20) / (cfg_runtime_ms / 1000)); - while (compl_cqes) { struct io_uring_cqe *cqe = wait_cqe_fast(&ring); @@ -304,14 +307,16 @@ out_fail: io_uring_queue_exit(&ring); } -static void do_test(int domain, int type, int protocol) + +static void *do_test(void *arg) { - int i; + struct thread_data *td = arg; + int protocol = 0; - for (i = 0; i < IP_MAXPACKET; i++) - payload[i] = 'a' + (i % 26); + setup_sockaddr(cfg_family, str_addr, &td->dst_addr); - do_tx(domain, type, protocol); + do_tx(td, cfg_family, cfg_type, protocol); + pthread_exit(&td->ret); } static void usage(const char *filepath) @@ -334,7 +339,7 @@ static void parse_opts(int argc, char **argv) cfg_payload_len = max_payload_len; - while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:")) != -1) { + while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:")) != -1) { switch (c) { case '4': if (cfg_family != PF_UNSPEC) @@ -378,6 +383,11 @@ static void parse_opts(int argc, char **argv) case 'C': cfg_cpu = strtol(optarg, NULL, 0); break; + 
case 'T': + cfg_nr_threads = strtol(optarg, NULL, 0); + if (cfg_nr_threads > MAX_THREADS) + t_error(1, 0, "too many threads\n"); + break; } } @@ -386,7 +396,7 @@ static void parse_opts(int argc, char **argv) if (cfg_payload_len > max_payload_len) t_error(1, 0, "-s: payload exceeds max (%d)", max_payload_len); - setup_sockaddr(cfg_family, daddr, &cfg_dst_addr); + str_addr = daddr; if (optind != argc - 1) usage(argv[0]); @@ -394,7 +404,11 @@ static void parse_opts(int argc, char **argv) int main(int argc, char **argv) { + unsigned long long packets = 0, bytes = 0; + struct thread_data *td; const char *cfg_test; + void *res; + int i; parse_opts(argc, argv); set_cpu_affinity(); @@ -412,11 +426,37 @@ int main(int argc, char **argv) cfg_test = argv[argc - 1]; if (!strcmp(cfg_test, "tcp")) - do_test(cfg_family, SOCK_STREAM, 0); + cfg_type = SOCK_STREAM; else if (!strcmp(cfg_test, "udp")) - do_test(cfg_family, SOCK_DGRAM, 0); + cfg_type = SOCK_DGRAM; else t_error(1, 0, "unknown cfg_test %s", cfg_test); + pthread_barrier_init(&barrier, NULL, cfg_nr_threads); + + for (i = 0; i < IP_MAXPACKET; i++) + payload[i] = 'a' + (i % 26); + + for (i = 0; i < cfg_nr_threads; i++) { + td = &threads[i]; + td->idx = i; + } + + for (i = 0; i < cfg_nr_threads; i++) + pthread_create(&threads[i].thread, NULL, do_test, td); + + for (i = 0; i < cfg_nr_threads; i++) { + td = &threads[i]; + pthread_join(td->thread, &res); + packets += td->packets; + bytes += td->bytes; + } + + fprintf(stderr, "tx=%llu (MB=%llu), tx/s=%llu (MB/s=%llu)\n", + packets, bytes >> 20, + packets / (cfg_runtime_ms / 1000), + (bytes >> 20) / (cfg_runtime_ms / 1000)); + + pthread_barrier_destroy(&barrier); return 0; } From patchwork Wed Mar 1 16:10:12 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Pavel Begunkov X-Patchwork-Id: 13156213 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org 
Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id 41E17C7EE32 for ; Wed, 1 Mar 2023 16:14:01 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S229632AbjCAQOA (ORCPT ); Wed, 1 Mar 2023 11:14:00 -0500 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:34412 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S229873AbjCAQN7 (ORCPT ); Wed, 1 Mar 2023 11:13:59 -0500 Received: from mail-wr1-x436.google.com (mail-wr1-x436.google.com [IPv6:2a00:1450:4864:20::436]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id 1C20D2CC5B for ; Wed, 1 Mar 2023 08:13:58 -0800 (PST) Received: by mail-wr1-x436.google.com with SMTP id l1so10750171wry.12 for ; Wed, 01 Mar 2023 08:13:58 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20210112; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:from:to:cc:subject:date :message-id:reply-to; bh=igvNa7PyXdnplqOJFyMw8vP4ahEamqmmn2S6RC/Th3w=; b=KTNQqLSg4Ea9PF/hsWwzvkUDYQV37QxRUkEGzfcTPvbgakAyWm3GQ16z7a22x8gVao vbqwAqZp3iG4Core3oCeriZ3Erybz7Z0ewzLWocRzytcTB7nVxDj90B+OdFN1hz8mlBw +VtKw6GqXHEI3cVA4ppxw4xOj7eD3PA7ub0rrjON3NzNcXkPP0efnxPzK+rGqsUE5OlI IJVKieCtss0z8t9X/MrBiUl28FfP/Jqzq7dObfEIPjUm0WH8/sY5J+IIPNEzvimEB7QP pQKMmCycj1yAWkuUkPHd31EmWUlBb0Cj9fEVMwpCb3MTjrN7rKx5pJHLpK+Sguy7x5oi smqw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20210112; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:x-gm-message-state:from:to:cc :subject:date:message-id:reply-to; bh=igvNa7PyXdnplqOJFyMw8vP4ahEamqmmn2S6RC/Th3w=; b=SpJOrH1dhDb/MbDcegLYX+KEe8rV0pecLDMBJgBE3BCLXnOSlGnLnLGg5ip3SPhJ53 GKO58Rbew9z7o4Baq7c6AUL377qNwzOMnxgKK6hCT70lRq+YY1wHXCLdTITr+lvgVBT2 AMaL+aHC3JDXy1NliVAruoOJqj7ZhkoitH366a3MW339JZr5y3WkUOl/Irw+FiH0urzX 
XyfIHKpoQWDJE1EhKhS3k8lYOy6IRgeUeRuTuHikkAA1Z30UDoXiOhg3308Iqg06qBFT 5La/4sdnI6bFV1NBqzZpQuF4bYNGcRObIs5wO3h2KV2qfMbN4yasb06SgxHOQDtX1mcJ RupQ== X-Gm-Message-State: AO0yUKUxoL567aBBFRJiflsfVzVmjWnRX8upwK+x00l1ujX8GnQr7dB9 4/nA5o1na0xyeuqmQn6QvIWQKalVfSI= X-Google-Smtp-Source: AK7set/K6d6xoNx43iNEowUwDtCBWYLCZ3z2R3z1e5+wsajCh8LInAD/Nn6SMMRlfUdmXjKAQjK3og== X-Received: by 2002:adf:e545:0:b0:2c7:cdf:e296 with SMTP id z5-20020adfe545000000b002c70cdfe296mr5389052wrm.66.1677687236415; Wed, 01 Mar 2023 08:13:56 -0800 (PST) Received: from 127.com ([2620:10d:c092:600::2:94bb]) by smtp.gmail.com with ESMTPSA id s2-20020adfeb02000000b002cda9aa1dc1sm2701474wrn.111.2023.03.01.08.13.55 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Wed, 01 Mar 2023 08:13:56 -0800 (PST) From: Pavel Begunkov To: io-uring@vger.kernel.org Cc: Jens Axboe , asml.silence@gmail.com Subject: [PATCH liburing 3/3] examples/send-zc: add the receive part Date: Wed, 1 Mar 2023 16:10:12 +0000 Message-Id: <00e887532822d4744741655532be4fbede0f18b0.1677686850.git.asml.silence@gmail.com> X-Mailer: git-send-email 2.39.1 In-Reply-To: References: MIME-Version: 1.0 Precedence: bulk List-ID: X-Mailing-List: io-uring@vger.kernel.org '-R' will switch the benchmark into the server mode accepting data. For TCP the number of threads should match the number of threads of the client. For UDP just one thread/connection should be enough. 
Signed-off-by: Pavel Begunkov --- examples/send-zerocopy.c | 146 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 143 insertions(+), 3 deletions(-) diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c index c0549a1..75e516c 100644 --- a/examples/send-zerocopy.c +++ b/examples/send-zerocopy.c @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -53,6 +54,7 @@ struct thread_data { unsigned long long packets; unsigned long long bytes; struct sockaddr_storage dst_addr; + int fd; }; static bool cfg_reg_ringfd = true; @@ -63,6 +65,7 @@ static bool cfg_fixed_buf = 1; static bool cfg_hugetlb = 0; static bool cfg_defer_taskrun = 0; static int cfg_cpu = -1; +static bool cfg_rx = 0; static unsigned cfg_nr_threads = 1; static int cfg_family = PF_UNSPEC; @@ -165,6 +168,135 @@ static void setup_sockaddr(int domain, const char *str_addr, } } +static int do_poll(int fd, int events) +{ + struct pollfd pfd; + int ret; + + pfd.events = events; + pfd.revents = 0; + pfd.fd = fd; + + ret = poll(&pfd, 1, -1); + if (ret == -1) + t_error(1, errno, "poll"); + + return ret && (pfd.revents & events); +} + +/* Flush all outstanding bytes for the tcp receive queue */ +static int do_flush_tcp(struct thread_data *td, int fd) +{ + int ret; + + /* MSG_TRUNC flushes up to len bytes */ + ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT); + if (ret == -1 && errno == EAGAIN) + return 0; + if (ret == -1) + t_error(1, errno, "flush"); + if (!ret) + return 1; + + td->packets++; + td->bytes += ret; + return 0; +} + +/* Flush all outstanding datagrams. Verify first few bytes of each. 
*/ +static int do_flush_datagram(struct thread_data *td, int fd, int type) +{ + int ret, off = 0; + char buf[64]; + + /* MSG_TRUNC will return full datagram length */ + ret = recv(fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_TRUNC); + if (ret == -1 && errno == EAGAIN) + return 0; + + if (ret == -1) + t_error(1, errno, "recv"); + if (ret != cfg_payload_len) + t_error(1, 0, "recv: ret=%u != %u", ret, cfg_payload_len); + if (ret > sizeof(buf) - off) + ret = sizeof(buf) - off; + if (memcmp(buf + off, payload, ret)) + t_error(1, 0, "recv: data mismatch"); + + td->packets++; + td->bytes += cfg_payload_len; + return 0; +} + +static void do_setup_rx(int domain, int type, int protocol) +{ + struct sockaddr_storage addr = {}; + struct thread_data *td; + int listen_fd, fd, i; + + fd = socket(domain, type, protocol); + if (fd == -1) + t_error(1, errno, "socket r"); + + do_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, 1 << 21); + do_setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, 1 << 16); + do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1); + + setup_sockaddr(cfg_family, str_addr, &addr); + + if (bind(fd, (void *)&addr, cfg_alen)) + t_error(1, errno, "bind"); + + if (type != SOCK_STREAM) { + if (cfg_nr_threads != 1) + t_error(1, 0, "udp rx cant multithread"); + threads[0].fd = fd; + return; + } + + listen_fd = fd; + if (listen(listen_fd, cfg_nr_threads)) + t_error(1, errno, "listen"); + + for (i = 0; i < cfg_nr_threads; i++) { + td = &threads[i]; + + fd = accept(listen_fd, NULL, NULL); + if (fd == -1) + t_error(1, errno, "accept"); + td->fd = fd; + } + + if (close(listen_fd)) + t_error(1, errno, "close listen sock"); +} + +static void *do_rx(void *arg) +{ + struct thread_data *td = arg; + const int cfg_receiver_wait_ms = 400; + uint64_t tstop; + int ret, fd = td->fd; + + tstop = gettimeofday_ms() + cfg_runtime_ms + cfg_receiver_wait_ms; + do { + if (cfg_type == SOCK_STREAM) + ret = do_flush_tcp(td, fd); + else + ret = do_flush_datagram(td, fd, cfg_type); + + if (ret) + break; + + do_poll(fd, 
POLLIN); + } while (gettimeofday_ms() < tstop); + + if (close(fd)) + t_error(1, errno, "close"); + pthread_exit(&td->ret); + return NULL; +} + static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring) { struct io_uring_cqe *cqe; @@ -284,7 +416,7 @@ static void do_tx(struct thread_data *td, int domain, int type, int protocol) td->bytes += cqe->res; } else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE || cqe->res == -ECONNRESET) { - fprintf(stderr, "Connection failure"); + fprintf(stderr, "Connection failure\n"); goto out_fail; } else if (cqe->res != -EAGAIN) { t_error(1, cqe->res, "send failed"); @@ -317,6 +449,7 @@ static void *do_test(void *arg) do_tx(td, cfg_family, cfg_type, protocol); pthread_exit(&td->ret); + return NULL; } static void usage(const char *filepath) @@ -339,7 +472,7 @@ static void parse_opts(int argc, char **argv) cfg_payload_len = max_payload_len; - while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:")) != -1) { + while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:R")) != -1) { switch (c) { case '4': if (cfg_family != PF_UNSPEC) @@ -388,6 +521,9 @@ static void parse_opts(int argc, char **argv) if (cfg_nr_threads > MAX_THREADS) t_error(1, 0, "too many threads\n"); break; + case 'R': + cfg_rx = 1; + break; } } @@ -442,8 +578,12 @@ int main(int argc, char **argv) td->idx = i; } + if (cfg_rx) + do_setup_rx(cfg_family, cfg_type, 0); + for (i = 0; i < cfg_nr_threads; i++) - pthread_create(&threads[i].thread, NULL, do_test, td); + pthread_create(&threads[i].thread, NULL, + !cfg_rx ? do_test : do_rx, &threads[i]); for (i = 0; i < cfg_nr_threads; i++) { td = &threads[i];