From patchwork Fri Apr 19 18:05:36 2019
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: "Hefty, Sean"
X-Patchwork-Id: 10909603
From: "Hefty, Sean"
To: "linux-rdma (linux-rdma@vger.kernel.org)"
Subject: [PATCH rdma-core 1/5] rsockets: Use service thread to accept connections
Date: Fri, 19 Apr 2019 18:05:36 +0000
Message-ID: <1828884A29C6694DAF28B7E6B8A82373B3E1F8FD@ORSMSX109.amr.corp.intel.com>
Sender: linux-rdma-owner@vger.kernel.org
X-Mailing-List: linux-rdma@vger.kernel.org

Rsockets currently uses the application thread to drive the progress
state of new connections. However, some applications expect that new
connections can be established without the application taking specific
actions. For example, some apps do this sequence:

s = socket();
c = socket();
fcntl(s, O_NONBLOCK);
listen(s);
connect(c);
a = accept(s);

In rsockets, this hangs at connect because nothing processes the incoming
connection request.
This problem was reported when integrating rsockets in the Java
Development Kit.

To better support socket semantics, move the processing of connection
requests to a service thread. We set up a socketpair on any listening
socket. After a connection has been established, the listening socket
is notified via the socketpair.

With this change, a single-threaded app can connect to itself and
transfer data.

Signed-off-by: Sean Hefty
---
 librdmacm/rsocket.c | 190 ++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 149 insertions(+), 41 deletions(-)

diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c
index 48f30717..f64719e8 100644
--- a/librdmacm/rsocket.c
+++ b/librdmacm/rsocket.c
@@ -77,7 +77,9 @@ enum {
 	RS_SVC_REM_DGRAM,
 	RS_SVC_ADD_KEEPALIVE,
 	RS_SVC_REM_KEEPALIVE,
-	RS_SVC_MOD_KEEPALIVE
+	RS_SVC_MOD_KEEPALIVE,
+	RS_SVC_ADD_LISTEN,
+	RS_SVC_REM_LISTEN,
 };
 
 struct rs_svc_msg {
@@ -109,6 +111,12 @@ static struct rs_svc tcp_svc = {
 	.context_size = sizeof(*tcp_svc_timeouts),
 	.run = tcp_svc_run
 };
+static struct pollfd *listen_svc_fds;
+static void *listen_svc_run(void *arg);
+static struct rs_svc listen_svc = {
+	.context_size = sizeof(*listen_svc_fds),
+	.run = listen_svc_run
+};
 
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
@@ -310,6 +318,7 @@ struct rsocket {
 			struct rdma_cm_id *cm_id;
 			uint64_t tcp_opts;
 			unsigned int keepalive_time;
+			int accept_queue[2];
 
 			unsigned int ctrl_seqno;
 			unsigned int ctrl_max_seqno;
@@ -1026,6 +1035,11 @@ static void rs_free(struct rsocket *rs)
 		rdma_destroy_id(rs->cm_id);
 	}
 
+	if (rs->accept_queue[0] > 0 || rs->accept_queue[1] > 0) {
+		close(rs->accept_queue[0]);
+		close(rs->accept_queue[1]);
+	}
+
 	fastlock_destroy(&rs->map_lock);
 	fastlock_destroy(&rs->cq_wait_lock);
 	fastlock_destroy(&rs->cq_lock);
@@ -1203,45 +1217,52 @@ int rlisten(int socket, int backlog)
 	if (!rs)
 		return ERR(EBADF);
 
-	if (rs->state != rs_listening) {
-		ret = rdma_listen(rs->cm_id, backlog);
-		if (!ret)
-			rs->state = rs_listening;
-	} else {
-		ret = 0;
+	if (rs->state == rs_listening)
+		return 0;
+
+	ret = rdma_listen(rs->cm_id, backlog);
+	if (ret)
+		return ret;
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, rs->accept_queue);
+	if (ret)
+		return ret;
+
+	if (rs->fd_flags & O_NONBLOCK) {
+		ret = set_fd_nonblock(rs->accept_queue[0], true);
+		if (ret)
+			return ret;
 	}
-	return ret;
+
+	ret = set_fd_nonblock(rs->cm_id->channel->fd, true);
+	if (ret)
+		return ret;
+
+	ret = rs_notify_svc(&listen_svc, rs, RS_SVC_ADD_LISTEN);
+	if (ret)
+		return ret;
+
+	rs->state = rs_listening;
+	return 0;
 }
 
-/*
- * Nonblocking is usually not inherited between sockets, but we need to
- * inherit it here to establish the connection only. This is needed to
- * prevent rdma_accept from blocking until the remote side finishes
- * establishing the connection. If we were to allow rdma_accept to block,
- * then a single thread cannot establish a connection with itself, or
- * two threads which try to connect to each other can deadlock trying to
- * form a connection.
- *
- * Data transfers on the new socket remain blocking unless the user
- * specifies otherwise through rfcntl.
- */
-int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+/* Accepting new connection requests is currently a blocking operation */
+static void rs_accept(struct rsocket *rs)
 {
-	struct rsocket *rs, *new_rs;
+	struct rsocket *new_rs;
 	struct rdma_conn_param param;
 	struct rs_conn_data *creq, cresp;
+	struct rdma_cm_id *cm_id;
 	int ret;
 
-	rs = idm_lookup(&idm, socket);
-	if (!rs)
-		return ERR(EBADF);
+	ret = rdma_get_request(rs->cm_id, &cm_id);
+	if (ret)
+		return;
+
 	new_rs = rs_alloc(rs, rs->type);
 	if (!new_rs)
-		return ERR(ENOMEM);
-
-	ret = rdma_get_request(rs->cm_id, &new_rs->cm_id);
-	if (ret)
 		goto err;
+	new_rs->cm_id = cm_id;
 
 	ret = rs_insert(new_rs, new_rs->cm_id->channel->fd);
 	if (ret < 0)
@@ -1249,13 +1270,8 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
 
 	creq = (struct rs_conn_data *)
 	       (new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs));
-	if (creq->version != 1) {
-		ret = ERR(ENOTSUP);
+	if (creq->version != 1)
 		goto err;
-	}
-
-	if (rs->fd_flags & O_NONBLOCK)
-		set_fd_nonblock(new_rs->cm_id->channel->fd, true);
 
 	ret = rs_create_ep(new_rs);
 	if (ret)
@@ -1274,13 +1290,34 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
 	else
 		goto err;
 
+	write_all(rs->accept_queue[1], &new_rs, sizeof(new_rs));
+	return;
+
+err:
+	rdma_reject(cm_id, NULL, 0);
+	if (new_rs)
+		rs_free(new_rs);
+}
+
+int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+{
+	struct rsocket *rs, *new_rs;
+	int ret;
+
+	rs = idm_lookup(&idm, socket);
+	if (!rs)
+		return ERR(EBADF);
+
+	if (rs->state != rs_listening)
+		return ERR(EBADF);
+
+	ret = read(rs->accept_queue[0], &new_rs, sizeof(new_rs));
+	if (ret != sizeof(new_rs))
+		return ret;
+
 	if (addr && addrlen)
 		rgetpeername(new_rs->index, addr, addrlen);
 	return new_rs->index;
-
-err:
-	rs_free(new_rs);
-	return ret;
 }
 
 static int rs_do_connect(struct rsocket *rs)
@@ -3314,8 +3351,15 @@ int rclose(int socket)
 	if (rs->type == SOCK_STREAM) {
 		if (rs->state & rs_connected)
 			rshutdown(socket, SHUT_RDWR);
-		else if (rs->opts & RS_OPT_SVC_ACTIVE)
-			rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+
+		if (rs->opts & RS_OPT_SVC_ACTIVE) {
+			if (rs->state == rs_listening)
+				rs_notify_svc(&listen_svc, rs,
+					      RS_SVC_REM_LISTEN);
+			else
+				rs_notify_svc(&tcp_svc, rs,
+					      RS_SVC_REM_KEEPALIVE);
+		}
 	} else {
 		ds_shutdown(rs);
 	}
@@ -4359,3 +4403,67 @@ static void *tcp_svc_run(void *arg)
 
 	return NULL;
 }
+
+static void listen_svc_process_sock(struct rs_svc *svc)
+{
+	struct rs_svc_msg msg;
+
+	read_all(svc->sock[1], &msg, sizeof msg);
+	switch (msg.cmd) {
+	case RS_SVC_ADD_LISTEN:
+		msg.status = rs_svc_add_rs(svc, msg.rs);
+		if (!msg.status) {
+			msg.rs->opts |= RS_OPT_SVC_ACTIVE;
+			listen_svc_fds = svc->contexts;
+			listen_svc_fds[svc->cnt].fd = msg.rs->cm_id->
+						      channel->fd;
+			listen_svc_fds[svc->cnt].events = POLLIN;
+			listen_svc_fds[svc->cnt].revents = 0;
+		}
+		break;
+	case RS_SVC_REM_LISTEN:
+		msg.status = rs_svc_rm_rs(svc, msg.rs);
+		if (!msg.status)
+			msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
+		break;
+	case RS_SVC_NOOP:
+		msg.status = 0;
+		break;
+	default:
+		break;
+	}
+	write_all(svc->sock[1], &msg, sizeof msg);
+}
+
+static void *listen_svc_run(void *arg)
+{
+	struct rs_svc *svc = arg;
+	struct rs_svc_msg msg;
+	int i, ret;
+
+	ret = rs_svc_grow_sets(svc, 4);
+	if (ret) {
+		msg.status = ret;
+		write_all(svc->sock[1], &msg, sizeof msg);
+		return (void *) (uintptr_t) ret;
+	}
+
+	listen_svc_fds = svc->contexts;
+	listen_svc_fds[0].fd = svc->sock[1];
+	listen_svc_fds[0].events = POLLIN;
+	do {
+		for (i = 0; i <= svc->cnt; i++)
+			listen_svc_fds[i].revents = 0;
+
+		poll(listen_svc_fds, svc->cnt + 1, -1);
+		if (listen_svc_fds[0].revents)
+			listen_svc_process_sock(svc);
+
+		for (i = 1; i <= svc->cnt; i++) {
+			if (listen_svc_fds[i].revents)
+				rs_accept(svc->rss[i]);
+		}
+	} while (svc->cnt >= 1);
+
+	return NULL;
+}
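
For readers following the accept_queue handoff, here is a minimal standalone sketch of the pattern, not code from rsocket.c. It assumes only POSIX socketpair(), fcntl(), read() and write(). The names struct conn, queue_post(), queue_accept() and accept_queue_demo() are hypothetical and exist only for this illustration. One end of the socketpair receives a pointer to a finished connection, and the other end is read by an accept-like call that reports EAGAIN when it is nonblocking and nothing is queued.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for struct rsocket; only the index matters here. */
struct conn {
	int index;
};

/* Producer side: write the connection pointer to the queue's write end. */
static int queue_post(int wfd, struct conn *c)
{
	ssize_t n;

	/* NULL is reserved as the failure value of queue_accept(). */
	assert(c != NULL);
	n = write(wfd, &c, sizeof(c));
	return n == (ssize_t) sizeof(c) ? 0 : -1;
}

/*
 * Consumer side: read one pointer from the queue's read end. Returns NULL
 * on failure; with a nonblocking read end and an empty queue, read() fails
 * with EAGAIN or EWOULDBLOCK.
 */
static struct conn *queue_accept(int rfd)
{
	struct conn *c = NULL;
	ssize_t n = read(rfd, &c, sizeof(c));

	return n == (ssize_t) sizeof(c) ? c : NULL;
}

/*
 * Runs the whole exchange in one thread. Returns the index of the accepted
 * connection, or -1 on any failure.
 */
int accept_queue_demo(void)
{
	int queue[2], flags, index = -1;
	struct conn *posted, *accepted;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, queue))
		return -1;

	/* Make the read end nonblocking, as a nonblocking listener would. */
	flags = fcntl(queue[0], F_GETFL);
	if (flags < 0 || fcntl(queue[0], F_SETFL, flags | O_NONBLOCK))
		goto out;

	/* Nothing is queued yet, so the accept must fail without blocking. */
	if (queue_accept(queue[0]) ||
	    (errno != EAGAIN && errno != EWOULDBLOCK))
		goto out;

	posted = malloc(sizeof(*posted));
	if (!posted)
		goto out;
	posted->index = 7;

	if (queue_post(queue[1], posted)) {
		free(posted);
		goto out;
	}

	/* The pointer round-trips because both ends share one process. */
	accepted = queue_accept(queue[0]);
	if (accepted == posted)
		index = accepted->index;
	free(posted);
out:
	close(queue[0]);
	close(queue[1]);
	return index;
}
```

In this sketch the producer is called inline for brevity; in the patch the write happens in rs_accept() on the listen service thread and the read happens in raccept() on the application thread. Passing a raw pointer over the socketpair is only meaningful because both ends live in the same address space.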