From patchwork Tue Dec 17 00:06:21 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: John Ousterhout X-Patchwork-Id: 13910710 X-Patchwork-Delegate: kuba@kernel.org Received: from smtp1.cs.Stanford.EDU (smtp1.cs.stanford.edu [171.64.64.25]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by smtp.subspace.kernel.org (Postfix) with ESMTPS id 5EEA21E4AE for ; Tue, 17 Dec 2024 00:07:15 +0000 (UTC) Authentication-Results: smtp.subspace.kernel.org; arc=none smtp.client-ip=171.64.64.25 ARC-Seal: i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1734394038; cv=none; b=D2xzXNBougBQ9b+obkYs+4GSGYNk89cikIjH4jA2IKYjW6Qp59BZk/D53rfqQgKfxdWcHxyUIoXRYRj5YEOxVx+w9XI9jY6cx6j7V5HY/t3wZb38DVA4zl1Ct5UCubOs3/ws6gm4GO8cCAp8SnJxjNl4u1mZathaiQm+DeH9HVs= ARC-Message-Signature: i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1734394038; c=relaxed/simple; bh=BQOL+6fESpdiW5C/+hnCzOfM+2V9b8CRUB/nZCH/Dl8=; h=From:To:Cc:Subject:Date:Message-ID:In-Reply-To:References: MIME-Version; b=LuCo/k0H/S5Ux2VwWF2FCEWF6ZNQjJjjBqV2fMg1WMfAtNDBqoObvFCOK1aUZA9MwA9kyc1oE6ThtXWCpOtYUWGdA2Gm1+Sgoa3hcNnP5q0JhC0sIA9626TJsUPIcVf4/e1s7MeXnqq9YdObFAgXrucxvVDYh2DPYMg9zqJmrmI= ARC-Authentication-Results: i=1; smtp.subspace.kernel.org; dmarc=pass (p=none dis=none) header.from=cs.stanford.edu; spf=pass smtp.mailfrom=cs.stanford.edu; dkim=pass (2048-bit key) header.d=cs.stanford.edu header.i=@cs.stanford.edu header.b=eTbir1Vf; arc=none smtp.client-ip=171.64.64.25 Authentication-Results: smtp.subspace.kernel.org; dmarc=pass (p=none dis=none) header.from=cs.stanford.edu Authentication-Results: smtp.subspace.kernel.org; spf=pass smtp.mailfrom=cs.stanford.edu Authentication-Results: smtp.subspace.kernel.org; dkim=pass (2048-bit key) header.d=cs.stanford.edu header.i=@cs.stanford.edu header.b="eTbir1Vf" DKIM-Signature: v=1; a=rsa-sha256; q=dns/txt; c=relaxed/relaxed; d=cs.stanford.edu; s=cs2308; h=Content-Transfer-Encoding:MIME-Version: References:In-Reply-To:Message-ID:Date:Subject:Cc:To:From:Sender:Reply-To: Content-Type:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:List-Id:List-Help: List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=iZ7hxEqezoEikyaumkJBGvJK8h722MzYPii9ITyPaKQ=; t=1734394035; x=1735258035; b=eTbir1VfnRYfuO6H7S0xuHUkB9oARdCLAWVruJX6HBgl2j126EZZUQpM46siHQz1cbrrdlO7Ihx f9imyPoTzdDG7tZ3bMKBGZHogxGHvWVfoYBXocCjPKa7aR40SHRR/NK5kfDxmh0rNj4aJGySwz4Cb tmDbnuo+P8dkeJ7cIjgSEk0NSc5Odv5CpgBMAlcG6d4Q+jF7kjFf1oXEO2HiEF2pkJbci5rIvYh4/ rx0eKjcjYC+Xpgm17A26xi3x/QZ4b9dbjX9oGKqs2NTM2o+BpNumGQSP4UcfmUYBmomHsvyj3VPWQ +faoag133kRyC94CsL9xokGJ6me9O9k5Prog==; Received: from ouster448.stanford.edu ([172.24.72.71]:53919 helo=localhost.localdomain) by smtp1.cs.Stanford.EDU with esmtpsa (TLS1.2) tls TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 (Exim 4.94.2) (envelope-from ) id 1tNL7I-0002IN-R3; Mon, 16 Dec 2024 16:07:15 -0800 From: John Ousterhout To: netdev@vger.kernel.org Cc: pabeni@redhat.com, edumazet@google.com, horms@kernel.org, kuba@kernel.org, John Ousterhout Subject: [PATCH net-next v4 08/12] net: homa: create homa_incoming.c Date: Mon, 16 Dec 2024 16:06:21 -0800 Message-ID: <20241217000626.2958-9-ouster@cs.stanford.edu> X-Mailer: git-send-email 2.45.1 In-Reply-To: <20241217000626.2958-1-ouster@cs.stanford.edu> References: <20241217000626.2958-1-ouster@cs.stanford.edu> Precedence: bulk X-Mailing-List: netdev@vger.kernel.org List-Id: List-Subscribe: List-Unsubscribe: MIME-Version: 1.0 X-Spam-Score: -99.2 X-Scan-Signature: 90e34d83f99f61221c3983b46e90d836 X-Patchwork-Delegate: kuba@kernel.org This file contains most of the code for handling incoming packets, including top-level dispatching code plus specific handlers for each pack type. It also contains code for dispatching fully-received messages to waiting application threads. Signed-off-by: John Ousterhout --- net/homa/homa_incoming.c | 1073 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 1073 insertions(+) create mode 100644 net/homa/homa_incoming.c diff --git a/net/homa/homa_incoming.c b/net/homa/homa_incoming.c new file mode 100644 index 000000000000..2504b74f5713 --- /dev/null +++ b/net/homa/homa_incoming.c @@ -0,0 +1,1073 @@ +// SPDX-License-Identifier: BSD-2-Clause + +/* This file contains functions that handle incoming Homa messages. + */ + +#include "homa_impl.h" +#include "homa_peer.h" +#include "homa_pool.h" + +/** + * homa_message_in_init() - Constructor for homa_message_in. + * @rpc: RPC whose msgin structure should be initialized. + * @length: Total number of bytes in message. + * Return: Zero for successful initialization, or a negative errno + * if rpc->msgin could not be initialized. + */ +int homa_message_in_init(struct homa_rpc *rpc, int length) +{ + int err; + + rpc->msgin.length = length; + skb_queue_head_init(&rpc->msgin.packets); + rpc->msgin.recv_end = 0; + INIT_LIST_HEAD(&rpc->msgin.gaps); + rpc->msgin.bytes_remaining = length; + rpc->msgin.resend_all = 0; + rpc->msgin.num_bpages = 0; + err = homa_pool_allocate(rpc); + if (err != 0) + return err; + return 0; +} + +/** + * homa_gap_new() - Create a new gap and add it to a list. + * @next: Add the new gap just before this list element. + * @start: Offset of first byte covered by the gap. + * @end: Offset of byte just after the last one covered by the gap. + * Return: Pointer to the new gap, or NULL if memory couldn't be allocated + * for the gap object. + */ +struct homa_gap *homa_gap_new(struct list_head *next, int start, int end) +{ + struct homa_gap *gap; + + gap = kmalloc(sizeof(*gap), GFP_KERNEL); + if (!gap) + return NULL; + gap->start = start; + gap->end = end; + gap->time = sched_clock(); + list_add_tail(&gap->links, next); + return gap; +} + +/** + * homa_gap_retry() - Send RESEND requests for all of the unreceived + * gaps in a message. + * @rpc: RPC to check; must be locked by caller. + */ +void homa_gap_retry(struct homa_rpc *rpc) +{ + struct homa_resend_hdr resend; + struct homa_gap *gap; + + list_for_each_entry(gap, &rpc->msgin.gaps, links) { + resend.offset = htonl(gap->start); + resend.length = htonl(gap->end - gap->start); + homa_xmit_control(RESEND, &resend, sizeof(resend), rpc); + } +} + +/** + * homa_add_packet() - Add an incoming packet to the contents of a + * partially received message. + * @rpc: Add the packet to the msgin for this RPC. + * @skb: The new packet. This function takes ownership of the packet + * (the packet will either be freed or added to rpc->msgin.packets). + */ +void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb) +{ + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data; + struct homa_gap *gap, *dummy, *gap2; + int start = ntohl(h->seg.offset); + int length = homa_data_len(skb); + int end = start + length; + + if ((start + length) > rpc->msgin.length) + goto discard; + + if (start == rpc->msgin.recv_end) { + /* Common case: packet is sequential. */ + rpc->msgin.recv_end += length; + goto keep; + } + + if (start > rpc->msgin.recv_end) { + /* Packet creates a new gap. */ + if (!homa_gap_new(&rpc->msgin.gaps, + rpc->msgin.recv_end, start)) { + pr_err("Homa couldn't allocate gap: insufficient memory\n"); + goto discard; + } + rpc->msgin.recv_end = end; + goto keep; + } + + /* Must now check to see if the packet fills in part or all of + * an existing gap. + */ + list_for_each_entry_safe(gap, dummy, &rpc->msgin.gaps, links) { + /* Is packet at the start of this gap? */ + if (start <= gap->start) { + if (end <= gap->start) + continue; + if (start < gap->start) + goto discard; + if (end > gap->end) + goto discard; + gap->start = end; + if (gap->start >= gap->end) { + list_del(&gap->links); + kfree(gap); + } + goto keep; + } + + /* Is packet at the end of this gap? BTW, at this point we know + * the packet can't cover the entire gap. + */ + if (end >= gap->end) { + if (start >= gap->end) + continue; + if (end > gap->end) + goto discard; + gap->end = start; + goto keep; + } + + /* Packet is in the middle of the gap; must split the gap. */ + gap2 = homa_gap_new(&gap->links, gap->start, start); + if (!gap2) { + pr_err("Homa couldn't allocate gap for split: insufficient memory\n"); + goto discard; + } + gap2->time = gap->time; + gap->start = end; + goto keep; + } + +discard: + kfree_skb(skb); + return; + +keep: + __skb_queue_tail(&rpc->msgin.packets, skb); + rpc->msgin.bytes_remaining -= length; +} + +/** + * homa_copy_to_user() - Copy as much data as possible from incoming + * packet buffers to buffers in user space. + * @rpc: RPC for which data should be copied. Must be locked by caller. + * Return: Zero for success or a negative errno if there is an error. + * It is possible for the RPC to be freed while this function + * executes (it releases and reacquires the RPC lock). If that + * happens, -EINVAL will be returned and the state of @rpc + * will be RPC_DEAD. + */ +int homa_copy_to_user(struct homa_rpc *rpc) + __releases(rpc->bucket_lock) + __acquires(rpc->bucket_lock) +{ +#define MAX_SKBS 20 + struct sk_buff *skbs[MAX_SKBS]; + int error = 0; + int n = 0; /* Number of filled entries in skbs. */ + int i; + + /* Tricky note: we can't hold the RPC lock while we're actually + * copying to user space, because (a) it's illegal to hold a spinlock + * while copying to user space and (b) we'd like for homa_softirq + * to add more packets to the RPC while we're copying these out. + * So, collect a bunch of packets to copy, then release the lock, + * copy them, and reacquire the lock. + */ + while (true) { + struct sk_buff *skb; + + if (rpc->state == RPC_DEAD) { + error = -EINVAL; + break; + } + + skb = __skb_dequeue(&rpc->msgin.packets); + if (skb) { + skbs[n] = skb; + n++; + if (n < MAX_SKBS) + continue; + } + if (n == 0) + break; + + /* At this point we've collected a batch of packets (or + * run out of packets); copy any available packets out to + * user space. + */ + atomic_or(RPC_COPYING_TO_USER, &rpc->flags); + homa_rpc_unlock(rpc); + + /* Each iteration of this loop copies out one skb. */ + for (i = 0; i < n; i++) { + struct homa_data_hdr *h = (struct homa_data_hdr *) + skbs[i]->data; + int pkt_length = homa_data_len(skbs[i]); + int offset = ntohl(h->seg.offset); + int buf_bytes, chunk_size; + struct iov_iter iter; + int copied = 0; + char *dst; + + /* Each iteration of this loop copies to one + * user buffer. + */ + while (copied < pkt_length) { + chunk_size = pkt_length - copied; + dst = homa_pool_get_buffer(rpc, offset + copied, + &buf_bytes); + if (buf_bytes < chunk_size) { + if (buf_bytes == 0) + /* skb has data beyond message + * end? + */ + break; + chunk_size = buf_bytes; + } + error = import_ubuf(READ, (void __user *)dst, + chunk_size, &iter); + if (error) + goto free_skbs; + error = skb_copy_datagram_iter(skbs[i], + sizeof(*h) + + copied, &iter, + chunk_size); + if (error) + goto free_skbs; + copied += chunk_size; + } + } + +free_skbs: + for (i = 0; i < n; i++) + kfree_skb(skbs[i]); + n = 0; + atomic_or(APP_NEEDS_LOCK, &rpc->flags); + homa_rpc_lock(rpc, "homa_copy_to_user"); + atomic_andnot(APP_NEEDS_LOCK | RPC_COPYING_TO_USER, + &rpc->flags); + if (error) + break; + } + return error; +} + +/** + * homa_dispatch_pkts() - Top-level function that processes a batch of packets, + * all related to the same RPC. + * @skb: First packet in the batch, linked through skb->next. + * @homa: Overall information about the Homa transport. + */ +void homa_dispatch_pkts(struct sk_buff *skb, struct homa *homa) +{ +#define MAX_ACKS 10 + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data; + __u64 id = homa_local_id(h->common.sender_id); + int dport = ntohs(h->common.dport); + + /* Used to collect acks from data packets so we can process them + * all at the end (can't process them inline because that may + * require locking conflicting RPCs). If we run out of space just + * ignore the extra acks; they'll be regenerated later through the + * explicit mechanism. + */ + struct homa_ack acks[MAX_ACKS]; + struct homa_rpc *rpc = NULL; + struct homa_sock *hsk; + struct sk_buff *next; + int num_acks = 0; + + /* Find the appropriate socket.*/ + hsk = homa_sock_find(homa->port_map, dport); + if (!hsk) { + if (skb_is_ipv6(skb)) + icmp6_send(skb, ICMPV6_DEST_UNREACH, + ICMPV6_PORT_UNREACH, 0, NULL, IP6CB(skb)); + else + icmp_send(skb, ICMP_DEST_UNREACH, + ICMP_PORT_UNREACH, 0); + while (skb) { + next = skb->next; + kfree_skb(skb); + skb = next; + } + return; + } + + /* Each iteration through the following loop processes one packet. */ + for (; skb; skb = next) { + h = (struct homa_data_hdr *)skb->data; + next = skb->next; + + /* Relinquish the RPC lock temporarily if it's needed + * elsewhere. + */ + if (rpc) { + int flags = atomic_read(&rpc->flags); + + if (flags & APP_NEEDS_LOCK) { + homa_rpc_unlock(rpc); + homa_spin(200); + rpc = NULL; + } + } + + /* Find and lock the RPC if we haven't already done so. */ + if (!rpc) { + if (!homa_is_client(id)) { + /* We are the server for this RPC. */ + if (h->common.type == DATA) { + int created; + + /* Create a new RPC if one doesn't + * already exist. + */ + rpc = homa_rpc_new_server(hsk, &saddr, + h, &created); + if (IS_ERR(rpc)) { + pr_warn("homa_pkt_dispatch couldn't create server rpc: error %lu", + -PTR_ERR(rpc)); + rpc = NULL; + goto discard; + } + } else { + rpc = homa_find_server_rpc(hsk, &saddr, + id); + } + } else { + rpc = homa_find_client_rpc(hsk, id); + } + } + if (unlikely(!rpc)) { + if (h->common.type != NEED_ACK && + h->common.type != ACK && + h->common.type != RESEND) + goto discard; + } else { + if (h->common.type == DATA || + h->common.type == BUSY || + h->common.type == NEED_ACK) + rpc->silent_ticks = 0; + rpc->peer->outstanding_resends = 0; + } + + switch (h->common.type) { + case DATA: + if (h->ack.client_id) { + /* Save the ack for processing later, when we + * have released the RPC lock. + */ + if (num_acks < MAX_ACKS) { + acks[num_acks] = h->ack; + num_acks++; + } + } + homa_data_pkt(skb, rpc); + break; + case RESEND: + homa_resend_pkt(skb, rpc, hsk); + break; + case UNKNOWN: + homa_unknown_pkt(skb, rpc); + break; + case BUSY: + /* Nothing to do for these packets except reset + * silent_ticks, which happened above. + */ + goto discard; + case NEED_ACK: + homa_need_ack_pkt(skb, hsk, rpc); + break; + case ACK: + homa_ack_pkt(skb, hsk, rpc); + rpc = NULL; + + /* It isn't safe to process more packets once we've + * released the RPC lock (this should never happen). + */ + BUG_ON(next); + break; + default: + goto discard; + } + continue; + +discard: + kfree_skb(skb); + } + if (rpc) + homa_rpc_unlock(rpc); + + while (num_acks > 0) { + num_acks--; + homa_rpc_acked(hsk, &saddr, &acks[num_acks]); + } + + if (hsk->dead_skbs >= 2 * hsk->homa->dead_buffs_limit) + /* We get here if neither homa_wait_for_message + * nor homa_timer can keep up with reaping dead + * RPCs. See reap.txt for details. + */ + homa_rpc_reap(hsk, hsk->homa->reap_limit); +} + +/** + * homa_data_pkt() - Handler for incoming DATA packets + * @skb: Incoming packet; size known to be large enough for the header. + * This function now owns the packet. + * @rpc: Information about the RPC corresponding to this packet. + * Must be locked by the caller. + */ +void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc) +{ + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data; + + if (rpc->state != RPC_INCOMING && homa_is_client(rpc->id)) { + if (unlikely(rpc->state != RPC_OUTGOING)) + goto discard; + rpc->state = RPC_INCOMING; + if (homa_message_in_init(rpc, ntohl(h->message_length)) != 0) + goto discard; + } else if (rpc->state != RPC_INCOMING) { + /* Must be server; note that homa_rpc_new_server already + * initialized msgin and allocated buffers. + */ + if (unlikely(rpc->msgin.length >= 0)) + goto discard; + } + + if (rpc->msgin.num_bpages == 0) + /* Drop packets that arrive when we can't allocate buffer + * space. If we keep them around, packet buffer usage can + * exceed available cache space, resulting in poor + * performance. + */ + goto discard; + + homa_add_packet(rpc, skb); + + if (skb_queue_len(&rpc->msgin.packets) != 0 && + !(atomic_read(&rpc->flags) & RPC_PKTS_READY)) { + atomic_or(RPC_PKTS_READY, &rpc->flags); + homa_sock_lock(rpc->hsk, "homa_data_pkt"); + homa_rpc_handoff(rpc); + homa_sock_unlock(rpc->hsk); + } + return; + +discard: + kfree_skb(skb); +} + +/** + * homa_resend_pkt() - Handler for incoming RESEND packets + * @skb: Incoming packet; size already verified large enough for header. + * This function now owns the packet. + * @rpc: Information about the RPC corresponding to this packet; must + * be locked by caller, but may be NULL if there is no RPC matching + * this packet + * @hsk: Socket on which the packet was received. + */ +void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc, + struct homa_sock *hsk) +{ + struct homa_resend_hdr *h = (struct homa_resend_hdr *)skb->data; + struct homa_busy_hdr busy; + + if (!rpc) { + homa_xmit_unknown(skb, hsk); + goto done; + } + + if (!homa_is_client(rpc->id) && rpc->state != RPC_OUTGOING) { + /* We are the server for this RPC and don't yet have a + * response packet, so just send BUSY. + */ + homa_xmit_control(BUSY, &busy, sizeof(busy), rpc); + goto done; + } + if (ntohl(h->length) == 0) + /* This RESEND is from a server just trying to determine + * whether the client still cares about the RPC; return + * BUSY so the server doesn't time us out. + */ + homa_xmit_control(BUSY, &busy, sizeof(busy), rpc); + homa_resend_data(rpc, ntohl(h->offset), + ntohl(h->offset) + ntohl(h->length)); + +done: + kfree_skb(skb); +} + +/** + * homa_unknown_pkt() - Handler for incoming UNKNOWN packets. + * @skb: Incoming packet; size known to be large enough for the header. + * This function now owns the packet. + * @rpc: Information about the RPC corresponding to this packet. + */ +void homa_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc) +{ + if (homa_is_client(rpc->id)) { + if (rpc->state == RPC_OUTGOING) { + /* It appears that everything we've already transmitted + * has been lost; retransmit it. + */ + homa_resend_data(rpc, 0, rpc->msgout.next_xmit_offset); + goto done; + } + + } else { + homa_rpc_free(rpc); + } +done: + kfree_skb(skb); +} + +/** + * homa_need_ack_pkt() - Handler for incoming NEED_ACK packets + * @skb: Incoming packet; size already verified large enough for header. + * This function now owns the packet. + * @hsk: Socket on which the packet was received. + * @rpc: The RPC named in the packet header, or NULL if no such + * RPC exists. The RPC has been locked by the caller. + */ +void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, + struct homa_rpc *rpc) +{ + struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data; + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + __u64 id = homa_local_id(h->sender_id); + struct homa_peer *peer; + struct homa_ack_hdr ack; + + /* Return if it's not safe for the peer to purge its state + * for this RPC (the RPC still exists and we haven't received + * the entire response), or if we can't find peer info. + */ + if (rpc && (rpc->state != RPC_INCOMING || + rpc->msgin.bytes_remaining)) { + goto done; + } else { + peer = homa_peer_find(hsk->homa->peers, &saddr, &hsk->inet); + if (IS_ERR(peer)) + goto done; + } + + /* Send an ACK for this RPC. At the same time, include all of the + * other acks available for the peer. Note: can't use rpc below, + * since it may be NULL. + */ + ack.common.type = ACK; + ack.common.sport = h->dport; + ack.common.dport = h->sport; + ack.common.sender_id = cpu_to_be64(id); + ack.num_acks = htons(homa_peer_get_acks(peer, + HOMA_MAX_ACKS_PER_PKT, + ack.acks)); + __homa_xmit_control(&ack, sizeof(ack), peer, hsk); + +done: + kfree_skb(skb); +} + +/** + * homa_ack_pkt() - Handler for incoming ACK packets + * @skb: Incoming packet; size already verified large enough for header. + * This function now owns the packet. + * @hsk: Socket on which the packet was received. + * @rpc: The RPC named in the packet header, or NULL if no such + * RPC exists. The RPC has been locked by the caller but will + * be unlocked here. + */ +void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, + struct homa_rpc *rpc) + __releases(rpc->bucket_lock) +{ + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + struct homa_ack_hdr *h = (struct homa_ack_hdr *)skb->data; + int i, count; + + if (rpc) { + homa_rpc_free(rpc); + homa_rpc_unlock(rpc); + } + + count = ntohs(h->num_acks); + for (i = 0; i < count; i++) + homa_rpc_acked(hsk, &saddr, &h->acks[i]); + kfree_skb(skb); +} + +/** + * homa_rpc_abort() - Terminate an RPC. + * @rpc: RPC to be terminated. Must be locked by caller. + * @error: A negative errno value indicating the error that caused the abort. + * If this is a client RPC, the error will be returned to the + * application; if it's a server RPC, the error is ignored and + * we just free the RPC. + */ +void homa_rpc_abort(struct homa_rpc *rpc, int error) +{ + if (!homa_is_client(rpc->id)) { + homa_rpc_free(rpc); + return; + } + rpc->error = error; + homa_sock_lock(rpc->hsk, "homa_rpc_abort"); + if (!rpc->hsk->shutdown) + homa_rpc_handoff(rpc); + homa_sock_unlock(rpc->hsk); +} + +/** + * homa_abort_rpcs() - Abort all RPCs to/from a particular peer. + * @homa: Overall data about the Homa protocol implementation. + * @addr: Address (network order) of the destination whose RPCs are + * to be aborted. + * @port: If nonzero, then RPCs will only be aborted if they were + * targeted at this server port. + * @error: Negative errno value indicating the reason for the abort. + */ +void homa_abort_rpcs(struct homa *homa, const struct in6_addr *addr, + int port, int error) +{ + struct homa_socktab_scan scan; + struct homa_rpc *rpc, *tmp; + struct homa_sock *hsk; + + rcu_read_lock(); + for (hsk = homa_socktab_start_scan(homa->port_map, &scan); hsk; + hsk = homa_socktab_next(&scan)) { + /* Skip the (expensive) lock acquisition if there's no + * work to do. + */ + if (list_empty(&hsk->active_rpcs)) + continue; + if (!homa_protect_rpcs(hsk)) + continue; + list_for_each_entry_safe(rpc, tmp, &hsk->active_rpcs, + active_links) { + if (!ipv6_addr_equal(&rpc->peer->addr, addr)) + continue; + if (port && rpc->dport != port) + continue; + homa_rpc_lock(rpc, "rpc_abort_rpcs"); + homa_rpc_abort(rpc, error); + homa_rpc_unlock(rpc); + } + homa_unprotect_rpcs(hsk); + } + homa_socktab_end_scan(&scan); + rcu_read_unlock(); +} + +/** + * homa_abort_sock_rpcs() - Abort all outgoing (client-side) RPCs on a given + * socket. + * @hsk: Socket whose RPCs should be aborted. + * @error: Zero means that the aborted RPCs should be freed immediately. + * A nonzero value means that the RPCs should be marked + * complete, so that they can be returned to the application; + * this value (a negative errno) will be returned from + * recvmsg. + */ +void homa_abort_sock_rpcs(struct homa_sock *hsk, int error) +{ + struct homa_rpc *rpc, *tmp; + + rcu_read_lock(); + if (list_empty(&hsk->active_rpcs)) + goto done; + if (!homa_protect_rpcs(hsk)) + goto done; + list_for_each_entry_safe(rpc, tmp, &hsk->active_rpcs, active_links) { + if (!homa_is_client(rpc->id)) + continue; + homa_rpc_lock(rpc, "homa_abort_sock_rpcs"); + if (rpc->state == RPC_DEAD) { + homa_rpc_unlock(rpc); + continue; + } + if (error) + homa_rpc_abort(rpc, error); + else + homa_rpc_free(rpc); + homa_rpc_unlock(rpc); + } + homa_unprotect_rpcs(hsk); +done: + rcu_read_unlock(); +} + +/** + * homa_register_interests() - Records information in various places so + * that a thread will be woken up if an RPC that it cares about becomes + * available. + * @interest: Used to record information about the messages this thread is + * waiting on. The initial contents of the structure are + * assumed to be undefined. + * @hsk: Socket on which relevant messages will arrive. Must not be + * locked. + * @flags: Flags field from homa_recvmsg_args; see manual entry for + * details. + * @id: If non-zero, then the caller is interested in receiving + * the response for this RPC (@id must be a client request). + * Return: Either zero or a negative errno value. If a matching RPC + * is already available, information about it will be stored in + * interest. + */ +int homa_register_interests(struct homa_interest *interest, + struct homa_sock *hsk, int flags, __u64 id) +{ + struct homa_rpc *rpc = NULL; + + homa_interest_init(interest); + interest->locked = 1; + if (id != 0) { + if (!homa_is_client(id)) + return -EINVAL; + rpc = homa_find_client_rpc(hsk, id); /* Locks rpc. */ + if (!rpc) + return -EINVAL; + if (rpc->interest && rpc->interest != interest) { + homa_rpc_unlock(rpc); + return -EINVAL; + } + } + + /* Need both the RPC lock (acquired above) and the socket lock to + * avoid races. + */ + homa_sock_lock(hsk, "homa_register_interests"); + if (hsk->shutdown) { + homa_sock_unlock(hsk); + if (rpc) + homa_rpc_unlock(rpc); + return -ESHUTDOWN; + } + + if (id != 0) { + if ((atomic_read(&rpc->flags) & RPC_PKTS_READY) || rpc->error) + goto claim_rpc; + rpc->interest = interest; + interest->reg_rpc = rpc; + homa_rpc_unlock(rpc); + } + + interest->locked = 0; + if (flags & HOMA_RECVMSG_RESPONSE) { + if (!list_empty(&hsk->ready_responses)) { + rpc = list_first_entry(&hsk->ready_responses, + struct homa_rpc, + ready_links); + goto claim_rpc; + } + /* Insert this thread at the *front* of the list; + * we'll get better cache locality if we reuse + * the same thread over and over, rather than + * round-robining between threads. Same below. + */ + list_add(&interest->response_links, + &hsk->response_interests); + } + if (flags & HOMA_RECVMSG_REQUEST) { + if (!list_empty(&hsk->ready_requests)) { + rpc = list_first_entry(&hsk->ready_requests, + struct homa_rpc, ready_links); + /* Make sure the interest isn't on the response list; + * otherwise it might receive a second RPC. + */ + if (interest->response_links.next != LIST_POISON1) + list_del(&interest->response_links); + goto claim_rpc; + } + list_add(&interest->request_links, &hsk->request_interests); + } + homa_sock_unlock(hsk); + return 0; + +claim_rpc: + list_del_init(&rpc->ready_links); + if (!list_empty(&hsk->ready_requests) || + !list_empty(&hsk->ready_responses)) { + hsk->sock.sk_data_ready(&hsk->sock); + } + + /* This flag is needed to keep the RPC from being reaped during the + * gap between when we release the socket lock and we acquire the + * RPC lock. + */ + atomic_or(RPC_HANDING_OFF, &rpc->flags); + homa_sock_unlock(hsk); + if (!interest->locked) { + atomic_or(APP_NEEDS_LOCK, &rpc->flags); + homa_rpc_lock(rpc, "homa_register_interests"); + atomic_andnot(APP_NEEDS_LOCK, &rpc->flags); + interest->locked = 1; + } + atomic_andnot(RPC_HANDING_OFF, &rpc->flags); + atomic_long_set_release(&interest->ready_rpc, (long)rpc); + return 0; +} + +/** + * homa_wait_for_message() - Wait for receipt of an incoming message + * that matches the parameters. Various other activities can occur while + * waiting, such as reaping dead RPCs and copying data to user space. + * @hsk: Socket where messages will arrive. + * @flags: Flags field from homa_recvmsg_args; see manual entry for + * details. + * @id: If non-zero, then a response message matching this id may + * be returned (@id must refer to a client request). + * + * Return: Pointer to an RPC that matches @flags and @id, or a negative + * errno value. The RPC will be locked; the caller must unlock. + */ +struct homa_rpc *homa_wait_for_message(struct homa_sock *hsk, int flags, + __u64 id) + __acquires(&rpc->bucket_lock) +{ + struct homa_rpc *result = NULL; + struct homa_interest interest; + struct homa_rpc *rpc = NULL; + int error; + + /* Each iteration of this loop finds an RPC, but it might not be + * in a state where we can return it (e.g., there might be packets + * ready to transfer to user space, but the incoming message isn't yet + * complete). Thus it could take many iterations of this loop + * before we have an RPC with a complete message. + */ + while (1) { + error = homa_register_interests(&interest, hsk, flags, id); + rpc = (struct homa_rpc *)atomic_long_read(&interest.ready_rpc); + if (rpc) + goto found_rpc; + if (error < 0) { + result = ERR_PTR(error); + goto found_rpc; + } + + /* There is no ready RPC so far. Clean up dead RPCs before + * going to sleep (or returning, if in nonblocking mode). + */ + while (1) { + int reaper_result; + + rpc = (struct homa_rpc *)atomic_long_read(&interest + .ready_rpc); + if (rpc) + goto found_rpc; + reaper_result = homa_rpc_reap(hsk, + hsk->homa->reap_limit); + if (reaper_result == 0) + break; + + /* Give NAPI and SoftIRQ tasks a chance to run. */ + schedule(); + } + if (flags & HOMA_RECVMSG_NONBLOCKING) { + result = ERR_PTR(-EAGAIN); + goto found_rpc; + } + + /* Now it's time to sleep. */ + set_current_state(TASK_INTERRUPTIBLE); + rpc = (struct homa_rpc *)atomic_long_read(&interest.ready_rpc); + if (!rpc && !hsk->shutdown) + schedule(); + __set_current_state(TASK_RUNNING); + +found_rpc: + /* If we get here, it means either an RPC is ready for our + * attention or an error occurred. + * + * First, clean up all of the interests. Must do this before + * making any other decisions, because until we do, an incoming + * message could still be passed to us. Note: if we went to + * sleep, then this info was already cleaned up by whoever + * woke us up. Also, values in the interest may change between + * when we test them below and when we acquire the socket lock, + * so they have to be checked again after locking the socket. + */ + if (interest.reg_rpc || + interest.request_links.next != LIST_POISON1 || + interest.response_links.next != LIST_POISON1) { + homa_sock_lock(hsk, "homa_wait_for_message"); + if (interest.reg_rpc) + interest.reg_rpc->interest = NULL; + if (interest.request_links.next != LIST_POISON1) + list_del(&interest.request_links); + if (interest.response_links.next != LIST_POISON1) + list_del(&interest.response_links); + homa_sock_unlock(hsk); + } + + /* Now check to see if we received an RPC handoff (note that + * this could have happened anytime up until we reset the + * interests above). + */ + rpc = (struct homa_rpc *)atomic_long_read(&interest.ready_rpc); + if (rpc) { + if (!interest.locked) { + atomic_or(APP_NEEDS_LOCK, &rpc->flags); + homa_rpc_lock(rpc, "homa_wait_for_message"); + atomic_andnot(APP_NEEDS_LOCK | RPC_HANDING_OFF, + &rpc->flags); + } else { + atomic_andnot(RPC_HANDING_OFF, &rpc->flags); + } + if (!rpc->error) + rpc->error = homa_copy_to_user(rpc); + if (rpc->state == RPC_DEAD) { + homa_rpc_unlock(rpc); + continue; + } + if (rpc->error) + goto done; + atomic_andnot(RPC_PKTS_READY, &rpc->flags); + if (rpc->msgin.bytes_remaining == 0 && + !skb_queue_len(&rpc->msgin.packets)) + goto done; + homa_rpc_unlock(rpc); + } + + /* A complete message isn't available: check for errors. */ + if (IS_ERR(result)) + return result; + if (signal_pending(current)) + return ERR_PTR(-EINTR); + + /* No message and no error; try again. */ + } + +done: + return rpc; +} + +/** + * homa_choose_interest() - Given a list of interests for an incoming + * message, choose the best one to handle it (if any). + * @homa: Overall information about the Homa transport. + * @head: Head pointers for the list of interest: either + * hsk->request_interests or hsk->response_interests. + * @offset: Offset of "next" pointers in the list elements (either + * offsetof(request_links) or offsetof(response_links). + * Return: An interest to use for the incoming message, or NULL if none + * is available. If possible, this function tries to pick an + * interest whose thread is running on a core that isn't + * currently busy doing Homa transport work. + */ +struct homa_interest *homa_choose_interest(struct homa *homa, + struct list_head *head, int offset) +{ + struct homa_interest *backup = NULL; + struct homa_interest *interest; + struct list_head *pos; + + list_for_each(pos, head) { + interest = (struct homa_interest *)(((char *)pos) - offset); + if (!backup) + backup = interest; + } + + /* All interested threads are on busy cores; return the first. */ + return backup; +} + +/** + * homa_rpc_handoff() - This function is called when the input message for + * an RPC is ready for attention from a user thread. It either notifies + * a waiting reader or queues the RPC. + * @rpc: RPC to handoff; must be locked. The caller must + * also have locked the socket for this RPC. + */ +void homa_rpc_handoff(struct homa_rpc *rpc) +{ + struct homa_sock *hsk = rpc->hsk; + struct homa_interest *interest; + + if ((atomic_read(&rpc->flags) & RPC_HANDING_OFF) || + !list_empty(&rpc->ready_links)) + return; + + /* First, see if someone is interested in this RPC specifically. + */ + if (rpc->interest) { + interest = rpc->interest; + goto thread_waiting; + } + + /* Second, check the interest list for this type of RPC. */ + if (homa_is_client(rpc->id)) { + interest = homa_choose_interest(hsk->homa, + &hsk->response_interests, + offsetof(struct homa_interest, + response_links)); + if (interest) + goto thread_waiting; + list_add_tail(&rpc->ready_links, &hsk->ready_responses); + } else { + interest = homa_choose_interest(hsk->homa, + &hsk->request_interests, + offsetof(struct homa_interest, + request_links)); + if (interest) + goto thread_waiting; + list_add_tail(&rpc->ready_links, &hsk->ready_requests); + } + + /* If we get here, no-one is waiting for the RPC, so it has been + * queued. + */ + + /* Notify the poll mechanism. */ + hsk->sock.sk_data_ready(&hsk->sock); + return; + +thread_waiting: + /* We found a waiting thread. The following 3 lines must be here, + * before clearing the interest, in order to avoid a race with + * homa_wait_for_message (which won't acquire the socket lock if + * the interest is clear). + */ + atomic_or(RPC_HANDING_OFF, &rpc->flags); + interest->locked = 0; + atomic_long_set_release(&interest->ready_rpc, (long)rpc); + + /* Clear the interest. This serves two purposes. First, it saves + * the waking thread from acquiring the socket lock again, which + * reduces contention on that lock). Second, it ensures that + * no-one else attempts to give this interest a different RPC. + */ + if (interest->reg_rpc) { + interest->reg_rpc->interest = NULL; + interest->reg_rpc = NULL; + } + if (interest->request_links.next != LIST_POISON1) + list_del(&interest->request_links); + if (interest->response_links.next != LIST_POISON1) + list_del(&interest->response_links); + wake_up_process(interest->thread); +} + +/** + * homa_incoming_sysctl_changed() - Invoked whenever a sysctl value is changed; + * any input-related parameters that depend on sysctl-settable values. + * @homa: Overall data about the Homa protocol implementation. + */ +void homa_incoming_sysctl_changed(struct homa *homa) +{ +}