@@ -77,7 +77,9 @@ enum {
RS_SVC_REM_DGRAM,
RS_SVC_ADD_KEEPALIVE,
RS_SVC_REM_KEEPALIVE,
- RS_SVC_MOD_KEEPALIVE
+ RS_SVC_MOD_KEEPALIVE,
+ RS_SVC_ADD_LISTEN,
+ RS_SVC_REM_LISTEN,
};
struct rs_svc_msg {
@@ -109,6 +111,12 @@ static struct rs_svc tcp_svc = {
.context_size = sizeof(*tcp_svc_timeouts),
.run = tcp_svc_run
};
+static struct pollfd *listen_svc_fds;
+static void *listen_svc_run(void *arg);
+static struct rs_svc listen_svc = {
+ .context_size = sizeof(*listen_svc_fds),
+ .run = listen_svc_run
+};
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
@@ -310,6 +318,7 @@ struct rsocket {
struct rdma_cm_id *cm_id;
uint64_t tcp_opts;
unsigned int keepalive_time;
+ int accept_queue[2];
unsigned int ctrl_seqno;
unsigned int ctrl_max_seqno;
@@ -1026,6 +1035,11 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}
+ if (rs->accept_queue[0] > 0 || rs->accept_queue[1] > 0) {
+ close(rs->accept_queue[0]);
+ close(rs->accept_queue[1]);
+ }
+
fastlock_destroy(&rs->map_lock);
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
@@ -1203,45 +1217,52 @@ int rlisten(int socket, int backlog)
if (!rs)
return ERR(EBADF);
- if (rs->state != rs_listening) {
- ret = rdma_listen(rs->cm_id, backlog);
- if (!ret)
- rs->state = rs_listening;
- } else {
- ret = 0;
+ if (rs->state == rs_listening)
+ return 0;
+
+ ret = rdma_listen(rs->cm_id, backlog);
+ if (ret)
+ return ret;
+
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, rs->accept_queue);
+ if (ret)
+ return ret;
+
+ if (rs->fd_flags & O_NONBLOCK) {
+ ret = set_fd_nonblock(rs->accept_queue[0], true);
+ if (ret)
+ return ret;
}
- return ret;
+
+ ret = set_fd_nonblock(rs->cm_id->channel->fd, true);
+ if (ret)
+ return ret;
+
+ ret = rs_notify_svc(&listen_svc, rs, RS_SVC_ADD_LISTEN);
+ if (ret)
+ return ret;
+
+ rs->state = rs_listening;
+ return 0;
}
-/*
- * Nonblocking is usually not inherited between sockets, but we need to
- * inherit it here to establish the connection only. This is needed to
- * prevent rdma_accept from blocking until the remote side finishes
- * establishing the connection. If we were to allow rdma_accept to block,
- * then a single thread cannot establish a connection with itself, or
- * two threads which try to connect to each other can deadlock trying to
- * form a connection.
- *
- * Data transfers on the new socket remain blocking unless the user
- * specifies otherwise through rfcntl.
- */
-int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+/* Accepting new connection requests is currently a blocking operation */
+static void rs_accept(struct rsocket *rs)
{
- struct rsocket *rs, *new_rs;
+ struct rsocket *new_rs;
struct rdma_conn_param param;
struct rs_conn_data *creq, cresp;
+ struct rdma_cm_id *cm_id;
int ret;
- rs = idm_lookup(&idm, socket);
- if (!rs)
- return ERR(EBADF);
+ ret = rdma_get_request(rs->cm_id, &cm_id);
+ if (ret)
+ return;
+
new_rs = rs_alloc(rs, rs->type);
if (!new_rs)
- return ERR(ENOMEM);
-
- ret = rdma_get_request(rs->cm_id, &new_rs->cm_id);
- if (ret)
goto err;
+ new_rs->cm_id = cm_id;
ret = rs_insert(new_rs, new_rs->cm_id->channel->fd);
if (ret < 0)
@@ -1249,13 +1270,8 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
creq = (struct rs_conn_data *)
(new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs));
- if (creq->version != 1) {
- ret = ERR(ENOTSUP);
+ if (creq->version != 1)
goto err;
- }
-
- if (rs->fd_flags & O_NONBLOCK)
- set_fd_nonblock(new_rs->cm_id->channel->fd, true);
ret = rs_create_ep(new_rs);
if (ret)
@@ -1274,13 +1290,34 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
else
goto err;
+ write_all(rs->accept_queue[1], &new_rs, sizeof(new_rs));
+ return;
+
+err:
+ rdma_reject(cm_id, NULL, 0);
+ if (new_rs)
+ rs_free(new_rs);
+}
+
+int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+{
+ struct rsocket *rs, *new_rs;
+ int ret;
+
+ rs = idm_lookup(&idm, socket);
+ if (!rs)
+ return ERR(EBADF);
+
+ if (rs->state != rs_listening)
+ return ERR(EBADF);
+
+ ret = read(rs->accept_queue[0], &new_rs, sizeof(new_rs));
+ if (ret != sizeof(new_rs))
+ return ret;
+
if (addr && addrlen)
rgetpeername(new_rs->index, addr, addrlen);
return new_rs->index;
-
-err:
- rs_free(new_rs);
- return ret;
}
static int rs_do_connect(struct rsocket *rs)
@@ -3314,8 +3351,15 @@ int rclose(int socket)
if (rs->type == SOCK_STREAM) {
if (rs->state & rs_connected)
rshutdown(socket, SHUT_RDWR);
- else if (rs->opts & RS_OPT_SVC_ACTIVE)
- rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+
+ if (rs->opts & RS_OPT_SVC_ACTIVE) {
+ if (rs->state == rs_listening)
+ rs_notify_svc(&listen_svc, rs,
+ RS_SVC_REM_LISTEN);
+ else
+ rs_notify_svc(&tcp_svc, rs,
+ RS_SVC_REM_KEEPALIVE);
+ }
} else {
ds_shutdown(rs);
}
@@ -4359,3 +4403,67 @@ static void *tcp_svc_run(void *arg)
return NULL;
}
+
+static void listen_svc_process_sock(struct rs_svc *svc)
+{
+ struct rs_svc_msg msg;
+
+ read_all(svc->sock[1], &msg, sizeof msg);
+ switch (msg.cmd) {
+ case RS_SVC_ADD_LISTEN:
+ msg.status = rs_svc_add_rs(svc, msg.rs);
+ if (!msg.status) {
+ msg.rs->opts |= RS_OPT_SVC_ACTIVE;
+ listen_svc_fds = svc->contexts;
+ listen_svc_fds[svc->cnt].fd = msg.rs->cm_id->
+ channel->fd;
+ listen_svc_fds[svc->cnt].events = POLLIN;
+ listen_svc_fds[svc->cnt].revents = 0;
+ }
+ break;
+ case RS_SVC_REM_LISTEN:
+ msg.status = rs_svc_rm_rs(svc, msg.rs);
+ if (!msg.status)
+ msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
+ break;
+ case RS_SVC_NOOP:
+ msg.status = 0;
+ break;
+ default:
+ break;
+ }
+ write_all(svc->sock[1], &msg, sizeof msg);
+}
+
+static void *listen_svc_run(void *arg)
+{
+ struct rs_svc *svc = arg;
+ struct rs_svc_msg msg;
+ int i, ret;
+
+ ret = rs_svc_grow_sets(svc, 4);
+ if (ret) {
+ msg.status = ret;
+ write_all(svc->sock[1], &msg, sizeof msg);
+ return (void *) (uintptr_t) ret;
+ }
+
+ listen_svc_fds = svc->contexts;
+ listen_svc_fds[0].fd = svc->sock[1];
+ listen_svc_fds[0].events = POLLIN;
+ do {
+ for (i = 0; i <= svc->cnt; i++)
+ listen_svc_fds[i].revents = 0;
+
+ poll(listen_svc_fds, svc->cnt + 1, -1);
+ if (listen_svc_fds[0].revents)
+ listen_svc_process_sock(svc);
+
+ for (i = 1; i <= svc->cnt; i++) {
+ if (listen_svc_fds[i].revents)
+ rs_accept(svc->rss[i]);
+ }
+ } while (svc->cnt >= 1);
+
+ return NULL;
+}
Rsockets currently uses the application thread to drive the progress state of new connections. However, some applications expect that new connections can be established without the application taking specific actions. For example, some apps do this sequence: s = socket(); c = socket(); fcntl(s, O_NONBLOCK); listen(s); connect(c); a = accept(s); In rsockets, this hangs at connect because nothing processes the incoming connection request. This problem was reported when integrating rsockets in the Java Development Kit. To better support the socket semantic, move the processing of connection requests to a service thread. We setup a socketpair on any listening socket. After a connection has been established, the listening socket is notified via the socketpair. With this change, a single thread app can connect to itself and transfer data. Signed-off-by: Sean Hefty <sean.hefty@intel.com> --- librdmacm/rsocket.c | 190 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 149 insertions(+), 41 deletions(-)