[RFC,v8,18/20] selftests: Add a bpf fq qdisc to selftest

Message ID 20240510192412.3297104-19-amery.hung@bytedance.com (mailing list archive)
State RFC
Delegated to: BPF
Series bpf qdisc

Checks

Context | Check | Description
netdev/tree_selection | success | Guessing tree name failed - patch did not apply, async

Commit Message

Amery Hung May 10, 2024, 7:24 p.m. UTC
This test implements a more sophisticated qdisc using bpf. The bpf fair-
queueing (fq) qdisc gives each flow an equal chance to transmit data. It
also respects the skb timestamp for rate limiting. The implementation
does not handle hash collisions between flows, nor does it recycle flows.

The bpf fq also uses bpf maps to communicate packet drop information to
a bpf clsact EDT rate limiter. With this information, the rate limiter
can compensate for the delay caused by packet drops in the qdisc to
maintain throughput.

Signed-off-by: Amery Hung <amery.hung@bytedance.com>
---
 .../selftests/bpf/prog_tests/bpf_qdisc.c      |  24 +
 .../selftests/bpf/progs/bpf_qdisc_fq.c        | 660 ++++++++++++++++++
 2 files changed, 684 insertions(+)
 create mode 100644 tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c
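
As context for the drop compensation mentioned above, a minimal sketch of what the clsact EDT rate limiter side could look like when consuming the pinned maps is shown below. This is not the selftest's actual rate limiter: the rate_map/comp_map shapes mirror the qdisc side of this patch, while the pacing-state map (next_ts_map), the program name/section, and the header parsing are assumptions.

#include <vmlinux.h>
#include <bpf/bpf_helpers.h>

char _license[] SEC("license") = "GPL";

#define TC_ACT_OK 0
#define NSEC_PER_SEC 1000000000ULL

struct {
	__uint(type, BPF_MAP_TYPE_HASH);
	__type(key, __u32);	/* IPv4 daddr */
	__type(value, __u64);	/* target rate, bytes/sec */
	__uint(pinning, LIBBPF_PIN_BY_NAME);
	__uint(max_entries, 16);
} rate_map SEC(".maps");

struct {
	__uint(type, BPF_MAP_TYPE_HASH);
	__type(key, __u32);	/* IPv4 daddr */
	__type(value, __u64);	/* drop compensation accumulated by the qdisc, ns */
	__uint(pinning, LIBBPF_PIN_BY_NAME);
	__uint(max_entries, 16);
} comp_map SEC(".maps");

struct {
	__uint(type, BPF_MAP_TYPE_HASH);
	__type(key, __u32);	/* IPv4 daddr */
	__type(value, __u64);	/* next allowed departure time, ns */
	__uint(max_entries, 16);
} next_ts_map SEC(".maps");

SEC("tc")
int edt_rate_limit(struct __sk_buff *skb)
{
	void *data_end = (void *)(long)skb->data_end;
	void *data = (void *)(long)skb->data;
	struct iphdr *iph = data + sizeof(struct ethhdr);
	__u64 *rate, *comp, *next, now, delay, zero = 0;
	__u32 daddr;

	if ((void *)(iph + 1) > data_end)
		return TC_ACT_OK;

	daddr = iph->daddr;
	rate = bpf_map_lookup_elem(&rate_map, &daddr);
	if (!rate || !*rate)
		return TC_ACT_OK;

	next = bpf_map_lookup_elem(&next_ts_map, &daddr);
	if (!next) {
		bpf_map_update_elem(&next_ts_map, &daddr, &zero, BPF_NOEXIST);
		next = bpf_map_lookup_elem(&next_ts_map, &daddr);
		if (!next)
			return TC_ACT_OK;
	}

	now = bpf_ktime_get_ns();

	/* Pull the schedule earlier by the delay the qdisc reported for
	 * packets it dropped towards this destination.
	 */
	comp = bpf_map_lookup_elem(&comp_map, &daddr);
	if (comp && *comp) {
		delay = *comp;
		__sync_fetch_and_sub(comp, delay);
		if (*next > delay)
			*next -= delay;
		else
			*next = 0;
	}

	if (*next < now)
		*next = now;

	skb->tstamp = *next;	/* earliest departure time for this packet */
	__sync_fetch_and_add(next, (__u64)skb->len * NSEC_PER_SEC / *rate);

	return TC_ACT_OK;
}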

Comments

Martin KaFai Lau May 24, 2024, 6:24 a.m. UTC | #1
On 5/10/24 12:24 PM, Amery Hung wrote:
> This test implements a more sophisticated qdisc using bpf. The bpf fair-
> queueing (fq) qdisc gives each flow an equal chance to transmit data. It
> also respects the skb timestamp for rate limiting. The implementation
> does not handle hash collisions between flows, nor does it recycle flows.

Did you hit some issue handling flow collisions (just curious whether there
are missing pieces needed to do this)?

> The bpf fq also uses bpf maps to communicate packet drop information to
> a bpf clsact EDT rate limiter. With this information, the rate limiter
> can compensate for the delay caused by packet drops in the qdisc to
> maintain throughput.
> 

> diff --git a/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c b/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c
> new file mode 100644
> index 000000000000..5118237da9e4
> --- /dev/null
> +++ b/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c
> @@ -0,0 +1,660 @@
> +#include <vmlinux.h>
> +#include <bpf/bpf_helpers.h>
> +#include "bpf_experimental.h"
> +#include "bpf_qdisc_common.h"
> +
> +char _license[] SEC("license") = "GPL";
> +
> +#define NSEC_PER_USEC 1000L
> +#define NSEC_PER_SEC 1000000000L
> +#define PSCHED_MTU (64 * 1024 + 14)
> +
> +#define NUM_QUEUE_LOG 10
> +#define NUM_QUEUE (1 << NUM_QUEUE_LOG)
> +#define PRIO_QUEUE (NUM_QUEUE + 1)
> +#define COMP_DROP_PKT_DELAY 1
> +#define THROTTLED 0xffffffffffffffff
> +
> +/* fq configuration */
> +__u64 q_flow_refill_delay = 40 * 10000; //40us
> +__u64 q_horizon = 10ULL * NSEC_PER_SEC;
> +__u32 q_initial_quantum = 10 * PSCHED_MTU;
> +__u32 q_quantum = 2 * PSCHED_MTU;
> +__u32 q_orphan_mask = 1023;
> +__u32 q_flow_plimit = 100;
> +__u32 q_plimit = 10000;
> +__u32 q_timer_slack = 10 * NSEC_PER_USEC;
> +bool q_horizon_drop = true;
> +
> +bool q_compensate_tstamp;
> +bool q_random_drop;
> +
> +unsigned long time_next_delayed_flow = ~0ULL;
> +unsigned long unthrottle_latency_ns = 0ULL;
> +unsigned long ktime_cache = 0;
> +unsigned long dequeue_now;
> +unsigned int fq_qlen = 0;

I suspect some of these globals would be more natural if they were stored
private to an individual Qdisc instance, i.e. qdisc_priv(), e.g. as in the
sch_mq setup.

A high level idea is to allow the SEC(".struct_ops.link") to specify its own 
Qdisc_ops.priv_size.

The bpf prog could use it as a simple u8 array memory area to write anything but 
the verifier can't learn a lot from it. It will be more useful if it can work 
like map_value(s) to the verifier such that the verifier can also see the 
bpf_rb_root/bpf_list_head/bpf_spin_lock...etc.
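
Purely to illustrate that idea (none of the pieces below exist today: .priv_size handling for a bpf Qdisc_ops and a bpf_qdisc_priv() kfunc are both hypothetical), the globals above could then become per-instance state roughly like this:

/* Hypothetical sketch only; bpf_qdisc_priv() and bpf struct_ops support
 * for .priv_size do not exist.
 */
struct fq_priv {
	u64 time_next_delayed_flow;
	u64 unthrottle_latency_ns;
	u64 dequeue_now;
	u32 fq_qlen;
};

SEC("struct_ops/bpf_fq_enqueue")
int BPF_PROG(bpf_fq_enqueue, struct sk_buff *skb, struct Qdisc *sch,
	     struct bpf_sk_buff_ptr *to_free)
{
	struct fq_priv *priv = bpf_qdisc_priv(sch);	/* hypothetical kfunc */

	priv->fq_qlen++;	/* per-instance instead of the global fq_qlen */
	/* ... rest of the enqueue logic from this patch ... */
	return NET_XMIT_SUCCESS;
}

SEC(".struct_ops.link")
struct Qdisc_ops fq = {
	.priv_size = sizeof(struct fq_priv),	/* hypothetical today */
	.enqueue   = (void *)bpf_fq_enqueue,
	.id        = "bpf_fq",
};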

> +
> +struct fq_flow_node {
> +	u32 hash;
> +	int credit;
> +	u32 qlen;
> +	u32 socket_hash;
> +	u64 age;
> +	u64 time_next_packet;
> +	struct bpf_list_node list_node;
> +	struct bpf_rb_node rb_node;
> +	struct bpf_rb_root queue __contains_kptr(sk_buff, bpf_rbnode);
> +	struct bpf_spin_lock lock;
> +	struct bpf_refcount refcount;
> +};
> +
> +struct dequeue_nonprio_ctx {
> +	bool dequeued;
> +	u64 expire;
> +};
> +
> +struct fq_stashed_flow {
> +	struct fq_flow_node __kptr *flow;
> +};
> +
> +struct stashed_skb {
> +	struct sk_buff __kptr *skb;
> +};
> +
> +/* [NUM_QUEUE] for TC_PRIO_CONTROL
> + * [0, NUM_QUEUE - 1] for other flows
> + */
> +struct {
> +	__uint(type, BPF_MAP_TYPE_ARRAY);
> +	__type(key, __u32);
> +	__type(value, struct fq_stashed_flow);
> +	__uint(max_entries, NUM_QUEUE + 1);
> +} fq_stashed_flows SEC(".maps");
> +
> +struct {
> +	__uint(type, BPF_MAP_TYPE_HASH);
> +	__type(key, __u32);
> +	__type(value, __u64);
> +	__uint(pinning, LIBBPF_PIN_BY_NAME);
> +	__uint(max_entries, 16);
> +} rate_map SEC(".maps");
> +
> +struct {
> +	__uint(type, BPF_MAP_TYPE_HASH);
> +	__type(key, __u32);
> +	__type(value, __u64);
> +	__uint(pinning, LIBBPF_PIN_BY_NAME);
> +	__uint(max_entries, 16);
> +} comp_map SEC(".maps");
> +
> +#define private(name) SEC(".data." #name) __hidden __attribute__((aligned(8)))
> +
> +private(A) struct bpf_spin_lock fq_delayed_lock;
> +private(A) struct bpf_rb_root fq_delayed __contains(fq_flow_node, rb_node);
> +
> +private(B) struct bpf_spin_lock fq_new_flows_lock;
> +private(B) struct bpf_list_head fq_new_flows __contains(fq_flow_node, list_node);
> +
> +private(C) struct bpf_spin_lock fq_old_flows_lock;
> +private(C) struct bpf_list_head fq_old_flows __contains(fq_flow_node, list_node);
> +
> +private(D) struct bpf_spin_lock fq_stashed_skb_lock;
> +private(D) struct bpf_list_head fq_stashed_skb __contains_kptr(sk_buff, bpf_list);

[ ... ]

> +SEC("struct_ops/bpf_fq_enqueue")
> +int BPF_PROG(bpf_fq_enqueue, struct sk_buff *skb, struct Qdisc *sch,
> +	     struct bpf_sk_buff_ptr *to_free)
> +{
> +	struct iphdr *iph = (void *)(long)skb->data + sizeof(struct ethhdr);
> +	u64 time_to_send, jiffies, delay_ns, *comp_ns, *rate;
> +	struct fq_flow_node *flow = NULL, *flow_copy;
> +	struct fq_stashed_flow *sflow;
> +	u32 hash, daddr, sk_hash;
> +	bool connected;
> +
> +	if (q_random_drop & (bpf_get_prandom_u32() > ~0U * 0.90))
> +		goto drop;
> +
> +	if (fq_qlen >= q_plimit)
> +		goto drop;
> +
> +	if (!skb->tstamp) {
> +		time_to_send = ktime_cache = bpf_ktime_get_ns();
> +	} else {
> +		if (fq_packet_beyond_horizon(skb)) {
> +			ktime_cache = bpf_ktime_get_ns();
> +			if (fq_packet_beyond_horizon(skb)) {
> +				if (q_horizon_drop)
> +					goto drop;
> +
> +				skb->tstamp = ktime_cache + q_horizon;
> +			}
> +		}
> +		time_to_send = skb->tstamp;
> +	}
> +
> +	if (fq_classify(skb, &hash, &sflow, &connected, &sk_hash) < 0)
> +		goto drop;
> +
> +	flow = bpf_kptr_xchg(&sflow->flow, flow);
> +	if (!flow)
> +		goto drop; //unexpected
> +
> +	if (hash != PRIO_QUEUE) {
> +		if (connected && flow->socket_hash != sk_hash) {
> +			flow->credit = q_initial_quantum;
> +			flow->socket_hash = sk_hash;
> +			if (fq_flow_is_throttled(flow)) {
> +				/* mark the flow as undetached. The reference to the
> +				 * throttled flow in fq_delayed will be removed later.
> +				 */
> +				flow_copy = bpf_refcount_acquire(flow);
> +				flow_copy->age = 0;
> +				fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow_copy);
> +			}
> +			flow->time_next_packet = 0ULL;
> +		}
> +
> +		if (flow->qlen >= q_flow_plimit) {
> +			bpf_kptr_xchg_back(&sflow->flow, flow);
> +			goto drop;
> +		}
> +
> +		if (fq_flow_is_detached(flow)) {
> +			if (connected)
> +				flow->socket_hash = sk_hash;
> +
> +			flow_copy = bpf_refcount_acquire(flow);
> +
> +			jiffies = bpf_jiffies64();
> +			if ((s64)(jiffies - (flow_copy->age + q_flow_refill_delay)) > 0) {
> +				if (flow_copy->credit < q_quantum)
> +					flow_copy->credit = q_quantum;
> +			}
> +			flow_copy->age = 0;
> +			fq_flows_add_tail(&fq_new_flows, &fq_new_flows_lock, flow_copy);
> +		}
> +	}
> +
> +	skb->tstamp = time_to_send;
> +
> +	bpf_spin_lock(&flow->lock);
> +	bpf_rbtree_excl_add(&flow->queue, &skb->bpf_rbnode, skb_tstamp_less);
> +	bpf_spin_unlock(&flow->lock);
> +
> +	flow->qlen++;
> +	bpf_kptr_xchg_back(&sflow->flow, flow);
> +
> +	fq_qlen++;
> +	return NET_XMIT_SUCCESS;
> +
> +drop:
> +	if (q_compensate_tstamp) {
> +		bpf_probe_read_kernel(&daddr, sizeof(daddr), &iph->daddr);
> +		rate = bpf_map_lookup_elem(&rate_map, &daddr);
> +		comp_ns = bpf_map_lookup_elem(&comp_map, &daddr);
> +		if (rate && comp_ns) {
> +			delay_ns = (u64)qdisc_skb_cb(skb)->pkt_len * NSEC_PER_SEC / (*rate);
> +			__sync_fetch_and_add(comp_ns, delay_ns);
> +		}
> +	}
> +	bpf_qdisc_skb_drop(skb, to_free);
> +	return NET_XMIT_DROP;
> +}

[ ... ]

> +SEC("struct_ops/bpf_fq_dequeue")
> +struct sk_buff *BPF_PROG(bpf_fq_dequeue, struct Qdisc *sch)
> +{
> +	struct dequeue_nonprio_ctx cb_ctx = {};
> +	struct sk_buff *skb = NULL;
> +
> +	skb = fq_dequeue_prio();
> +	if (skb) {
> +		bpf_skb_set_dev(skb, sch);
> +		return skb;
> +	}
> +
> +	ktime_cache = dequeue_now = bpf_ktime_get_ns();
> +	fq_check_throttled();
> +	bpf_loop(q_plimit, fq_dequeue_nonprio_flows, &cb_ctx, 0);
> +
> +	skb = get_stashed_skb();
> +
> +	if (skb) {
> +		bpf_skb_set_dev(skb, sch);
> +		return skb;
> +	}
> +
> +	if (cb_ctx.expire)
> +		bpf_qdisc_watchdog_schedule(sch, cb_ctx.expire, q_timer_slack);
> +
> +	return NULL;
> +}

The enqueue and dequeue are using bpf maps (e.g. an arraymap) or global vars 
(also an arraymap). Potentially, the map can be shared by different qdisc 
instances (sch), and those could also be attached to different net devices. Not 
sure if there is a potential issue, e.g. in the bpf_fq_reset below, or a bpf 
prog dequeuing an skb with a different skb->dev.

> +
> +static int
> +fq_reset_flows(u32 index, void *ctx)
> +{
> +	struct bpf_list_node *node;
> +	struct fq_flow_node *flow;
> +
> +	bpf_spin_lock(&fq_new_flows_lock);
> +	node = bpf_list_pop_front(&fq_new_flows);
> +	bpf_spin_unlock(&fq_new_flows_lock);
> +	if (!node) {
> +		bpf_spin_lock(&fq_old_flows_lock);
> +		node = bpf_list_pop_front(&fq_old_flows);
> +		bpf_spin_unlock(&fq_old_flows_lock);
> +		if (!node)
> +			return 1;
> +	}
> +
> +	flow = container_of(node, struct fq_flow_node, list_node);
> +	bpf_obj_drop(flow);
> +
> +	return 0;
> +}
> +
> +static int
> +fq_reset_stashed_flows(u32 index, void *ctx)
> +{
> +	struct fq_flow_node *flow = NULL;
> +	struct fq_stashed_flow *sflow;
> +
> +	sflow = bpf_map_lookup_elem(&fq_stashed_flows, &index);
> +	if (!sflow)
> +		return 0;
> +
> +	flow = bpf_kptr_xchg(&sflow->flow, flow);
> +	if (flow)
> +		bpf_obj_drop(flow);
> +
> +	return 0;
> +}
> +
> +SEC("struct_ops/bpf_fq_reset")
> +void BPF_PROG(bpf_fq_reset, struct Qdisc *sch)
> +{
> +	bool unset_all = true;
> +	fq_qlen = 0;
> +	bpf_loop(NUM_QUEUE + 1, fq_reset_stashed_flows, NULL, 0);
> +	bpf_loop(NUM_QUEUE, fq_reset_flows, NULL, 0);
> +	bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &unset_all, 0);

I am not sure it can depend on a bpf prog to do the cleanup/reset. What if it 
misses dropping some skb, which could potentially hold up resources like 
sk/dev/netns?

A quick thought is that the struct_ops knows all the bpf progs and each prog 
tracks its used maps in prog->aux->used_maps, so the kernel could clean them up. 
However, the maps may still be used by other Qdisc instances.

It may be easier if the skb can only be enqueued somewhere in the qdisc_priv() 
and then cleans up its own qdisc_priv during reset.

> +	return;
> +}
> +
> +SEC(".struct_ops")
> +struct Qdisc_ops fq = {
> +	.enqueue   = (void *)bpf_fq_enqueue,
> +	.dequeue   = (void *)bpf_fq_dequeue,
> +	.reset     = (void *)bpf_fq_reset,
> +	.id        = "bpf_fq",
> +};
Toke Høiland-Jørgensen May 24, 2024, 7:40 a.m. UTC | #2
Martin KaFai Lau <martin.lau@linux.dev> writes:

> [ ... ]
>
>> +SEC("struct_ops/bpf_fq_dequeue")
>> +struct sk_buff *BPF_PROG(bpf_fq_dequeue, struct Qdisc *sch)
>> +{
>> +	struct dequeue_nonprio_ctx cb_ctx = {};
>> +	struct sk_buff *skb = NULL;
>> +
>> +	skb = fq_dequeue_prio();
>> +	if (skb) {
>> +		bpf_skb_set_dev(skb, sch);
>> +		return skb;
>> +	}
>> +
>> +	ktime_cache = dequeue_now = bpf_ktime_get_ns();
>> +	fq_check_throttled();
>> +	bpf_loop(q_plimit, fq_dequeue_nonprio_flows, &cb_ctx, 0);
>> +
>> +	skb = get_stashed_skb();
>> +
>> +	if (skb) {
>> +		bpf_skb_set_dev(skb, sch);
>> +		return skb;
>> +	}
>> +
>> +	if (cb_ctx.expire)
>> +		bpf_qdisc_watchdog_schedule(sch, cb_ctx.expire, q_timer_slack);
>> +
>> +	return NULL;
>> +}
>
> The enqueue and dequeue are using bpf maps (e.g. an arraymap) or global vars 
> (also an arraymap). Potentially, the map can be shared by different qdisc 
> instances (sch), and those could also be attached to different net devices. Not 
> sure if there is a potential issue, e.g. in the bpf_fq_reset below, or a bpf 
> prog dequeuing an skb with a different skb->dev.

I think behaviour like this is potentially quite interesting and will
allow some neat optimisations (skipping a redirect to a different
interface and just directly enqueueing it to a different place comes to
mind). However, as you point out it may lead to weird things like a
mismatched skb->dev, so if we allow this we should make sure that the
kernel will disallow (or fix) such behaviour. I'm not sure how difficult
that is; for instance, I *think* the mismatched skb->dev can lead to
bugs, but I'm not quite sure.

So maybe it's better to disallow "crossing over" like this, and relax
that restriction later if/when we have a concrete use case?

-Toke
Alexei Starovoitov May 24, 2024, 7:33 p.m. UTC | #3
On Thu, May 23, 2024 at 11:25 PM Martin KaFai Lau <martin.lau@linux.dev> wrote:
>
> > +
> > +unsigned long time_next_delayed_flow = ~0ULL;
> > +unsigned long unthrottle_latency_ns = 0ULL;
> > +unsigned long ktime_cache = 0;
> > +unsigned long dequeue_now;
> > +unsigned int fq_qlen = 0;
>
> I suspect some of these globals would be more natural if they were stored
> private to an individual Qdisc instance, i.e. qdisc_priv(), e.g. as in the
> sch_mq setup.
>
> A high level idea is to allow the SEC(".struct_ops.link") to specify its own
> Qdisc_ops.priv_size.
>
> The bpf prog could use it as a simple u8 array memory area to write anything but
> the verifier can't learn a lot from it. It will be more useful if it can work
> like map_value(s) to the verifier such that the verifier can also see the
> bpf_rb_root/bpf_list_head/bpf_spin_lock...etc.

Qdisc_ops.priv_size is too qdisc specific.
imo using globals here is fine. A bpf prog can use a hash map or arena
to store per-netdev or per-qdisc data.
The fewer custom things, the better.
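
As a concrete sketch of the hash-map route (dropped into bpf_qdisc_fq.c; the choice of sch->handle as the key and the struct layout are assumptions, not something prescribed by the series), the globals could become per-instance values along these lines:

/* Per-qdisc state keyed by the qdisc handle; a minimal sketch of the
 * "hash map for per-qdisc data" approach, not code from this series.
 */
struct fq_instance_state {
	u64 time_next_delayed_flow;
	u64 unthrottle_latency_ns;
	u64 dequeue_now;
	u32 fq_qlen;
};

struct {
	__uint(type, BPF_MAP_TYPE_HASH);
	__type(key, __u32);			/* sch->handle */
	__type(value, struct fq_instance_state);
	__uint(max_entries, 64);
} fq_state SEC(".maps");

static struct fq_instance_state *fq_get_state(struct Qdisc *sch)
{
	struct fq_instance_state init = { .time_next_delayed_flow = ~0ULL };
	struct fq_instance_state *state;
	u32 key = sch->handle;

	state = bpf_map_lookup_elem(&fq_state, &key);
	if (state)
		return state;

	bpf_map_update_elem(&fq_state, &key, &init, BPF_NOEXIST);
	return bpf_map_lookup_elem(&fq_state, &key);
}

Each hook would then call fq_get_state(sch) and bail out on NULL before touching the counters.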
Martin KaFai Lau May 24, 2024, 8:54 p.m. UTC | #4
On 5/24/24 12:33 PM, Alexei Starovoitov wrote:
> On Thu, May 23, 2024 at 11:25 PM Martin KaFai Lau <martin.lau@linux.dev> wrote:
>>
>>> +
>>> +unsigned long time_next_delayed_flow = ~0ULL;
>>> +unsigned long unthrottle_latency_ns = 0ULL;
>>> +unsigned long ktime_cache = 0;
>>> +unsigned long dequeue_now;
>>> +unsigned int fq_qlen = 0;
>>
>> I suspect some of these globals would be more natural if they were stored
>> private to an individual Qdisc instance, i.e. qdisc_priv(), e.g. as in the
>> sch_mq setup.
>>
>> A high level idea is to allow the SEC(".struct_ops.link") to specify its own
>> Qdisc_ops.priv_size.
>>
>> The bpf prog could use it as a simple u8 array memory area to write anything but
>> the verifier can't learn a lot from it. It will be more useful if it can work
>> like map_value(s) to the verifier such that the verifier can also see the
>> bpf_rb_root/bpf_list_head/bpf_spin_lock...etc.
> 
> Qdisc_ops.priv_size is too qdisc specific.

Instead of priv_size, maybe something like a bpf_local_storage for Qdisc is 
closer to how other kernel objects (sk/task/cgrp) are doing it now, like 
bpf_sk_storage that goes away with the sk. It needs a storage that goes away 
with the Qdisc.
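
For reference, the existing sk storage pattern this analogy draws on looks roughly like the snippet below (a tracing-program example with assumed names); a Qdisc local storage would presumably mirror it with the Qdisc as the owner, but no such map type exists today.

#include <vmlinux.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>

char _license[] SEC("license") = "GPL";

struct my_sk_state {
	__u64 bytes_sent;
};

struct {
	__uint(type, BPF_MAP_TYPE_SK_STORAGE);
	__uint(map_flags, BPF_F_NO_PREALLOC);
	__type(key, int);
	__type(value, struct my_sk_state);
} sk_state SEC(".maps");

SEC("fentry/tcp_sendmsg")
int BPF_PROG(trace_tcp_sendmsg, struct sock *sk, struct msghdr *msg, size_t size)
{
	struct my_sk_state *st;

	/* created on first use, freed automatically together with the sk */
	st = bpf_sk_storage_get(&sk_state, sk, NULL,
				BPF_SK_STORAGE_GET_F_CREATE);
	if (st)
		st->bytes_sent += size;
	return 0;
}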

I was thinking of using priv_size to work like a bpf_local_storage without the 
pointer array indirection, by pre-defining all the map_values it wants to store 
in the Qdisc, so the total size of the pre-defined map_values would be the 
priv_size. However, I haven't thought through how it should look from 
bpf_prog.c to the kernel. It is an optimization of the bpf_local_storage.

> imo using globals here is fine. bpf prog can use hash map or arena
> to store per-netdev or per-qdisc data.
> The less custom things the better.
Martin KaFai Lau May 26, 2024, 1:08 a.m. UTC | #5
On 5/24/24 12:40 AM, Toke Høiland-Jørgensen wrote:
> I think behaviour like this is potentially quite interesting and will
> allow some neat optimisations (skipping a redirect to a different
> interface and just directly enqueueing it to a different place comes to

hmm... I am not sure it is a good/safe optimization. From looking at 
skb_do_redirect, there are quite a few things bypassed between __dev_queue_xmit 
and the final dequeue on the redirected dev. I don't know whether all of them 
are dev independent.

> mind). However, as you point out it may lead to weird things like a
> mismatched skb->dev, so if we allow this we should make sure that the
> kernel will disallow (or fix) such behaviour.

I have been thinking about the skb->dev "fix", but the thought was originally 
for the bpf_skb_set_dev() use case in patch 14.

Note that the struct_ops ".dequeue" is actually realized by a fentry trampoline 
(call it the fentry ".dequeue"). Maybe an extra fexit ".dequeue" could be used 
here. The fexit ".dequeue" will be called after the fentry ".dequeue". The fexit 
".dequeue" has the function arguments (sch here, which has the correct dev) and 
the return value (skb) from the fentry ".dequeue". This will be an extra call 
(to the fexit ".dequeue") and very specific to this use case, but it may be the 
least evil solution I can think of now...
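
Roughly, the post-condition such an fexit ".dequeue" would enforce is sketched below (illustrative only, not code from this series; whether to fix up or drop the skb is the open policy question):

/* Illustrative kernel-side check after the bpf ".dequeue" returns. */
static struct sk_buff *bpf_qdisc_dequeue_postcheck(struct Qdisc *sch,
						   struct sk_buff *skb)
{
	if (skb && skb->dev != qdisc_dev(sch)) {
		/* Option 1: silently fix up the mismatched dev ... */
		skb->dev = qdisc_dev(sch);
		/* Option 2: ... or treat it as a bug, drop the skb and
		 * count it as a qdisc drop to disallow "crossing over".
		 */
	}
	return skb;
}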
Toke Høiland-Jørgensen May 27, 2024, 10:09 a.m. UTC | #6
Martin KaFai Lau <martin.lau@linux.dev> writes:

> On 5/24/24 12:40 AM, Toke Høiland-Jørgensen wrote:
>> I think behaviour like this is potentially quite interesting and will
>> allow some neat optimisations (skipping a redirect to a different
>> interface and just directly enqueueing it to a different place comes to
>
> hmm... I am not sure it is a good/safe optimization. From looking at
> skb_do_redirect, there are quite a few things bypassed between
> __dev_queue_xmit and the final dequeue on the redirected dev. I don't
> know whether all of them are dev independent.

There are certainly footguns, but as long as they are of the "break the
data path" variety and not the "immediately crash the kernel" variety
that may be OK. After all, you can already do plenty of convoluted
things with BPF that will break things. And glancing through the
redirect code, nothing immediately jumps out as something that will
definitely crash, AFAICT.

However, it does feel a bit risky, so I am also totally fine with
disallowing this until someone comes up with a concrete use case where
it would be beneficial :)

>> mind). However, as you point out it may lead to weird things like a
>> mismatched skb->dev, so if we allow this we should make sure that the
>> kernel will disallow (or fix) such behaviour.
>
> I have been thinking about the skb->dev "fix", but the thought was originally 
> for the bpf_skb_set_dev() use case in patch 14.
>
> Note that the struct_ops ".dequeue" is actually realized by a fentry trampoline 
> (call it the fentry ".dequeue"). Maybe an extra fexit ".dequeue" could be used 
> here. The fexit ".dequeue" will be called after the fentry ".dequeue". The fexit 
> ".dequeue" has the function arguments (sch here, which has the correct dev) and 
> the return value (skb) from the fentry ".dequeue". This will be an extra call 
> (to the fexit ".dequeue") and very specific to this use case, but it may be the 
> least evil solution I can think of now...

That's an interesting idea, certainly! Relying on fexit functions
to do specific sanity checks/fixups after a BPF program has run
(enforcing/checking post-conditions, basically) does not seem totally
crazy to me, and may have other applications :)

-Toke

Patch

diff --git a/tools/testing/selftests/bpf/prog_tests/bpf_qdisc.c b/tools/testing/selftests/bpf/prog_tests/bpf_qdisc.c
index 295d0216e70f..394bf5a4adae 100644
--- a/tools/testing/selftests/bpf/prog_tests/bpf_qdisc.c
+++ b/tools/testing/selftests/bpf/prog_tests/bpf_qdisc.c
@@ -4,6 +4,7 @@ 
 
 #include "network_helpers.h"
 #include "bpf_qdisc_fifo.skel.h"
+#include "bpf_qdisc_fq.skel.h"
 
 #ifndef ENOTSUPP
 #define ENOTSUPP 524
@@ -154,8 +155,31 @@  static void test_fifo(void)
 	bpf_qdisc_fifo__destroy(fifo_skel);
 }
 
+static void test_fq(void)
+{
+	struct bpf_qdisc_fq *fq_skel;
+	struct bpf_link *link;
+
+	fq_skel = bpf_qdisc_fq__open_and_load();
+	if (!ASSERT_OK_PTR(fq_skel, "bpf_qdisc_fq__open_and_load"))
+		return;
+
+	link = bpf_map__attach_struct_ops(fq_skel->maps.fq);
+	if (!ASSERT_OK_PTR(link, "bpf_map__attach_struct_ops")) {
+		bpf_qdisc_fq__destroy(fq_skel);
+		return;
+	}
+
+	do_test("bpf_fq");
+
+	bpf_link__destroy(link);
+	bpf_qdisc_fq__destroy(fq_skel);
+}
+
 void test_bpf_qdisc(void)
 {
 	if (test__start_subtest("fifo"))
 		test_fifo();
+	if (test__start_subtest("fq"))
+		test_fq();
 }
diff --git a/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c b/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c
new file mode 100644
index 000000000000..5118237da9e4
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c
@@ -0,0 +1,660 @@ 
+#include <vmlinux.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_experimental.h"
+#include "bpf_qdisc_common.h"
+
+char _license[] SEC("license") = "GPL";
+
+#define NSEC_PER_USEC 1000L
+#define NSEC_PER_SEC 1000000000L
+#define PSCHED_MTU (64 * 1024 + 14)
+
+#define NUM_QUEUE_LOG 10
+#define NUM_QUEUE (1 << NUM_QUEUE_LOG)
+#define PRIO_QUEUE (NUM_QUEUE + 1)
+#define COMP_DROP_PKT_DELAY 1
+#define THROTTLED 0xffffffffffffffff
+
+/* fq configuration */
+__u64 q_flow_refill_delay = 40 * 10000; //40us
+__u64 q_horizon = 10ULL * NSEC_PER_SEC;
+__u32 q_initial_quantum = 10 * PSCHED_MTU;
+__u32 q_quantum = 2 * PSCHED_MTU;
+__u32 q_orphan_mask = 1023;
+__u32 q_flow_plimit = 100;
+__u32 q_plimit = 10000;
+__u32 q_timer_slack = 10 * NSEC_PER_USEC;
+bool q_horizon_drop = true;
+
+bool q_compensate_tstamp;
+bool q_random_drop;
+
+unsigned long time_next_delayed_flow = ~0ULL;
+unsigned long unthrottle_latency_ns = 0ULL;
+unsigned long ktime_cache = 0;
+unsigned long dequeue_now;
+unsigned int fq_qlen = 0;
+
+struct fq_flow_node {
+	u32 hash;
+	int credit;
+	u32 qlen;
+	u32 socket_hash;
+	u64 age;
+	u64 time_next_packet;
+	struct bpf_list_node list_node;
+	struct bpf_rb_node rb_node;
+	struct bpf_rb_root queue __contains_kptr(sk_buff, bpf_rbnode);
+	struct bpf_spin_lock lock;
+	struct bpf_refcount refcount;
+};
+
+struct dequeue_nonprio_ctx {
+	bool dequeued;
+	u64 expire;
+};
+
+struct fq_stashed_flow {
+	struct fq_flow_node __kptr *flow;
+};
+
+struct stashed_skb {
+	struct sk_buff __kptr *skb;
+};
+
+/* [NUM_QUEUE] for TC_PRIO_CONTROL
+ * [0, NUM_QUEUE - 1] for other flows
+ */
+struct {
+	__uint(type, BPF_MAP_TYPE_ARRAY);
+	__type(key, __u32);
+	__type(value, struct fq_stashed_flow);
+	__uint(max_entries, NUM_QUEUE + 1);
+} fq_stashed_flows SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, __u32);
+	__type(value, __u64);
+	__uint(pinning, LIBBPF_PIN_BY_NAME);
+	__uint(max_entries, 16);
+} rate_map SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, __u32);
+	__type(value, __u64);
+	__uint(pinning, LIBBPF_PIN_BY_NAME);
+	__uint(max_entries, 16);
+} comp_map SEC(".maps");
+
+#define private(name) SEC(".data." #name) __hidden __attribute__((aligned(8)))
+
+private(A) struct bpf_spin_lock fq_delayed_lock;
+private(A) struct bpf_rb_root fq_delayed __contains(fq_flow_node, rb_node);
+
+private(B) struct bpf_spin_lock fq_new_flows_lock;
+private(B) struct bpf_list_head fq_new_flows __contains(fq_flow_node, list_node);
+
+private(C) struct bpf_spin_lock fq_old_flows_lock;
+private(C) struct bpf_list_head fq_old_flows __contains(fq_flow_node, list_node);
+
+private(D) struct bpf_spin_lock fq_stashed_skb_lock;
+private(D) struct bpf_list_head fq_stashed_skb __contains_kptr(sk_buff, bpf_list);
+
+static __always_inline bool bpf_kptr_xchg_back(void *map_val, void *ptr)
+{
+	void *ret;
+
+	ret = bpf_kptr_xchg(map_val, ptr);
+	if (ret) { //unexpected
+		bpf_obj_drop(ret);
+		return false;
+	}
+	return true;
+}
+
+static __always_inline struct qdisc_skb_cb *qdisc_skb_cb(const struct sk_buff *skb)
+{
+	return (struct qdisc_skb_cb *)skb->cb;
+}
+
+static __always_inline int hash64(u64 val, int bits)
+{
+	return val * 0x61C8864680B583EBull >> (64 - bits);
+}
+
+static bool skb_tstamp_less(struct bpf_rb_node *a, const struct bpf_rb_node *b)
+{
+	struct sk_buff *skb_a;
+	struct sk_buff *skb_b;
+
+	skb_a = container_of(a, struct sk_buff, bpf_rbnode);
+	skb_b = container_of(b, struct sk_buff, bpf_rbnode);
+
+	return skb_a->tstamp < skb_b->tstamp;
+}
+
+static bool fn_time_next_packet_less(struct bpf_rb_node *a, const struct bpf_rb_node *b)
+{
+	struct fq_flow_node *flow_a;
+	struct fq_flow_node *flow_b;
+
+	flow_a = container_of(a, struct fq_flow_node, rb_node);
+	flow_b = container_of(b, struct fq_flow_node, rb_node);
+
+	return flow_a->time_next_packet < flow_b->time_next_packet;
+}
+
+static __always_inline void
+fq_flows_add_head(struct bpf_list_head *head, struct bpf_spin_lock *lock,
+		  struct fq_flow_node *flow)
+{
+	bpf_spin_lock(lock);
+	bpf_list_push_front(head, &flow->list_node);
+	bpf_spin_unlock(lock);
+}
+
+static __always_inline void
+fq_flows_add_tail(struct bpf_list_head *head, struct bpf_spin_lock *lock,
+		  struct fq_flow_node *flow)
+{
+	bpf_spin_lock(lock);
+	bpf_list_push_back(head, &flow->list_node);
+	bpf_spin_unlock(lock);
+}
+
+static __always_inline bool
+fq_flows_is_empty(struct bpf_list_head *head, struct bpf_spin_lock *lock)
+{
+	struct bpf_list_node *node;
+
+	bpf_spin_lock(lock);
+	node = bpf_list_pop_front(head);
+	if (node) {
+		bpf_list_push_front(head, node);
+		bpf_spin_unlock(lock);
+		return false;
+	}
+	bpf_spin_unlock(lock);
+
+	return true;
+}
+
+static __always_inline void fq_flow_set_detached(struct fq_flow_node *flow)
+{
+	flow->age = bpf_jiffies64();
+	bpf_obj_drop(flow);
+}
+
+static __always_inline bool fq_flow_is_detached(struct fq_flow_node *flow)
+{
+	return flow->age != 0 && flow->age != THROTTLED;
+}
+
+static __always_inline bool fq_flow_is_throttled(struct fq_flow_node *flow)
+{
+	return flow->age != THROTTLED;
+}
+
+static __always_inline bool sk_listener(struct sock *sk)
+{
+	return (1 << sk->__sk_common.skc_state) & (TCPF_LISTEN | TCPF_NEW_SYN_RECV);
+}
+
+static __always_inline int
+fq_classify(struct sk_buff *skb, u32 *hash, struct fq_stashed_flow **sflow,
+	    bool *connected, u32 *sk_hash)
+{
+	struct fq_flow_node *flow;
+	struct sock *sk = skb->sk;
+
+	*connected = false;
+
+	if ((skb->priority & TC_PRIO_MAX) == TC_PRIO_CONTROL) {
+		*hash = PRIO_QUEUE;
+	} else {
+		if (!sk || sk_listener(sk)) {
+			*sk_hash = bpf_skb_get_hash(skb) & q_orphan_mask;
+			*sk_hash = (*sk_hash << 1 | 1);
+		} else if (sk->__sk_common.skc_state == TCP_CLOSE) {
+			*sk_hash = bpf_skb_get_hash(skb) & q_orphan_mask;
+			*sk_hash = (*sk_hash << 1 | 1);
+		} else {
+			*sk_hash = sk->__sk_common.skc_hash;
+			*connected = true;
+		}
+		*hash = hash64(*sk_hash, NUM_QUEUE_LOG);
+	}
+
+	*sflow = bpf_map_lookup_elem(&fq_stashed_flows, hash);
+	if (!*sflow)
+		return -1; //unexpected
+
+	if ((*sflow)->flow)
+		return 0;
+
+	flow = bpf_obj_new(typeof(*flow));
+	if (!flow)
+		return -1;
+
+	flow->hash = *hash;
+	flow->credit = q_initial_quantum;
+	flow->qlen = 0;
+	flow->age = 1UL;
+	flow->time_next_packet = 0;
+
+	bpf_kptr_xchg_back(&(*sflow)->flow, flow);
+
+	return 0;
+}
+
+static __always_inline bool fq_packet_beyond_horizon(struct sk_buff *skb)
+{
+	return (s64)skb->tstamp > (s64)(ktime_cache + q_horizon);
+}
+
+SEC("struct_ops/bpf_fq_enqueue")
+int BPF_PROG(bpf_fq_enqueue, struct sk_buff *skb, struct Qdisc *sch,
+	     struct bpf_sk_buff_ptr *to_free)
+{
+	struct iphdr *iph = (void *)(long)skb->data + sizeof(struct ethhdr);
+	u64 time_to_send, jiffies, delay_ns, *comp_ns, *rate;
+	struct fq_flow_node *flow = NULL, *flow_copy;
+	struct fq_stashed_flow *sflow;
+	u32 hash, daddr, sk_hash;
+	bool connected;
+
+	if (q_random_drop & (bpf_get_prandom_u32() > ~0U * 0.90))
+		goto drop;
+
+	if (fq_qlen >= q_plimit)
+		goto drop;
+
+	if (!skb->tstamp) {
+		time_to_send = ktime_cache = bpf_ktime_get_ns();
+	} else {
+		if (fq_packet_beyond_horizon(skb)) {
+			ktime_cache = bpf_ktime_get_ns();
+			if (fq_packet_beyond_horizon(skb)) {
+				if (q_horizon_drop)
+					goto drop;
+
+				skb->tstamp = ktime_cache + q_horizon;
+			}
+		}
+		time_to_send = skb->tstamp;
+	}
+
+	if (fq_classify(skb, &hash, &sflow, &connected, &sk_hash) < 0)
+		goto drop;
+
+	flow = bpf_kptr_xchg(&sflow->flow, flow);
+	if (!flow)
+		goto drop; //unexpected
+
+	if (hash != PRIO_QUEUE) {
+		if (connected && flow->socket_hash != sk_hash) {
+			flow->credit = q_initial_quantum;
+			flow->socket_hash = sk_hash;
+			if (fq_flow_is_throttled(flow)) {
+				/* mark the flow as undetached. The reference to the
+				 * throttled flow in fq_delayed will be removed later.
+				 */
+				flow_copy = bpf_refcount_acquire(flow);
+				flow_copy->age = 0;
+				fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow_copy);
+			}
+			flow->time_next_packet = 0ULL;
+		}
+
+		if (flow->qlen >= q_flow_plimit) {
+			bpf_kptr_xchg_back(&sflow->flow, flow);
+			goto drop;
+		}
+
+		if (fq_flow_is_detached(flow)) {
+			if (connected)
+				flow->socket_hash = sk_hash;
+
+			flow_copy = bpf_refcount_acquire(flow);
+
+			jiffies = bpf_jiffies64();
+			if ((s64)(jiffies - (flow_copy->age + q_flow_refill_delay)) > 0) {
+				if (flow_copy->credit < q_quantum)
+					flow_copy->credit = q_quantum;
+			}
+			flow_copy->age = 0;
+			fq_flows_add_tail(&fq_new_flows, &fq_new_flows_lock, flow_copy);
+		}
+	}
+
+	skb->tstamp = time_to_send;
+
+	bpf_spin_lock(&flow->lock);
+	bpf_rbtree_excl_add(&flow->queue, &skb->bpf_rbnode, skb_tstamp_less);
+	bpf_spin_unlock(&flow->lock);
+
+	flow->qlen++;
+	bpf_kptr_xchg_back(&sflow->flow, flow);
+
+	fq_qlen++;
+	return NET_XMIT_SUCCESS;
+
+drop:
+	if (q_compensate_tstamp) {
+		bpf_probe_read_kernel(&daddr, sizeof(daddr), &iph->daddr);
+		rate = bpf_map_lookup_elem(&rate_map, &daddr);
+		comp_ns = bpf_map_lookup_elem(&comp_map, &daddr);
+		if (rate && comp_ns) {
+			delay_ns = (u64)qdisc_skb_cb(skb)->pkt_len * NSEC_PER_SEC / (*rate);
+			__sync_fetch_and_add(comp_ns, delay_ns);
+		}
+	}
+	bpf_qdisc_skb_drop(skb, to_free);
+	return NET_XMIT_DROP;
+}
+
+static int fq_unset_throttled_flows(u32 index, bool *unset_all)
+{
+	struct bpf_rb_node *node = NULL;
+	struct fq_flow_node *flow;
+
+	bpf_spin_lock(&fq_delayed_lock);
+
+	node = bpf_rbtree_first(&fq_delayed);
+	if (!node) {
+		bpf_spin_unlock(&fq_delayed_lock);
+		return 1;
+	}
+
+	flow = container_of(node, struct fq_flow_node, rb_node);
+	if (!*unset_all && flow->time_next_packet > dequeue_now) {
+		time_next_delayed_flow = flow->time_next_packet;
+		bpf_spin_unlock(&fq_delayed_lock);
+		return 1;
+	}
+
+	node = bpf_rbtree_remove(&fq_delayed, &flow->rb_node);
+
+	bpf_spin_unlock(&fq_delayed_lock);
+
+	if (!node)
+		return 1; //unexpected
+
+	flow = container_of(node, struct fq_flow_node, rb_node);
+
+	/* the flow was recycled during enqueue() */
+	if (flow->age != THROTTLED) {
+		bpf_obj_drop(flow);
+		return 0;
+	}
+
+	flow->age = 0;
+	fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow);
+
+	return 0;
+}
+
+static __always_inline void fq_flow_set_throttled(struct fq_flow_node *flow)
+{
+	flow->age = THROTTLED;
+
+	if (time_next_delayed_flow > flow->time_next_packet)
+		time_next_delayed_flow = flow->time_next_packet;
+
+	bpf_spin_lock(&fq_delayed_lock);
+	bpf_rbtree_add(&fq_delayed, &flow->rb_node, fn_time_next_packet_less);
+	bpf_spin_unlock(&fq_delayed_lock);
+}
+
+static __always_inline void fq_check_throttled(void)
+{
+	bool unset_all = false;
+	unsigned long sample;
+
+	if (time_next_delayed_flow > dequeue_now)
+		return;
+
+	sample = (unsigned long)(dequeue_now - time_next_delayed_flow);
+	unthrottle_latency_ns -= unthrottle_latency_ns >> 3;
+	unthrottle_latency_ns += sample >> 3;
+
+	time_next_delayed_flow = ~0ULL;
+	bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &unset_all, 0);
+}
+
+static __always_inline void stash_skb(struct sk_buff *skb)
+{
+	bpf_spin_lock(&fq_stashed_skb_lock);
+	bpf_list_excl_push_back(&fq_stashed_skb, &skb->bpf_list);
+	bpf_spin_unlock(&fq_stashed_skb_lock);
+}
+
+static __always_inline struct sk_buff *get_stashed_skb()
+{
+	struct bpf_list_excl_node *node;
+	struct sk_buff *skb;
+
+	bpf_spin_lock(&fq_stashed_skb_lock);
+	node = bpf_list_excl_pop_front(&fq_stashed_skb);
+	bpf_spin_unlock(&fq_stashed_skb_lock);
+	if (!node)
+		return NULL;
+
+	skb = container_of(node, struct sk_buff, bpf_list);
+	return skb;
+}
+
+static int
+fq_dequeue_nonprio_flows(u32 index, struct dequeue_nonprio_ctx *ctx)
+{
+	u64 time_next_packet, time_to_send;
+	struct bpf_rb_excl_node *rb_node;
+	struct sk_buff *skb = NULL;
+	struct bpf_list_head *head;
+	struct bpf_list_node *node;
+	struct bpf_spin_lock *lock;
+	struct fq_flow_node *flow;
+	bool is_empty;
+
+	head = &fq_new_flows;
+	lock = &fq_new_flows_lock;
+	bpf_spin_lock(&fq_new_flows_lock);
+	node = bpf_list_pop_front(&fq_new_flows);
+	bpf_spin_unlock(&fq_new_flows_lock);
+	if (!node) {
+		head = &fq_old_flows;
+		lock = &fq_old_flows_lock;
+		bpf_spin_lock(&fq_old_flows_lock);
+		node = bpf_list_pop_front(&fq_old_flows);
+		bpf_spin_unlock(&fq_old_flows_lock);
+		if (!node) {
+			if (time_next_delayed_flow != ~0ULL)
+				ctx->expire = time_next_delayed_flow;
+			return 1;
+		}
+	}
+
+	flow = container_of(node, struct fq_flow_node, list_node);
+	if (flow->credit <= 0) {
+		flow->credit += q_quantum;
+		fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow);
+		return 0;
+	}
+
+	bpf_spin_lock(&flow->lock);
+	rb_node = bpf_rbtree_excl_first(&flow->queue);
+	if (!rb_node) {
+		bpf_spin_unlock(&flow->lock);
+		is_empty = fq_flows_is_empty(&fq_old_flows, &fq_old_flows_lock);
+		if (head == &fq_new_flows && !is_empty)
+			fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow);
+		else
+			fq_flow_set_detached(flow);
+
+		return 0;
+	}
+
+	skb = container_of(rb_node, struct sk_buff, bpf_rbnode);
+	time_to_send = skb->tstamp;
+
+	time_next_packet = (time_to_send > flow->time_next_packet) ?
+		time_to_send : flow->time_next_packet;
+	if (dequeue_now < time_next_packet) {
+		bpf_spin_unlock(&flow->lock);
+		flow->time_next_packet = time_next_packet;
+		fq_flow_set_throttled(flow);
+		return 0;
+	}
+
+	rb_node = bpf_rbtree_excl_remove(&flow->queue, rb_node);
+	bpf_spin_unlock(&flow->lock);
+
+	if (!rb_node) {
+		fq_flows_add_tail(head, lock, flow);
+		return 0; //unexpected
+	}
+
+	skb = container_of(rb_node, struct sk_buff, bpf_rbnode);
+
+	flow->credit -= qdisc_skb_cb(skb)->pkt_len;
+	flow->qlen--;
+	fq_qlen--;
+
+	ctx->dequeued = true;
+	stash_skb(skb);
+
+	fq_flows_add_head(head, lock, flow);
+
+	return 1;
+}
+
+static __always_inline struct sk_buff *fq_dequeue_prio(void)
+{
+	struct fq_flow_node *flow = NULL;
+	struct fq_stashed_flow *sflow;
+	struct sk_buff *skb = NULL;
+	struct bpf_rb_excl_node *node;
+	u32 hash = NUM_QUEUE;
+
+	sflow = bpf_map_lookup_elem(&fq_stashed_flows, &hash);
+	if (!sflow)
+		return NULL; //unexpected
+
+	flow = bpf_kptr_xchg(&sflow->flow, flow);
+	if (!flow)
+		return NULL;
+
+	bpf_spin_lock(&flow->lock);
+	node = bpf_rbtree_excl_first(&flow->queue);
+	if (!node) {
+		bpf_spin_unlock(&flow->lock);
+		goto xchg_flow_back;
+	}
+
+	skb = container_of(node, struct sk_buff, bpf_rbnode);
+	node = bpf_rbtree_excl_remove(&flow->queue, &skb->bpf_rbnode);
+	bpf_spin_unlock(&flow->lock);
+
+	if (!node) {
+		skb = NULL;
+		goto xchg_flow_back;
+	}
+
+	skb = container_of(node, struct sk_buff, bpf_rbnode);
+	fq_qlen--;
+
+xchg_flow_back:
+	bpf_kptr_xchg_back(&sflow->flow, flow);
+
+	return skb;
+}
+
+SEC("struct_ops/bpf_fq_dequeue")
+struct sk_buff *BPF_PROG(bpf_fq_dequeue, struct Qdisc *sch)
+{
+	struct dequeue_nonprio_ctx cb_ctx = {};
+	struct sk_buff *skb = NULL;
+
+	skb = fq_dequeue_prio();
+	if (skb) {
+		bpf_skb_set_dev(skb, sch);
+		return skb;
+	}
+
+	ktime_cache = dequeue_now = bpf_ktime_get_ns();
+	fq_check_throttled();
+	bpf_loop(q_plimit, fq_dequeue_nonprio_flows, &cb_ctx, 0);
+
+	skb = get_stashed_skb();
+
+	if (skb) {
+		bpf_skb_set_dev(skb, sch);
+		return skb;
+	}
+
+	if (cb_ctx.expire)
+		bpf_qdisc_watchdog_schedule(sch, cb_ctx.expire, q_timer_slack);
+
+	return NULL;
+}
+
+static int
+fq_reset_flows(u32 index, void *ctx)
+{
+	struct bpf_list_node *node;
+	struct fq_flow_node *flow;
+
+	bpf_spin_lock(&fq_new_flows_lock);
+	node = bpf_list_pop_front(&fq_new_flows);
+	bpf_spin_unlock(&fq_new_flows_lock);
+	if (!node) {
+		bpf_spin_lock(&fq_old_flows_lock);
+		node = bpf_list_pop_front(&fq_old_flows);
+		bpf_spin_unlock(&fq_old_flows_lock);
+		if (!node)
+			return 1;
+	}
+
+	flow = container_of(node, struct fq_flow_node, list_node);
+	bpf_obj_drop(flow);
+
+	return 0;
+}
+
+static int
+fq_reset_stashed_flows(u32 index, void *ctx)
+{
+	struct fq_flow_node *flow = NULL;
+	struct fq_stashed_flow *sflow;
+
+	sflow = bpf_map_lookup_elem(&fq_stashed_flows, &index);
+	if (!sflow)
+		return 0;
+
+	flow = bpf_kptr_xchg(&sflow->flow, flow);
+	if (flow)
+		bpf_obj_drop(flow);
+
+	return 0;
+}
+
+SEC("struct_ops/bpf_fq_reset")
+void BPF_PROG(bpf_fq_reset, struct Qdisc *sch)
+{
+	bool unset_all = true;
+	fq_qlen = 0;
+	bpf_loop(NUM_QUEUE + 1, fq_reset_stashed_flows, NULL, 0);
+	bpf_loop(NUM_QUEUE, fq_reset_flows, NULL, 0);
+	bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &unset_all, 0);
+	return;
+}
+
+SEC(".struct_ops")
+struct Qdisc_ops fq = {
+	.enqueue   = (void *)bpf_fq_enqueue,
+	.dequeue   = (void *)bpf_fq_dequeue,
+	.reset     = (void *)bpf_fq_reset,
+	.id        = "bpf_fq",
+};