From patchwork Thu Jun 30 09:14:20 2022
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Dylan Yudaken <dylany@fb.com>
X-Patchwork-Id: 12901506
From: Dylan Yudaken <dylany@fb.com>
To: Jens Axboe, Pavel Begunkov, <io-uring@vger.kernel.org>
CC: Dylan Yudaken <dylany@fb.com>
Subject: [PATCH v2 liburing 5/7] add recv-multishot test
Date: Thu, 30 Jun 2022 02:14:20 -0700
Message-ID: <20220630091422.1463570-6-dylany@fb.com>
In-Reply-To: <20220630091422.1463570-1-dylany@fb.com>
References: <20220630091422.1463570-1-dylany@fb.com>
MIME-Version: 1.0
Precedence: bulk
List-ID: <io-uring.vger.kernel.org>
X-Mailing-List: io-uring@vger.kernel.org

Add a test for multishot receive functionality.

Signed-off-by: Dylan Yudaken <dylany@fb.com>
---
 test/Makefile         |   1 +
 test/recv-multishot.c | 342 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 343 insertions(+)
 create mode 100644 test/recv-multishot.c

diff --git a/test/Makefile b/test/Makefile
index ad47d2d..e718583 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -125,6 +125,7 @@ test_srcs := \
 	read-write.c \
 	recv-msgall.c \
 	recv-msgall-stream.c \
+	recv-multishot.c \
 	register-restrictions.c \
 	rename.c \
 	ringbuf-read.c \

