Message ID | 20220902234317.2518808-4-void@manifault.com (mailing list archive)
---|---
State | Changes Requested
Delegated to | BPF
Series | bpf: Add user-space-publisher ring buffer map type
Context | Check | Description |
---|---|---|
netdev/tree_selection | success | Not a local patch, async |
bpf/vmtest-bpf-next-PR | success | PR summary |
bpf/vmtest-bpf-next-VM_Test-9 | success | Logs for test_progs on s390x with gcc |
bpf/vmtest-bpf-next-VM_Test-12 | success | Logs for test_progs_no_alu32 on s390x with gcc |
bpf/vmtest-bpf-next-VM_Test-15 | success | Logs for test_verifier on s390x with gcc |
bpf/vmtest-bpf-next-VM_Test-6 | success | Logs for test_maps on s390x with gcc |
bpf/vmtest-bpf-next-VM_Test-2 | success | Logs for build for x86_64 with gcc |
bpf/vmtest-bpf-next-VM_Test-3 | success | Logs for build for x86_64 with llvm-16 |
bpf/vmtest-bpf-next-VM_Test-4 | success | Logs for llvm-toolchain |
bpf/vmtest-bpf-next-VM_Test-5 | success | Logs for set-matrix |
bpf/vmtest-bpf-next-VM_Test-1 | success | Logs for build for s390x with gcc |
bpf/vmtest-bpf-next-VM_Test-16 | success | Logs for test_verifier on x86_64 with gcc |
bpf/vmtest-bpf-next-VM_Test-17 | success | Logs for test_verifier on x86_64 with llvm-16 |
bpf/vmtest-bpf-next-VM_Test-10 | success | Logs for test_progs on x86_64 with gcc |
bpf/vmtest-bpf-next-VM_Test-11 | success | Logs for test_progs on x86_64 with llvm-16 |
bpf/vmtest-bpf-next-VM_Test-13 | success | Logs for test_progs_no_alu32 on x86_64 with gcc |
bpf/vmtest-bpf-next-VM_Test-14 | success | Logs for test_progs_no_alu32 on x86_64 with llvm-16 |
bpf/vmtest-bpf-next-VM_Test-7 | success | Logs for test_maps on x86_64 with gcc |
bpf/vmtest-bpf-next-VM_Test-8 | success | Logs for test_maps on x86_64 with llvm-16 |
bpf/vmtest-bpf-PR | fail | merge-conflict |
On Fri, Sep 2, 2022 at 4:43 PM David Vernet <void@manifault.com> wrote:
>
> Now that all of the logic is in place in the kernel to support user-space
> produced ring buffers, we can add the user-space logic to libbpf. This
> patch therefore adds the following public symbols to libbpf:
>
> struct user_ring_buffer *
> user_ring_buffer__new(int map_fd,
>                       const struct user_ring_buffer_opts *opts);
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
> void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__free(struct user_ring_buffer *rb);
>
> A user-space producer must first create a struct user_ring_buffer * object
> with user_ring_buffer__new(), and can then reserve samples in the
> ring buffer using one of the following two symbols:
>
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
>
> With user_ring_buffer__reserve(), a pointer to a 'size' region of the ring
> buffer will be returned if sufficient space is available in the buffer.
> user_ring_buffer__reserve_blocking() provides similar semantics, but will
> block for up to 'timeout_ms' in epoll_wait if there is insufficient space
> in the buffer. This function has the guarantee from the kernel that it will
> receive at least one event-notification per invocation to
> bpf_ringbuf_drain(), provided that at least one sample is drained, and the
> BPF program did not pass the BPF_RB_NO_WAKEUP flag to bpf_ringbuf_drain().
>
> Once a sample is reserved, it must either be committed to the ring buffer
> with user_ring_buffer__submit(), or discarded with
> user_ring_buffer__discard().
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  tools/lib/bpf/libbpf.c         |  10 +-
>  tools/lib/bpf/libbpf.h         | 105 +++++++++++++
>  tools/lib/bpf/libbpf.map       |  10 ++
>  tools/lib/bpf/libbpf_probes.c  |   1 +
>  tools/lib/bpf/libbpf_version.h |   2 +-
>  tools/lib/bpf/ringbuf.c        | 270 +++++++++++++++++++++++++++++++++
>  6 files changed, 395 insertions(+), 3 deletions(-)
>

Thanks for adding nice doc comments! See below for a few minor nits, but
the ns_elapsed issue I think is pretty bad and should be fixed.

> @@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
>
>  /* Ring buffer APIs */
>  struct ring_buffer;
> +struct user_ring_buffer;
>
>  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> @@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
>  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
>  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> +struct user_ring_buffer_opts {
> +        size_t sz; /* size of this struct, for forward/backward compatibility */
> +};
> +
> +#define user_ring_buffer_opts__last_field sz
> +
> +/* @brief **user_ring_buffer__new()** creates a new instance of a user ring
> + * buffer.
> + *
> + * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map.

typo: USER_RINGBUF

> + * @param opts Options for how the ring buffer should be created.
> + * @return A user ring buffer on success; NULL and errno being set on a
> + * failure.
> + */
> +LIBBPF_API struct user_ring_buffer *
> +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> +
> +/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the
> + * user ring buffer.
> + * @param rb A pointer to a user ring buffer.
> + * @param size The size of the sample, in bytes.
> + * @return A pointer to a reserved region of the user ring buffer; NULL, and
> + * errno being set if a sample could not be reserved.
> + *
> + * This function is *not* thread safe, and callers must synchronize accessing
> + * this function if there are multiple producers. If a size is requested that
> + * is larger than the size of the entire ring buffer, errno will be set to
> + * E2BIG and NULL is returned. If the ring buffer could accommodate the size,
> + * but currently does not have enough space, errno is set to ENOSPC and NULL is
> + * returned.

we might want to mention that returned pointer is 8-byte aligned

> + *
> + * After initializing the sample, callers must invoke
> + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise,
> + * the sample must be freed with **user_ring_buffer__discard()**.
> + */
> +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> +

[...]

> +        err = user_ringbuf_map(rb, map_fd);
> +        if (err)
> +                goto err_out;
> +
> +        return rb;
> +
> +err_out:
> +        user_ring_buffer__free(rb);
> +        return errno = -err, NULL;
> +}
> +
> +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)

small nit if you are going to resubmit: we stopped using double
underscore naming for internal static functions, so this should be
called user_ringbuf_commit

> +{
> +        __u32 new_len;
> +        struct ringbuf_hdr *hdr;
> +        uintptr_t hdr_offset;
> +

[...]

> +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> +{
> +        int err;
> +        struct timespec start;
> +        __u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns;
> +
> +        if (timeout_ms < 0 && timeout_ms != -1)
> +                return errno = EINVAL, NULL;
> +
> +        if (timeout_ms != -1) {
> +                err = clock_gettime(CLOCK_MONOTONIC, &start);
> +                if (err)
> +                        return NULL;
> +        }
> +
> +        timeout_ns = timeout_ms * ns_per_ms;
> +        do {
> +                __u64 ns_remaining = timeout_ns - ns_elapsed;
> +                int cnt, ms_remaining;
> +                void *sample;
> +                struct timespec curr;
> +
> +                sample = user_ring_buffer__reserve(rb, size);
> +                if (sample)
> +                        return sample;
> +                else if (errno != ENOSPC)
> +                        return NULL;
> +
> +                ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;

ok, so you've special-cased timeout_ms == -1 but still didn't do
max(0, ns_remaining). Can you prove that ns_elapsed will never be
bigger than timeout_ns due to various delays in waking up this thread?
If not, let's please have max(0) otherwise we can accidentally
epoll_wait(-1).

> +                /* The kernel guarantees at least one event notification
> +                 * delivery whenever at least one sample is drained from the
> +                 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
> +                 * additional events may be delivered at any time, but only one
> +                 * event is guaranteed per bpf_ringbuf_drain() invocation,
> +                 * provided that a sample is drained, and the BPF program did
> +                 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> +                 */
> +                cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> +                if (cnt < 0)
> +                        return NULL;
> +
> +                if (timeout_ms == -1)
> +                        continue;
> +
> +                err = clock_gettime(CLOCK_MONOTONIC, &curr);
> +                if (err)
> +                        return NULL;
> +
> +                ns_elapsed = ns_elapsed_timespec(&start, &curr);

nit: if you move re-calculation of ms_remaining and ns_remaining to
here, I think overall loop logic will be even more straightforward. You
can initialize ms_remaining to -1 if timeout_ms < 0 and never
recalculate it, right? Note that you can also do ns_elapsed conversion
to ms right here and then keep everything else in ms (so no need for
timeout_ns, ns_remaining, etc).

> +        } while (ns_elapsed <= timeout_ns);
> +
> +        errno = ENOSPC;
> +        return NULL;
> +}
> --
> 2.37.1
>
On Fri, Sep 09, 2022 at 04:59:27PM -0700, Andrii Nakryiko wrote:
> > @@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
> >
> >  /* Ring buffer APIs */
> >  struct ring_buffer;
> > +struct user_ring_buffer;
> >
> >  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
> >
> > @@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
> >  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
> >  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
> >
> > +struct user_ring_buffer_opts {
> > +        size_t sz; /* size of this struct, for forward/backward compatibility */
> > +};
> > +
> > +#define user_ring_buffer_opts__last_field sz
> > +
> > +/* @brief **user_ring_buffer__new()** creates a new instance of a user ring
> > + * buffer.
> > + *
> > + * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map.
>
> typo: USER_RINGBUF

Good catch.

> > + * @param opts Options for how the ring buffer should be created.
> > + * @return A user ring buffer on success; NULL and errno being set on a
> > + * failure.
> > + */
> > +LIBBPF_API struct user_ring_buffer *
> > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> > +
> > +/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the
> > + * user ring buffer.
> > + * @param rb A pointer to a user ring buffer.
> > + * @param size The size of the sample, in bytes.
> > + * @return A pointer to a reserved region of the user ring buffer; NULL, and
> > + * errno being set if a sample could not be reserved.
> > + *
> > + * This function is *not* thread safe, and callers must synchronize accessing
> > + * this function if there are multiple producers. If a size is requested that
> > + * is larger than the size of the entire ring buffer, errno will be set to
> > + * E2BIG and NULL is returned. If the ring buffer could accommodate the size,
> > + * but currently does not have enough space, errno is set to ENOSPC and NULL is
> > + * returned.
>
> we might want to mention that returned pointer is 8-byte aligned

Will do.

> > + *
> > + * After initializing the sample, callers must invoke
> > + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise,
> > + * the sample must be freed with **user_ring_buffer__discard()**.
> > + */
> > +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> > +
>
> [...]
>
> > +        err = user_ringbuf_map(rb, map_fd);
> > +        if (err)
> > +                goto err_out;
> > +
> > +        return rb;
> > +
> > +err_out:
> > +        user_ring_buffer__free(rb);
> > +        return errno = -err, NULL;
> > +}
> > +
> > +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
>
> small nit if you are going to resubmit: we stopped using double
> underscore naming for internal static functions, so this should be
> called user_ringbuf_commit

Will do.

> > +{
> > +        __u32 new_len;
> > +        struct ringbuf_hdr *hdr;
> > +        uintptr_t hdr_offset;
> > +
>
> [...]
>
> > +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> > +{
> > +        int err;
> > +        struct timespec start;
> > +        __u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns;
> > +
> > +        if (timeout_ms < 0 && timeout_ms != -1)
> > +                return errno = EINVAL, NULL;
> > +
> > +        if (timeout_ms != -1) {
> > +                err = clock_gettime(CLOCK_MONOTONIC, &start);
> > +                if (err)
> > +                        return NULL;
> > +        }
> > +
> > +        timeout_ns = timeout_ms * ns_per_ms;
> > +        do {
> > +                __u64 ns_remaining = timeout_ns - ns_elapsed;
> > +                int cnt, ms_remaining;
> > +                void *sample;
> > +                struct timespec curr;
> > +
> > +                sample = user_ring_buffer__reserve(rb, size);
> > +                if (sample)
> > +                        return sample;
> > +                else if (errno != ENOSPC)
> > +                        return NULL;
> > +
> > +                ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;
>
> ok, so you've special-cased timeout_ms == -1 but still didn't do
> max(0, ns_remaining). Can you prove that ns_elapsed will never be
> bigger than timeout_ns due to various delays in waking up this thread?
> If not, let's please have max(0) otherwise we can accidentally
> epoll_wait(-1).

Yes you're right, this was an oversight. Thanks for catching this!

> > +                /* The kernel guarantees at least one event notification
> > +                 * delivery whenever at least one sample is drained from the
> > +                 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
> > +                 * additional events may be delivered at any time, but only one
> > +                 * event is guaranteed per bpf_ringbuf_drain() invocation,
> > +                 * provided that a sample is drained, and the BPF program did
> > +                 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> > +                 */
> > +                cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> > +                if (cnt < 0)
> > +                        return NULL;
> > +
> > +                if (timeout_ms == -1)
> > +                        continue;
> > +
> > +                err = clock_gettime(CLOCK_MONOTONIC, &curr);
> > +                if (err)
> > +                        return NULL;
> > +
> > +                ns_elapsed = ns_elapsed_timespec(&start, &curr);
>
> nit: if you move re-calculation of ms_remaining and ns_remaining to
> here, I think overall loop logic will be even more straightforward. You
> can initialize ms_remaining to -1 if timeout_ms < 0 and never
> recalculate it, right? Note that you can also do ns_elapsed conversion
> to ms right here and then keep everything else in ms (so no need for
> timeout_ns, ns_remaining, etc).

Sounds good, let me give this a shot in v6.

Thanks for another detailed review!
On Mon, Sep 19, 2022 at 03:22:09PM -0500, David Vernet wrote:

[...]

> > > +        timeout_ns = timeout_ms * ns_per_ms;
> > > +        do {
> > > +                __u64 ns_remaining = timeout_ns - ns_elapsed;
> > > +                int cnt, ms_remaining;
> > > +                void *sample;
> > > +                struct timespec curr;
> > > +
> > > +                sample = user_ring_buffer__reserve(rb, size);
> > > +                if (sample)
> > > +                        return sample;
> > > +                else if (errno != ENOSPC)
> > > +                        return NULL;
> > > +
> > > +                ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;
> >
> > ok, so you've special-cased timeout_ms == -1 but still didn't do
> > max(0, ns_remaining). Can you prove that ns_elapsed will never be
> > bigger than timeout_ns due to various delays in waking up this thread?
> > If not, let's please have max(0) otherwise we can accidentally
> > epoll_wait(-1).
>
> Yes you're right, this was an oversight. Thanks for catching this!

Wait, actually, this can't happen because of the while check at the end
of the loop:

while (ns_elapsed <= timeout_ns)

So I don't think the max is necessary to prevent underflowing, but I do
think we need to have one more attempt to invoke
user_ring_buffer__reserve() at the end of the function to account for
wakeup delays after epoll_wait() returns. Otherwise, we might think
we've timed out when there's actually a sample left in the buffer.

I also still think your suggestion below for cleaning up makes sense,
so I'll still add it in v6, but I think I can leave off the max() call.

>
> > > +                /* The kernel guarantees at least one event notification
> > > +                 * delivery whenever at least one sample is drained from the
> > > +                 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
> > > +                 * additional events may be delivered at any time, but only one
> > > +                 * event is guaranteed per bpf_ringbuf_drain() invocation,
> > > +                 * provided that a sample is drained, and the BPF program did
> > > +                 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> > > +                 */
> > > +                cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> > > +                if (cnt < 0)
> > > +                        return NULL;
> > > +
> > > +                if (timeout_ms == -1)
> > > +                        continue;
> > > +
> > > +                err = clock_gettime(CLOCK_MONOTONIC, &curr);
> > > +                if (err)
> > > +                        return NULL;
> > > +
> > > +                ns_elapsed = ns_elapsed_timespec(&start, &curr);
> >
> > nit: if you move re-calculation of ms_remaining and ns_remaining to
> > here, I think overall loop logic will be even more straightforward. You
> > can initialize ms_remaining to -1 if timeout_ms < 0 and never
> > recalculate it, right? Note that you can also do ns_elapsed conversion
> > to ms right here and then keep everything else in ms (so no need for
> > timeout_ns, ns_remaining, etc).
>
> Sounds good, let me give this a shot in v6.
>
> Thanks for another detailed review!
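For readers following the thread, the sketch below shows one possible shape of user_ring_buffer__reserve_blocking() after the feedback above is applied: all bookkeeping is kept in milliseconds, ms_remaining stays at -1 for an infinite timeout, the remaining budget is clamped at zero, and one final reserve attempt runs after the loop to cover wakeup delays. It reuses the patch's ns_elapsed_timespec() helper and struct user_ring_buffer fields and is illustrative only; the actual v6 code may differ.

```c
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
        struct timespec start;
        int ms_remaining = timeout_ms;
        void *sample;

        if (timeout_ms < -1)
                return errno = EINVAL, NULL;

        if (timeout_ms != -1 && clock_gettime(CLOCK_MONOTONIC, &start))
                return NULL;

        do {
                struct timespec curr;
                __u64 elapsed_ms;
                int cnt;

                sample = user_ring_buffer__reserve(rb, size);
                if (sample)
                        return sample;
                else if (errno != ENOSPC)
                        return NULL;

                /* The kernel guarantees one wakeup per bpf_ringbuf_drain()
                 * invocation that consumes a sample, unless the BPF program
                 * passed BPF_RB_NO_WAKEUP.
                 */
                cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
                if (cnt < 0)
                        return NULL;

                if (timeout_ms == -1)
                        continue;

                /* Recompute the remaining budget in milliseconds, clamping
                 * at zero so a late wakeup can never become epoll_wait(-1).
                 */
                if (clock_gettime(CLOCK_MONOTONIC, &curr))
                        return NULL;
                elapsed_ms = ns_elapsed_timespec(&start, &curr) / 1000000;
                ms_remaining = elapsed_ms < (__u64)timeout_ms ?
                               timeout_ms - elapsed_ms : 0;
        } while (ms_remaining != 0);

        /* One last attempt, in case space was freed between the final wakeup
         * and the timeout expiring; errno is ENOSPC if this also fails.
         */
        return user_ring_buffer__reserve(rb, size);
}
```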
diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 6b580ba027ba..588cf0474743 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -2373,6 +2373,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
 	return sz;
 }
 
+static bool map_is_ringbuf(const struct bpf_map *map)
+{
+	return map->def.type == BPF_MAP_TYPE_RINGBUF ||
+	       map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
+}
+
 static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
 {
 	map->def.type = def->map_type;
@@ -2387,7 +2393,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
 	map->btf_value_type_id = def->value_type_id;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	if (def->parts & MAP_DEF_MAP_TYPE)
@@ -4370,7 +4376,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
 	map->def.max_entries = max_entries;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	return 0;
diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
index 88a1ac34b12a..92fdbe484482 100644
--- a/tools/lib/bpf/libbpf.h
+++ b/tools/lib/bpf/libbpf.h
@@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
 
 /* Ring buffer APIs */
 struct ring_buffer;
+struct user_ring_buffer;
 
 typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
 
@@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
 LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
 LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
 
+struct user_ring_buffer_opts {
+	size_t sz; /* size of this struct, for forward/backward compatibility */
+};
+
+#define user_ring_buffer_opts__last_field sz
+
+/* @brief **user_ring_buffer__new()** creates a new instance of a user ring
+ * buffer.
+ *
+ * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map.
+ * @param opts Options for how the ring buffer should be created.
+ * @return A user ring buffer on success; NULL and errno being set on a
+ * failure.
+ */
+LIBBPF_API struct user_ring_buffer *
+user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
+
+/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the
+ * user ring buffer.
+ * @param rb A pointer to a user ring buffer.
+ * @param size The size of the sample, in bytes.
+ * @return A pointer to a reserved region of the user ring buffer; NULL, and
+ * errno being set if a sample could not be reserved.
+ *
+ * This function is *not* thread safe, and callers must synchronize accessing
+ * this function if there are multiple producers. If a size is requested that
+ * is larger than the size of the entire ring buffer, errno will be set to
+ * E2BIG and NULL is returned. If the ring buffer could accommodate the size,
+ * but currently does not have enough space, errno is set to ENOSPC and NULL is
+ * returned.
+ *
+ * After initializing the sample, callers must invoke
+ * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise,
+ * the sample must be freed with **user_ring_buffer__discard()**.
+ */
+LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
+
+/* @brief **user_ring_buffer__reserve_blocking()** reserves a record in the
+ * ring buffer, possibly blocking for up to @timeout_ms until a sample becomes
+ * available.
+ * @param rb The user ring buffer.
+ * @param size The size of the sample, in bytes.
+ * @param timeout_ms The amount of time, in milliseconds, for which the caller
+ * should block when waiting for a sample. -1 causes the caller to block
+ * indefinitely.
+ * @return A pointer to a reserved region of the user ring buffer; NULL, and
+ * errno being set if a sample could not be reserved.
+ *
+ * This function is *not* thread safe, and callers must synchronize
+ * accessing this function if there are multiple producers
+ *
+ * If **timeout_ms** is -1, the function will block indefinitely until a sample
+ * becomes available. Otherwise, **timeout_ms** must be non-negative, or errno
+ * is set to EINVAL, and NULL is returned. If **timeout_ms** is 0, no blocking
+ * will occur and the function will return immediately after attempting to
+ * reserve a sample.
+ *
+ * If **size** is larger than the size of the entire ring buffer, errno is set
+ * to E2BIG and NULL is returned. If the ring buffer could accommodate
+ * **size**, but currently does not have enough space, the caller will block
+ * until at most **timeout_ms** has elapsed. If insufficient space is available
+ * at that time, errno is set to ENOSPC, and NULL is returned.
+ *
+ * The kernel guarantees that it will wake up this thread to check if
+ * sufficient space is available in the ring buffer at least once per
+ * invocation of the **bpf_ringbuf_drain()** helper function, provided that at
+ * least one sample is consumed, and the BPF program did not invoke the
+ * function with BPF_RB_NO_WAKEUP. A wakeup may occur sooner than that, but the
+ * kernel does not guarantee this.
+ *
+ * When a sample of size **size** is found within **timeout_ms**, a pointer to
+ * the sample is returned. After initializing the sample, callers must invoke
+ * **user_ring_buffer__submit()** to post the sample to the ring buffer.
+ * Otherwise, the sample must be freed with **user_ring_buffer__discard()**.
+ */
+LIBBPF_API void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
+						    __u32 size,
+						    int timeout_ms);
+
+/* @brief **user_ring_buffer__submit()** submits a previously reserved sample
+ * into the ring buffer.
+ * @param rb The user ring buffer.
+ * @param sample A reserved sample.
+ *
+ * It is not necessary to synchronize amongst multiple producers when invoking
+ * this function.
+ */
+LIBBPF_API void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
+
+/* @brief **user_ring_buffer__discard()** discards a previously reserved sample.
+ * @param rb The user ring buffer.
+ * @param sample A reserved sample.
+ *
+ * It is not necessary to synchronize amongst multiple producers when invoking
+ * this function.
+ */
+LIBBPF_API void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
+
+/* @brief **user_ring_buffer__free()** frees a ring buffer that was previously
+ * created with **user_ring_buffer__new()**.
+ * @param rb The user ring buffer being freed.
+ */
+LIBBPF_API void user_ring_buffer__free(struct user_ring_buffer *rb);
+
 /* Perf buffer APIs */
 struct perf_buffer;
 
diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
index 2b928dc21af0..c1d6aa7c82b6 100644
--- a/tools/lib/bpf/libbpf.map
+++ b/tools/lib/bpf/libbpf.map
@@ -368,3 +368,13 @@ LIBBPF_1.0.0 {
 		libbpf_bpf_prog_type_str;
 		perf_buffer__buffer;
 };
+
+LIBBPF_1.1.0 {
+	global:
+		user_ring_buffer__discard;
+		user_ring_buffer__free;
+		user_ring_buffer__new;
+		user_ring_buffer__reserve;
+		user_ring_buffer__reserve_blocking;
+		user_ring_buffer__submit;
+} LIBBPF_1.0.0;
diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
index 6d495656f554..f3a8e8e74eb8 100644
--- a/tools/lib/bpf/libbpf_probes.c
+++ b/tools/lib/bpf/libbpf_probes.c
@@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type)
 			return btf_fd;
 		break;
 	case BPF_MAP_TYPE_RINGBUF:
+	case BPF_MAP_TYPE_USER_RINGBUF:
 		key_size = 0;
 		value_size = 0;
 		max_entries = 4096;
diff --git a/tools/lib/bpf/libbpf_version.h b/tools/lib/bpf/libbpf_version.h
index 2fb2f4290080..e944f5bce728 100644
--- a/tools/lib/bpf/libbpf_version.h
+++ b/tools/lib/bpf/libbpf_version.h
@@ -4,6 +4,6 @@
 #define __LIBBPF_VERSION_H
 
 #define LIBBPF_MAJOR_VERSION 1
-#define LIBBPF_MINOR_VERSION 0
+#define LIBBPF_MINOR_VERSION 1
 
 #endif /* __LIBBPF_VERSION_H */
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 8bc117bcc7bc..a5712007c30d 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -16,6 +16,7 @@
 #include <asm/barrier.h>
 #include <sys/mman.h>
 #include <sys/epoll.h>
+#include <time.h>
 
 #include "libbpf.h"
 #include "libbpf_internal.h"
@@ -39,6 +40,23 @@ struct ring_buffer {
 	int ring_cnt;
 };
 
+struct user_ring_buffer {
+	struct epoll_event event;
+	unsigned long *consumer_pos;
+	unsigned long *producer_pos;
+	void *data;
+	unsigned long mask;
+	size_t page_size;
+	int map_fd;
+	int epoll_fd;
+};
+
+/* 8-byte ring buffer header structure */
+struct ringbuf_hdr {
+	__u32 len;
+	__u32 pad;
+};
+
 static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
 {
 	if (r->consumer_pos) {
@@ -300,3 +318,255 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb)
 {
 	return rb->epoll_fd;
 }
+
+static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
+{
+	if (rb->consumer_pos) {
+		munmap(rb->consumer_pos, rb->page_size);
+		rb->consumer_pos = NULL;
+	}
+	if (rb->producer_pos) {
+		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
+		rb->producer_pos = NULL;
+	}
+}
+
+void user_ring_buffer__free(struct user_ring_buffer *rb)
+{
+	if (!rb)
+		return;
+
+	user_ringbuf_unmap_ring(rb);
+
+	if (rb->epoll_fd >= 0)
+		close(rb->epoll_fd);
+
+	free(rb);
+}
+
+static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
+{
+	struct bpf_map_info info;
+	__u32 len = sizeof(info);
+	void *tmp;
+	struct epoll_event *rb_epoll;
+	int err;
+
+	memset(&info, 0, sizeof(info));
+
+	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
+	if (err) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", map_fd, err);
+		return err;
+	}
+
+	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
+		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
+		return -EINVAL;
+	}
+
+	rb->map_fd = map_fd;
+	rb->mask = info.max_entries - 1;
+
+	/* Map read-only consumer page */
+	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
+			map_fd, err);
+		return err;
+	}
+	rb->consumer_pos = tmp;
+
+	/* Map read-write the producer page and data pages. We map the data
+	 * region as twice the total size of the ring buffer to allow the
+	 * simple reading and writing of samples that wrap around the end of
+	 * the buffer. See the kernel implementation for details.
+	 */
+	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
+		   PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
+			map_fd, err);
+		return err;
+	}
+
+	rb->producer_pos = tmp;
+	rb->data = tmp + rb->page_size;
+
+	rb_epoll = &rb->event;
+	rb_epoll->events = EPOLLOUT;
+	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
+		return err;
+	}
+
+	return 0;
+}
+
+struct user_ring_buffer *
+user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
+{
+	struct user_ring_buffer *rb;
+	int err;
+
+	if (!OPTS_VALID(opts, user_ring_buffer_opts))
+		return errno = EINVAL, NULL;
+
+	rb = calloc(1, sizeof(*rb));
+	if (!rb)
+		return errno = ENOMEM, NULL;
+
+	rb->page_size = getpagesize();
+
+	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (rb->epoll_fd < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to create epoll instance: %d\n", err);
+		goto err_out;
+	}
+
+	err = user_ringbuf_map(rb, map_fd);
+	if (err)
+		goto err_out;
+
+	return rb;
+
+err_out:
+	user_ring_buffer__free(rb);
+	return errno = -err, NULL;
+}
+
+static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
+{
+	__u32 new_len;
+	struct ringbuf_hdr *hdr;
+	uintptr_t hdr_offset;
+
+	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
+	hdr = rb->data + (hdr_offset & rb->mask);
+
+	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
+	if (discard)
+		new_len |= BPF_RINGBUF_DISCARD_BIT;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
+	 * the kernel.
+	 */
+	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
+}
+
+void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
+{
+	user_ringbuf__commit(rb, sample, true);
+}
+
+void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
+{
+	user_ringbuf__commit(rb, sample, false);
+}
+
+void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
+{
+	__u32 avail_size, total_size, max_size;
+	/* 64-bit to avoid overflow in case of extreme application behavior */
+	__u64 cons_pos, prod_pos;
+	struct ringbuf_hdr *hdr;
+
+	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
+	 * the kernel.
+	 */
+	cons_pos = smp_load_acquire(rb->consumer_pos);
+	/* Synchronizes with smp_store_release() in user_ringbuf__commit() */
+	prod_pos = smp_load_acquire(rb->producer_pos);
+
+	max_size = rb->mask + 1;
+	avail_size = max_size - (prod_pos - cons_pos);
+	/* Round up total size to a multiple of 8. */
+	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;
+
+	if (total_size > max_size)
+		return errno = E2BIG, NULL;
+
+	if (avail_size < total_size)
+		return errno = ENOSPC, NULL;
+
+	hdr = rb->data + (prod_pos & rb->mask);
+	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
+	hdr->pad = 0;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
+	 * the kernel.
+	 */
+	smp_store_release(rb->producer_pos, prod_pos + total_size);
+
+	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
+}
+
+static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
+{
+	__u64 start_ns, end_ns, ns_per_s = 1000000000;
+
+	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
+	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;
+
+	return end_ns - start_ns;
+}
+
+void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
+{
+	int err;
+	struct timespec start;
+	__u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns;
+
+	if (timeout_ms < 0 && timeout_ms != -1)
+		return errno = EINVAL, NULL;
+
+	if (timeout_ms != -1) {
+		err = clock_gettime(CLOCK_MONOTONIC, &start);
+		if (err)
+			return NULL;
+	}
+
+	timeout_ns = timeout_ms * ns_per_ms;
+	do {
+		__u64 ns_remaining = timeout_ns - ns_elapsed;
+		int cnt, ms_remaining;
+		void *sample;
+		struct timespec curr;
+
+		sample = user_ring_buffer__reserve(rb, size);
+		if (sample)
+			return sample;
+		else if (errno != ENOSPC)
+			return NULL;
+
+		ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;
+		/* The kernel guarantees at least one event notification
+		 * delivery whenever at least one sample is drained from the
+		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
+		 * additional events may be delivered at any time, but only one
+		 * event is guaranteed per bpf_ringbuf_drain() invocation,
+		 * provided that a sample is drained, and the BPF program did
+		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
+		 */
+		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
+		if (cnt < 0)
+			return NULL;
+
+		if (timeout_ms == -1)
+			continue;
+
+		err = clock_gettime(CLOCK_MONOTONIC, &curr);
+		if (err)
+			return NULL;
+
+		ns_elapsed = ns_elapsed_timespec(&start, &curr);
+	} while (ns_elapsed <= timeout_ns);
+
+	errno = ENOSPC;
+	return NULL;
+}
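To tie the new API together, below is a minimal user-space producer sketch built only on the symbols this patch exports. The struct my_sample layout, the publish_sample() wrapper, and the skeleton map name are hypothetical; map_fd is assumed to refer to an already-created BPF_MAP_TYPE_USER_RINGBUF map.

```c
#include <errno.h>
#include <string.h>
#include <bpf/libbpf.h>

/* Hypothetical sample layout; use whatever layout the draining BPF
 * program expects.
 */
struct my_sample {
	int pid;
	char comm[16];
};

/* map_fd is assumed to be the fd of a loaded BPF_MAP_TYPE_USER_RINGBUF
 * map, e.g. bpf_map__fd(skel->maps.user_ringbuf) on an opened skeleton.
 */
static int publish_sample(int map_fd, int pid, const char *comm)
{
	struct user_ring_buffer *rb;
	struct my_sample *sample;
	int err;

	/* NULL opts requests default options. */
	rb = user_ring_buffer__new(map_fd, NULL);
	if (!rb)
		return -errno;

	/* Wait up to 100ms for enough free space in the ring buffer. */
	sample = user_ring_buffer__reserve_blocking(rb, sizeof(*sample), 100);
	if (!sample) {
		err = -errno;
		user_ring_buffer__free(rb);
		return err;
	}

	/* The returned pointer is 8-byte aligned, so it can be used as a
	 * struct my_sample directly.
	 */
	sample->pid = pid;
	strncpy(sample->comm, comm, sizeof(sample->comm) - 1);
	sample->comm[sizeof(sample->comm) - 1] = '\0';

	/* Post the sample so the kernel-side drain helper can consume it. */
	user_ring_buffer__submit(rb, sample);

	user_ring_buffer__free(rb);
	return 0;
}
```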
Now that all of the logic is in place in the kernel to support user-space
produced ring buffers, we can add the user-space logic to libbpf. This
patch therefore adds the following public symbols to libbpf:

struct user_ring_buffer *
user_ring_buffer__new(int map_fd,
                      const struct user_ring_buffer_opts *opts);
void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
                                         __u32 size, int timeout_ms);
void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__free(struct user_ring_buffer *rb);

A user-space producer must first create a struct user_ring_buffer * object
with user_ring_buffer__new(), and can then reserve samples in the
ring buffer using one of the following two symbols:

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
                                         __u32 size, int timeout_ms);

With user_ring_buffer__reserve(), a pointer to a 'size' region of the ring
buffer will be returned if sufficient space is available in the buffer.
user_ring_buffer__reserve_blocking() provides similar semantics, but will
block for up to 'timeout_ms' in epoll_wait if there is insufficient space
in the buffer. This function has the guarantee from the kernel that it will
receive at least one event-notification per invocation to
bpf_ringbuf_drain(), provided that at least one sample is drained, and the
BPF program did not pass the BPF_RB_NO_WAKEUP flag to bpf_ringbuf_drain().

Once a sample is reserved, it must either be committed to the ring buffer
with user_ring_buffer__submit(), or discarded with
user_ring_buffer__discard().

Signed-off-by: David Vernet <void@manifault.com>
---
 tools/lib/bpf/libbpf.c         |  10 +-
 tools/lib/bpf/libbpf.h         | 105 +++++++++++++
 tools/lib/bpf/libbpf.map       |  10 ++
 tools/lib/bpf/libbpf_probes.c  |   1 +
 tools/lib/bpf/libbpf_version.h |   2 +-
 tools/lib/bpf/ringbuf.c        | 270 +++++++++++++++++++++++++++++++++
 6 files changed, 395 insertions(+), 3 deletions(-)
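For context on the BPF side (not part of this libbpf patch), a BPF_MAP_TYPE_USER_RINGBUF map would typically be declared as a BTF-defined map in the BPF program and drained there by the kernel's drain helper. A minimal declaration sketch, assuming the usual libbpf conventions (SEC(".maps"), bpf_helpers.h) and a hypothetical map name; the patch's adjust_ringbuf_sz() change is what lets libbpf round such a map's max_entries up to a page-size multiple:

```c
/* Sketch of a BTF-defined user ring buffer map inside a BPF program.
 * "user_ringbuf" is a hypothetical name; BPF_MAP_TYPE_USER_RINGBUF comes
 * from the kernel-side patches in this series, and max_entries is rounded
 * up to a multiple of the page size by libbpf.
 */
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
	__uint(max_entries, 256 * 1024);
} user_ringbuf SEC(".maps");
```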