diff --git a/test/recv-multishot.c b/test/recv-multishot.c
new file mode 100644
index 0000000..f1487b4
--- /dev/null
+++ b/test/recv-multishot.c
@@ -0,0 +1,342 @@
+// SPDX-License-Identifier: MIT
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "liburing.h"
+#include "helpers.h"
+
+
+enum early_error_t {
+	ERROR_NONE = 0,
+	ERROR_NOT_ENOUGH_BUFFERS,
+	ERROR_EARLY_CLOSE_SENDER,
+	ERROR_EARLY_CLOSE_RECEIVER,
+	ERROR_EARLY_OVERFLOW,
+	ERROR_EARLY_LAST
+};
+
+struct args {
+	bool recvmsg;
+	bool stream;
+	bool wait_each;
+	enum early_error_t early_error;
+};
+
+static int test(struct args *args)
+{
+	int const N = 8;
+	int const N_BUFFS = N * 64;
+	int const N_CQE_OVERFLOW = 4;
+	int const min_cqes = 2;
+	struct io_uring ring;
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	int fds[2], ret, i, j, total_sent_bytes = 0, total_recv_bytes = 0;
+	int send_buff[256];
+	int *recv_buffs[N_BUFFS];
+	int *at;
+	struct io_uring_cqe recv_cqe[N_BUFFS];
+	int recv_cqes = 0;
+	bool early_error = false;
+	bool early_error_started = false;
+	struct msghdr msg = { };
+	struct __kernel_timespec timeout = {
+		.tv_sec = 1,
+	};
+
+	memset(recv_buffs, 0, sizeof(recv_buffs));
+
+	if (args->early_error == ERROR_EARLY_OVERFLOW) {
+		struct io_uring_params params = {
+			.flags = IORING_SETUP_CQSIZE,
+			.cq_entries = N_CQE_OVERFLOW
+		};
+
+		ret = io_uring_queue_init_params(N_CQE_OVERFLOW, &ring, &params);
+	} else {
+		ret = io_uring_queue_init(32, &ring, 0);
+	}
+	if (ret) {
+		fprintf(stderr, "queue init failed: %d\n", ret);
+		return ret;
+	}
+
+	ret = t_create_socket_pair(fds, args->stream);
+	if (ret) {
+		fprintf(stderr, "t_create_socket_pair failed: %d\n", ret);
+		return ret;
+	}
+
+	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
+		send_buff[i] = i;
+
+	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
+		/* prepare some different sized buffers */
+		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N * sizeof(int);
+
+		recv_buffs[i] = malloc(sizeof(*at) * buffer_size);
+
+		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
+			continue;
+
+		sqe = io_uring_get_sqe(&ring);
+		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
+					buffer_size * sizeof(*recv_buffs[i]), 1, 7, i);
+		ret = io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL);
+		if (ret != 0) {
+			fprintf(stderr, "provide buffers failed: %d\n", ret);
+			ret = -1;
+			goto cleanup;
+		}
+		io_uring_cqe_seen(&ring, cqe);
+	}
+
+	sqe = io_uring_get_sqe(&ring);
+	if (args->recvmsg) {
+		memset(&msg, 0, sizeof(msg));
+		msg.msg_namelen = sizeof(struct sockaddr_in);
+		io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, 0);
+	} else {
+		io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	}
+	sqe->flags |= IOSQE_BUFFER_SELECT;
+	sqe->buf_group = 7;
+	io_uring_sqe_set_data64(sqe, 1234);
+	io_uring_submit(&ring);
+
+	at = &send_buff[0];
+	total_sent_bytes = 0;
+	for (i = 0; i < N; i++) {
+		int to_send = sizeof(*at) * (i + 1);
+
+		total_sent_bytes += to_send;
+		if (send(fds[1], at, to_send, 0) != to_send) {
+			if (early_error_started)
+				break;
+			fprintf(stderr, "send failed %d\n", errno);
+			ret = -1;
+			goto cleanup;
+		}
+
+		if (i == 2) {
+			if (args->early_error == ERROR_EARLY_CLOSE_RECEIVER) {
+				/* allow previous sends to complete */
+				usleep(1000);
+
+				sqe = io_uring_get_sqe(&ring);
+				io_uring_prep_cancel64(sqe, 1234, 0);
+				sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+				io_uring_submit(&ring);
+				early_error_started = true;
+			}
+			if (args->early_error == ERROR_EARLY_CLOSE_SENDER) {
+				early_error_started = true;
+				shutdown(fds[1], SHUT_RDWR);
+				close(fds[1]);
+			}
+		}
+		at += (i + 1);
+
+		if (args->wait_each) {
+			ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+			if (ret) {
+				fprintf(stderr, "wait_each failed: %d\n", ret);
+				ret = -1;
+				goto cleanup;
+			}
+			while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+				recv_cqe[recv_cqes++] = *cqe;
+				if (!(cqe->flags & IORING_CQE_F_MORE))
+					early_error = true;
+				io_uring_cqe_seen(&ring, cqe);
+			}
+			if (early_error)
+				break;
+		}
+	}
+
+	close(fds[1]);
+
+	/* allow sends to finish */
+	usleep(1000);
+
+	if ((args->stream && !early_error) || recv_cqes < min_cqes) {
+		ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+		if (ret && ret != -ETIME) {
+			fprintf(stderr, "wait final failed: %d\n", ret);
+			ret = -1;
+			goto cleanup;
+		}
+	}
+
+	while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+		recv_cqe[recv_cqes++] = *cqe;
+		io_uring_cqe_seen(&ring, cqe);
+	}
+
+	ret = -1;
+	at = &send_buff[0];
+	if (recv_cqes < min_cqes) {
+		fprintf(stderr, "not enough cqes: have=%d vs %d\n", recv_cqes, min_cqes);
+		goto cleanup;
+	}
+	for (i = 0; i < recv_cqes; i++) {
+		cqe = &recv_cqe[i];
+
+		bool const is_last = i == recv_cqes - 1;
+		bool const should_be_last =
+			(cqe->res <= 0) ||
+			(args->stream && is_last) ||
+			(args->early_error == ERROR_EARLY_OVERFLOW &&
+			 !args->wait_each && i == N_CQE_OVERFLOW);
+		int *this_recv;
+
+		if (should_be_last) {
+			if (!is_last) {
+				fprintf(stderr, "not last cqe had error %d\n", i);
+				goto cleanup;
+			}
+
+			switch (args->early_error) {
+			case ERROR_NOT_ENOUGH_BUFFERS:
+				if (cqe->res != -ENOBUFS) {
+					fprintf(stderr,
+						"ERROR_NOT_ENOUGH_BUFFERS: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_EARLY_OVERFLOW:
+				if (cqe->res < 0) {
+					fprintf(stderr,
+						"ERROR_EARLY_OVERFLOW: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_EARLY_CLOSE_RECEIVER:
+				if (cqe->res != -ECANCELED) {
+					fprintf(stderr,
+						"ERROR_EARLY_CLOSE_RECEIVER: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_NONE:
+			case ERROR_EARLY_CLOSE_SENDER:
+				if (cqe->res != 0) {
+					fprintf(stderr, "early error: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_EARLY_LAST:
+				fprintf(stderr, "bad error_early\n");
+				goto cleanup;
+			}
+
+			if (cqe->res <= 0 && cqe->flags & IORING_CQE_F_BUFFER) {
+				fprintf(stderr, "final BUFFER flag set\n");
+				goto cleanup;
+			}
+
+			if (cqe->flags & IORING_CQE_F_MORE) {
+				fprintf(stderr, "final MORE flag set\n");
+				goto cleanup;
+			}
+
+			if (cqe->res <= 0)
+				continue;
+		} else {
+			if (!(cqe->flags & IORING_CQE_F_MORE)) {
+				fprintf(stderr, "MORE flag not set\n");
+				goto cleanup;
+			}
+		}
+
+		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
+			fprintf(stderr, "BUFFER flag not set\n");
+			goto cleanup;
+		}
+
+		total_recv_bytes += cqe->res;
+		if (cqe->res % 4 != 0) {
+			/*
+			 * doesn't seem to happen in practice, would need some
+			 * work to remove this requirement
+			 */
+			fprintf(stderr, "unexpectedly unaligned buffer cqe->res=%d\n",
+				cqe->res);
+			goto cleanup;
+		}
+
+		/* check buffers arrived in order (for tcp) */
+		this_recv = recv_buffs[cqe->flags >> 16];
+		for (j = 0; args->stream && j < cqe->res / 4; j++) {
+			int sent = *at++;
+			int recv = *this_recv++;
+
+			if (sent != recv) {
+				fprintf(stderr, "recv=%d sent=%d\n", recv, sent);
+				goto cleanup;
+			}
+		}
+	}
+
+	if (args->early_error == ERROR_NONE && total_recv_bytes < total_sent_bytes) {
+		fprintf(stderr,
+			"missing recv: recv=%d sent=%d\n", total_recv_bytes,
+			total_sent_bytes);
+		goto cleanup;
+	}
+
+	ret = 0;
+cleanup:
+	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
+		free(recv_buffs[i]);
+	close(fds[0]);
+	close(fds[1]);
+	io_uring_queue_exit(&ring);
+
+	return ret;
+}
+
+int main(int argc, char *argv[])
+{
+	int ret;
+	int loop;
+	int early_error = 0;
+
+	if (argc > 1)
+		return 0;
+
+	for (loop = 0; loop < 8; loop++) {
+		struct args a = {
+			.stream = loop & 0x01,
+			.recvmsg = loop & 0x02,
+			.wait_each = loop & 0x04,
+		};
+		for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
+			a.early_error = (enum early_error_t)early_error;
+			ret = test(&a);
+			if (ret) {
+				fprintf(stderr,
+					"test stream=%d recvmsg=%d wait_each=%d early_error=%d failed\n",
+					a.stream, a.recvmsg, a.wait_each, a.early_error);
+				return ret;
+			}
+		}
+	}
+	return 0;
+}