Message ID | 20220808155341.2479054-4-void@manifault.com (mailing list archive) |
---|---|
State | Superseded |
Delegated to: | BPF |
Headers | show |
Series | bpf: Add user-space-publisher ringbuffer map type | expand |
On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote: > > Now that all of the logic is in place in the kernel to support user-space > produced ringbuffers, we can add the user-space logic to libbpf. > > Signed-off-by: David Vernet <void@manifault.com> > --- > kernel/bpf/ringbuf.c | 7 +- > tools/lib/bpf/libbpf.c | 10 +- > tools/lib/bpf/libbpf.h | 19 +++ > tools/lib/bpf/libbpf.map | 6 + > tools/lib/bpf/libbpf_probes.c | 1 + > tools/lib/bpf/ringbuf.c | 216 ++++++++++++++++++++++++++++++++++ > 6 files changed, 256 insertions(+), 3 deletions(-) > > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c > index fc589fc8eb7c..a10558e79ec8 100644 > --- a/kernel/bpf/ringbuf.c > +++ b/kernel/bpf/ringbuf.c > @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample, > if (!atomic_try_cmpxchg(&rb->busy, &busy, 1)) > return -EBUSY; > > - /* Synchronizes with smp_store_release() in user-space. */ > + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() > + * in user-space. > + */ let's not hard-code libbpf function names in kernel comments, it's still user-space > prod_pos = smp_load_acquire(&rb->producer_pos); > /* Synchronizes with smp_store_release() in > * __bpf_user_ringbuf_sample_release(). > @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, > /* To release the ringbuffer, just increment the producer position to > * signal that a new sample can be consumed. The busy bit is cleared by > * userspace when posting a new sample to the ringbuffer. > + * > + * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve() > + * in user-space. 
> */ same > smp_store_release(&rb->consumer_pos, rb->consumer_pos + size + > BPF_RINGBUF_HDR_SZ); > diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c > index 9c1f2d09f44e..f7fe09dce643 100644 > --- a/tools/lib/bpf/libbpf.c > +++ b/tools/lib/bpf/libbpf.c > @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz) > return sz; > } > > +static bool map_is_ringbuf(const struct bpf_map *map) > +{ > + return map->def.type == BPF_MAP_TYPE_RINGBUF || > + map->def.type == BPF_MAP_TYPE_USER_RINGBUF; > +} > + > static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def) > { > map->def.type = def->map_type; > @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def > map->btf_value_type_id = def->value_type_id; > > /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ > - if (map->def.type == BPF_MAP_TYPE_RINGBUF) > + if (map_is_ringbuf(map)) > map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); > > if (def->parts & MAP_DEF_MAP_TYPE) > @@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries) > map->def.max_entries = max_entries; > > /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ > - if (map->def.type == BPF_MAP_TYPE_RINGBUF) > + if (map_is_ringbuf(map)) > map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); > > return 0; > diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h > index 61493c4cddac..6d1d0539b08d 100644 > --- a/tools/lib/bpf/libbpf.h > +++ b/tools/lib/bpf/libbpf.h > @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook, > > /* Ring buffer APIs */ > struct ring_buffer; > +struct ring_buffer_user; I know that I'm the one asking to use ring_buffer_user name, but given kernel is using USER_RINGBUF and user_ringbuf naming, let's be consistent and use user_ring_buffer in libbpf as well. 
I was contemplating uring_buffer, can't make up my mind if it's better or not. Also ring_buffer_user reads like "user of ring buffer", which adds to confusion :) > > typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); > > @@ -1028,6 +1029,24 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); > LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); > LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); > > +struct ring_buffer_user_opts { > + size_t sz; /* size of this struct, for forward/backward compatibility */ > +}; > + > +#define ring_buffer_user_opts__last_field sz > + > +LIBBPF_API struct ring_buffer_user * > +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts); > +LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb, > + uint32_t size); > +LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb, > + uint32_t size, int timeout_ms); > +LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb, > + void *sample); > +LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb, > + void *sample); > +LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb); > + > /* Perf buffer APIs */ > struct perf_buffer; > > diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map > index 119e6e1ea7f1..8db11040df1b 100644 > --- a/tools/lib/bpf/libbpf.map > +++ b/tools/lib/bpf/libbpf.map > @@ -365,4 +365,10 @@ LIBBPF_1.0.0 { > libbpf_bpf_map_type_str; > libbpf_bpf_prog_type_str; > perf_buffer__buffer; > + ring_buffer_user__discard; > + ring_buffer_user__free; > + ring_buffer_user__new; > + ring_buffer_user__poll; > + ring_buffer_user__reserve; > + ring_buffer_user__submit; > }; > diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c > index 6d495656f554..f3a8e8e74eb8 100644 > --- a/tools/lib/bpf/libbpf_probes.c > +++ b/tools/lib/bpf/libbpf_probes.c > @@ -231,6 +231,7 @@ static int 
probe_map_create(enum bpf_map_type map_type) > return btf_fd; > break; > case BPF_MAP_TYPE_RINGBUF: > + case BPF_MAP_TYPE_USER_RINGBUF: > key_size = 0; > value_size = 0; > max_entries = 4096; > diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c > index 8bc117bcc7bc..86e3c11d8486 100644 > --- a/tools/lib/bpf/ringbuf.c > +++ b/tools/lib/bpf/ringbuf.c > @@ -39,6 +39,17 @@ struct ring_buffer { > int ring_cnt; > }; > > +struct ring_buffer_user { > + struct epoll_event event; > + unsigned long *consumer_pos; > + unsigned long *producer_pos; > + void *data; > + unsigned long mask; > + size_t page_size; > + int map_fd; > + int epoll_fd; > +}; > + > static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r) > { > if (r->consumer_pos) { > @@ -300,3 +311,208 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb) > { > return rb->epoll_fd; > } > + > +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb) libbpf generally doesn't use double underscore naming pattern, let's not do that here as well > +{ > + if (rb->consumer_pos) { > + munmap(rb->consumer_pos, rb->page_size); > + rb->consumer_pos = NULL; > + } > + if (rb->producer_pos) { > + munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1)); > + rb->producer_pos = NULL; > + } > +} > + > +void ring_buffer_user__free(struct ring_buffer_user *rb) > +{ > + if (!rb) > + return; > + > + __user_ringbuf_unmap_ring(rb); > + > + if (rb->epoll_fd >= 0) > + close(rb->epoll_fd); > + > + free(rb); > +} > + > +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd) > +{ > + please don't leave stray empty lines around the code > + struct bpf_map_info info; > + __u32 len = sizeof(info); > + void *tmp; > + struct epoll_event *rb_epoll; > + int err; > + > + memset(&info, 0, sizeof(info)); > + > + err = bpf_obj_get_info_by_fd(map_fd, &info, &len); > + if (err) { > + err = -errno; > + pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", > + map_fd, err); > + return 
libbpf_err(err); > + } > + > + if (info.type != BPF_MAP_TYPE_USER_RINGBUF) { > + pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", > + map_fd); > + return libbpf_err(-EINVAL); > + } > + > + rb->map_fd = map_fd; > + rb->mask = info.max_entries - 1; > + > + /* Map read-only consumer page */ > + tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0); > + if (tmp == MAP_FAILED) { > + err = -errno; > + pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n", > + map_fd, err); > + return libbpf_err(err); > + } > + rb->consumer_pos = tmp; > + > + /* Map read-write the producer page and data pages. We map the data > + * region as twice the total size of the ringbuffer to allow the simple > + * reading and writing of samples that wrap around the end of the > + * buffer. See the kernel implementation for details. > + */ > + tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, > + PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size); > + if (tmp == MAP_FAILED) { > + err = -errno; > + pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n", > + map_fd, err); > + return libbpf_err(err); > + } > + > + rb->producer_pos = tmp; > + rb->data = tmp + rb->page_size; > + > + rb_epoll = &rb->event; > + rb_epoll->events = EPOLLOUT; > + if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) { > + err = -errno; > + pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", > + map_fd, err); > + return libbpf_err(err); > + } > + > + return 0; > +} > + > +struct ring_buffer_user * > +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts) > +{ > + struct ring_buffer_user *rb; > + int err; > + > + if (!OPTS_VALID(opts, ring_buffer_opts)) > + return errno = EINVAL, NULL; > + > + rb = calloc(1, sizeof(*rb)); > + if (!rb) > + return errno = ENOMEM, NULL; > + > + rb->page_size = getpagesize(); > + > + rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC); > + if (rb->epoll_fd < 0) { > + err = -errno; > + 
pr_warn("user ringbuf: failed to create epoll instance: %d\n", > + err); > + goto err_out; > + } > + > + err = __ring_buffer_user_map(rb, map_fd); > + if (err) > + goto err_out; > + > + return rb; > + > +err_out: > + ring_buffer_user__free(rb); > + return errno = -err, NULL; > +} > + > +static void __ring_buffer_user__commit(struct ring_buffer_user *rb) > +{ > + uint32_t *hdr; > + uint32_t total_len; > + unsigned long prod_pos; > + > + prod_pos = *rb->producer_pos; > + hdr = rb->data + (prod_pos & rb->mask); > + > + total_len = *hdr + BPF_RINGBUF_HDR_SZ; round up to multiple of 8 > + > + /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in > + * the kernel. > + */ > + smp_store_release(rb->producer_pos, prod_pos + total_len); > +} > + > +/* Discard a previously reserved sample into the ring buffer. Because the user > + * ringbuffer is assumed to be single producer, this can simply be a no-op, and > + * the producer pointer is left in the same place as when it was reserved. > + */ > +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample) > +{} { } > + > +/* Submit a previously reserved sample into the ring buffer. > + */ nit: this is single-line comment > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample) > +{ > + __ring_buffer_user__commit(rb); > +} this made me think that it's probably best to add kernel support for busy bit anyways (just like for existing ringbuf), so that we can eventually turn this into multi-producer on user-space side (all we need is a lock, really). So let's anticipate that on kernel side from the very beginning > + > +/* Reserve a pointer to a sample in the user ring buffer. This function is *not* > + * thread safe, and the ring-buffer supports only a single producer. 
> + */ > +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size) > +{ > + uint32_t *hdr; > + /* 64-bit to avoid overflow in case of extreme application behavior */ > + size_t avail_size, total_size, max_size; size_t is not guaranteed to be 64-bit, neither is long > + unsigned long cons_pos, prod_pos; > + > + /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in > + * the kernel. > + */ > + cons_pos = smp_load_acquire(rb->consumer_pos); > + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() > + */ > + prod_pos = smp_load_acquire(rb->producer_pos); > + > + max_size = rb->mask + 1; > + avail_size = max_size - (prod_pos - cons_pos); > + total_size = size + BPF_RINGBUF_HDR_SZ; round_up(8) > + > + if (total_size > max_size || avail_size < total_size) > + return NULL; set errno as well? > + > + hdr = rb->data + (prod_pos & rb->mask); > + *hdr = size; I'd make sure that entire 8 bytes of header are zero-initialized, for better future extensibility > + > + /* Producer pos is updated when a sample is submitted. */ > + > + return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask); > +} > + > +/* Poll for available space in the ringbuffer, and reserve a record when it > + * becomes available. > + */ > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size, > + int timeout_ms) > +{ > + int cnt; > + > + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms); > + if (cnt < 0) > + return NULL; > + > + return ring_buffer_user__reserve(rb, size); it's not clear how just doing epoll_wait() guarantees that we have >= size of space available?.. Seems like some tests are missing? > +} > -- > 2.30.2 >
On Thu, Aug 11, 2022 at 04:39:57PM -0700, Andrii Nakryiko wrote: [...] > > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c > > index fc589fc8eb7c..a10558e79ec8 100644 > > --- a/kernel/bpf/ringbuf.c > > +++ b/kernel/bpf/ringbuf.c > > @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample, > > if (!atomic_try_cmpxchg(&rb->busy, &busy, 1)) > > return -EBUSY; > > > > - /* Synchronizes with smp_store_release() in user-space. */ > > + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() > > + * in user-space. > > + */ > > let's not hard-code libbpf function names in kernel comments, it's > still user-space Fair enough. > > prod_pos = smp_load_acquire(&rb->producer_pos); > > /* Synchronizes with smp_store_release() in > > * __bpf_user_ringbuf_sample_release(). > > @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, > > /* To release the ringbuffer, just increment the producer position to > > * signal that a new sample can be consumed. The busy bit is cleared by > > * userspace when posting a new sample to the ringbuffer. > > + * > > + * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve() > > + * in user-space. 
> > */ > > same > > > smp_store_release(&rb->consumer_pos, rb->consumer_pos + size + > > BPF_RINGBUF_HDR_SZ); > > diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c > > index 9c1f2d09f44e..f7fe09dce643 100644 > > --- a/tools/lib/bpf/libbpf.c > > +++ b/tools/lib/bpf/libbpf.c > > @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz) > > return sz; > > } > > > > +static bool map_is_ringbuf(const struct bpf_map *map) > > +{ > > + return map->def.type == BPF_MAP_TYPE_RINGBUF || > > + map->def.type == BPF_MAP_TYPE_USER_RINGBUF; > > +} > > + > > static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def) > > { > > map->def.type = def->map_type; > > @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def > > map->btf_value_type_id = def->value_type_id; > > > > /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ > > - if (map->def.type == BPF_MAP_TYPE_RINGBUF) > > + if (map_is_ringbuf(map)) > > map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); > > > > if (def->parts & MAP_DEF_MAP_TYPE) > > @@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries) > > map->def.max_entries = max_entries; > > > > /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ > > - if (map->def.type == BPF_MAP_TYPE_RINGBUF) > > + if (map_is_ringbuf(map)) > > map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); > > > > return 0; > > diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h > > index 61493c4cddac..6d1d0539b08d 100644 > > --- a/tools/lib/bpf/libbpf.h > > +++ b/tools/lib/bpf/libbpf.h > > @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook, > > > > /* Ring buffer APIs */ > > struct ring_buffer; > > +struct ring_buffer_user; > > > I know that I'm the one asking to use ring_buffer_user name, but given > kernel is using USER_RINGBUF and user_ringbuf naming, let's be > 
consistent and use user_ring_buffer in libbpf as well. I was > contemplating uring_buffer, can't make up my mind if it's better or > not. Not a problem, I'll revert it back to user_ring_buffer. > Also ring_buffer_user reads like "user of ring buffer", which adds to > confusion :) [...] > > +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb) > > libbpf generally doesn't use double underscore naming pattern, let's > not do that here as well Ack. > > +{ > > + if (rb->consumer_pos) { > > + munmap(rb->consumer_pos, rb->page_size); > > + rb->consumer_pos = NULL; > > + } > > + if (rb->producer_pos) { > > + munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1)); > > + rb->producer_pos = NULL; > > + } > > +} > > + > > +void ring_buffer_user__free(struct ring_buffer_user *rb) > > +{ > > + if (!rb) > > + return; > > + > > + __user_ringbuf_unmap_ring(rb); > > + > > + if (rb->epoll_fd >= 0) > > + close(rb->epoll_fd); > > + > > + free(rb); > > +} > > + > > +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd) > > +{ > > + > > please don't leave stray empty lines around the code Sorry, not sure how I missed that. > > +static void __ring_buffer_user__commit(struct ring_buffer_user *rb) > > +{ > > + uint32_t *hdr; > > + uint32_t total_len; > > + unsigned long prod_pos; > > + > > + prod_pos = *rb->producer_pos; > > + hdr = rb->data + (prod_pos & rb->mask); > > + > > + total_len = *hdr + BPF_RINGBUF_HDR_SZ; > > round up to multiple of 8 Will do, and I'll also validate this in the kernel. > > + > > + /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in > > + * the kernel. > > + */ > > + smp_store_release(rb->producer_pos, prod_pos + total_len); > > +} > > + > > +/* Discard a previously reserved sample into the ring buffer. Because the user > > + * ringbuffer is assumed to be single producer, this can simply be a no-op, and > > + * the producer pointer is left in the same place as when it was reserved. 
> > + */ > > +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample) > > +{} > > { > } Ack. > > + > > +/* Submit a previously reserved sample into the ring buffer. > > + */ > > nit: this is single-line comment Ack. > > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample) > > +{ > > + __ring_buffer_user__commit(rb); > > +} > > this made me think that it's probably best to add kernel support for > busy bit anyways (just like for existing ringbuf), so that we can > eventually turn this into multi-producer on user-space side (all we > need is a lock, really). So let's anticipate that on kernel side from > the very beginning Hmm, yeah, fair enough. We have the extra space in the sample header to OR the busy bit, and we already have a 2-stage reserve -> commit workflow, so we might as well. I'll go ahead and add this, and then hopefully someday we can just add a lock, as you mentioned. > > +/* Reserve a pointer to a sample in the user ring buffer. This function is *not* > > + * thread safe, and the ring-buffer supports only a single producer. > > + */ > > +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size) > > +{ > > + uint32_t *hdr; > > + /* 64-bit to avoid overflow in case of extreme application behavior */ > > + size_t avail_size, total_size, max_size; > > size_t is not guaranteed to be 64-bit, neither is long Sorry, you're right. I'll just use explicit bit-width types. > > + unsigned long cons_pos, prod_pos; > > + > > + /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in > > + * the kernel. > > + */ > > + cons_pos = smp_load_acquire(rb->consumer_pos); > > + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() > > + */ > > + prod_pos = smp_load_acquire(rb->producer_pos); > > + > > + max_size = rb->mask + 1; > > + avail_size = max_size - (prod_pos - cons_pos); > > + total_size = size + BPF_RINGBUF_HDR_SZ; > > round_up(8) Ack. 
> > + > > + if (total_size > max_size || avail_size < total_size) > > + return NULL; > > set errno as well? Will do. > > + > > + hdr = rb->data + (prod_pos & rb->mask); > > + *hdr = size; > > I'd make sure that entire 8 bytes of header are zero-initialized, for > better future extensibility Will do. > > + > > + /* Producer pos is updated when a sample is submitted. */ > > + > > + return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask); > > +} > > + > > +/* Poll for available space in the ringbuffer, and reserve a record when it > > + * becomes available. > > + */ > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size, > > + int timeout_ms) > > +{ > > + int cnt; > > + > > + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms); > > + if (cnt < 0) > > + return NULL; > > + > > + return ring_buffer_user__reserve(rb, size); > > it's not clear how just doing epoll_wait() guarantees that we have >= > size of space available?.. Seems like some tests are missing? Right now, the kernel only kicks the polling writer once it's drained all of the samples from the ring buffer. So at this point, if there's not enough size in the buffer, there would be nothing we could do regardless. This seemed like a reasonable, simple behavior for the initial implementation. I can make it a bit more intelligent if you'd like, and return EPOLLWRNORM as soon as there is any space in the buffer, and have libbpf potentially make multiple calls to epoll_wait() until enough space has become available.
On Fri, Aug 12, 2022 at 10:28 AM David Vernet <void@manifault.com> wrote: > > On Thu, Aug 11, 2022 at 04:39:57PM -0700, Andrii Nakryiko wrote: > > [...] > > > > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c > > > index fc589fc8eb7c..a10558e79ec8 100644 > > > --- a/kernel/bpf/ringbuf.c > > > +++ b/kernel/bpf/ringbuf.c > > > @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample, > > > if (!atomic_try_cmpxchg(&rb->busy, &busy, 1)) > > > return -EBUSY; > > > [...] > > > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample) > > > +{ > > > + __ring_buffer_user__commit(rb); > > > +} > > > > this made me think that it's probably best to add kernel support for > > busy bit anyways (just like for existing ringbuf), so that we can > > eventually turn this into multi-producer on user-space side (all we > > need is a lock, really). So let's anticipate that on kernel side from > > the very beginning > > Hmm, yeah, fair enough. We have the extra space in the sample header to OR the > busy bit, and we already have a 2-stage reserve -> commit workflow, so we might > as well. I'll go ahead and add this, and then hopefully someday we can just add > a lock, as you mentioned. Right. We can probably also just document that the reserve() step is the only one that needs serialization among multiple producers (and currently is required to be taken care of by the user app), while commit (submit/discard) operation is thread-safe and needs no synchronization. The only reason we don't add it to libbpf right now is because we are unsure about taking explicit dependency on pthread library. But I also just found [0], so I don't know, maybe we should use that? I wonder if it's supported by musl and other less full-featured libc implementations, though. [0] https://www.gnu.org/software/libc/manual/html_node/ISO-C-Mutexes.html > > > > +/* Reserve a pointer to a sample in the user ring buffer. 
This function is *not* > > > + * thread safe, and the ring-buffer supports only a single producer. > > > + */ > > > +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size) > > > +{ > > > + uint32_t *hdr; > > > + /* 64-bit to avoid overflow in case of extreme application behavior */ > > > + size_t avail_size, total_size, max_size; > > > > size_t is not guaranteed to be 64-bit, neither is long [...] > > > +/* Poll for available space in the ringbuffer, and reserve a record when it > > > + * becomes available. > > > + */ > > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size, > > > + int timeout_ms) > > > +{ > > > + int cnt; > > > + > > > + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms); > > > + if (cnt < 0) > > > + return NULL; > > > + > > > + return ring_buffer_user__reserve(rb, size); > > > > it's not clear how just doing epoll_wait() guarantees that we have >= > > size of space available?.. Seems like some tests are missing? > > Right now, the kernel only kicks the polling writer once it's drained all > of the samples from the ring buffer. So at this point, if there's not > enough size in the buffer, there would be nothing we could do regardless. > This seemed like reasonable, simple behavior for the initial > implementation. I can make it a bit more intelligent if you'd like, and > return EPOLLRWNORM as soon as there is any space in the buffer, and have > libbpf potentially make multiple calls to epoll_wait() until enough space > has become available. So this "drain all samples" notion is not great: you can end drain prematurely and thus not really drain all the data in ringbuf. With multiple producers there could also be always more data coming in in parallel. Plus, when in the future we'll have BPF program associated with such ringbuf on the kernel side, we won't have a notion of draining queue, we'll be just submitting record and letting kernel handle it eventually. 
So I think yeah, you'd have to send a notification when at least one sample gets consumed. The problem is that it's going to be a performance hit, potentially, if you are going to do this notification for each consumed sample. BPF ringbuf gets somewhat around that by using a heuristic to avoid notification if we see that the consumer is still behind the kernel when the kernel submits a new sample. I don't know if we can do anything clever here for waiting for some space to be available... Any thoughts? As for making libbpf loop until enough space is available... I guess that would be the only reasonable implementation, right? I wonder if calling it "user_ring_buffer__reserve_blocking()" would be a better name than just "poll", though?
On Tue, Aug 16, 2022 at 12:09:53PM -0700, Andrii Nakryiko wrote: > > > > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample) > > > > +{ > > > > + __ring_buffer_user__commit(rb); > > > > +} > > > > > > this made me think that it's probably best to add kernel support for > > > busy bit anyways (just like for existing ringbuf), so that we can > > > eventually turn this into multi-producer on user-space side (all we > > > need is a lock, really). So let's anticipate that on kernel side from > > > the very beginning > > > > Hmm, yeah, fair enough. We have the extra space in the sample header to OR the > > busy bit, and we already have a 2-stage reserve -> commit workflow, so we might > > as well. I'll go ahead and add this, and then hopefully someday we can just add > > a lock, as you mentioned. > > Right. We can probably also just document that reserve() step is the > only one that needs serialization among multiple producers (and > currently is required to taken care of by user app), while commit > (submit/discard) operation is thread-safe and needs no > synchronization. Sounds good. > The only reason we don't add it to libbpf right now is because we are > unsure about taking explicit dependency on pthread library. But I also > just found [0], so I don't know, maybe we should use that? I wonder if > it's supported by musl and other less full-featured libc > implementations, though. > > [0] https://www.gnu.org/software/libc/manual/html_node/ISO-C-Mutexes.html IMHO, and others may disagree, if it's in the C standard it seems like it should be fair game to add to libbpf? Also FWIW, it looks like musl does support it. See mtx_*.c in [0]. [0] https://git.musl-libc.org/cgit/musl/tree/src/thread That being said, I would like to try and keep this decision outside the scope of user-ringbuf though, if possible. 
Would you be OK with this landing as is (modulo further discussion, revisions, etc, of course), and then we can update this implementation to be multi-producer if and when we've added something like mtx_t support in a follow-on patch-set? [...] > > > > +/* Poll for available space in the ringbuffer, and reserve a record when it > > > > + * becomes available. > > > > + */ > > > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size, > > > > + int timeout_ms) > > > > +{ > > > > + int cnt; > > > > + > > > > + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms); > > > > + if (cnt < 0) > > > > + return NULL; > > > > + > > > > + return ring_buffer_user__reserve(rb, size); > > > > > > it's not clear how just doing epoll_wait() guarantees that we have >= > > > size of space available?.. Seems like some tests are missing? > > > > Right now, the kernel only kicks the polling writer once it's drained all > > of the samples from the ring buffer. So at this point, if there's not > > enough size in the buffer, there would be nothing we could do regardless. > > This seemed like reasonable, simple behavior for the initial > > implementation. I can make it a bit more intelligent if you'd like, and > > return EPOLLRWNORM as soon as there is any space in the buffer, and have > > libbpf potentially make multiple calls to epoll_wait() until enough space > > has become available. > > So this "drain all samples" notion is not great: you can end drain > prematurely and thus not really drain all the data in ringbuf.With > multiple producers there could also be always more data coming in in > parallel. Plus, when in the future we'll have BPF program associated > with such ringbuf on the kernel side, we won't have a notion of > draining queue, we'll be just submitting record and letting kernel > handle it eventually. I don't disagree with any of your points. I think what we'll have to decide on is a trade-off between performance and usability. 
As you pointed out, if we only kick user-space once the ringbuffer is empty, that imposes the requirement on the kernel that it will always drain the ringbuffer. That might not even be possible though if we have multiple producers posting data in parallel. More on this below, but the TL;DR is that I agree with you, and I think having a model where we kick user-space whenever a sample is consumed from the buffer is a lot easier to reason about, and probably our only option if our plan is to make the ringbuffer MPMC. I'll make this change in v3. > So I think yeah, you'd have to send notification when at least one > sample gets consumed. The problem is that it's going to be a > performance hit, potentially, if you are going to do this notification > for each consumed sample. BPF ringbuf gets somewhat around that by > using heuristic to avoid notification if we see that consumer is still > behind kernel when kernel submits a new sample. Something perhaps worth pointing out here is that this heuristic works because the kernel-producer ringbuffer is MPSC. If it were MPMC, we'd potentially have the same problem you pointed out above where you'd never wake up an epoll-waiter because other consumers would drain the buffer, and by the time the kernel got around to posting another sample, it could observe that consumer_pos == producer_pos, and either wouldn't wake up anyone on the waitq, or wouldn't return any events from ringbuf_map_poll(). If our intention is to make user-space ringbuffers MPMC, it becomes more difficult to use these nice heuristics. > I don't know if we can do anything clever here for waiting for some > space to be available... Any thoughts? Hmmm, yeah, nothing clever is coming to mind. The problem is that we can't make assumptions about why user-space would be epoll-waiting on the ringbuffer because it's a producer, and the user-space producer is free to post variably sized samples. 
For example, I was initially considering whether we could do a heuristic where we notify the producer only if the buffer was previously full / producer_pos was ringbuf size away from consumer_pos when we drained a sample, but that doesn't work at all because there could be space in the ringbuffer, but user-space is epoll-waiting for *more* space to become available for some large sample that it wants to publish. I think the only options we have are: 1. Send a notification (i.e. schedule bpf_ringbuf_notify() using irq_work_queue(), and then return EPOLLOUT | EPOLLWRNORM if the ringbuffer is not full), every time a sample is drained. 2. Keep the behavior in v1, wherein we have a contract that the kernel will always eventually drain the ringbuffer, and will kick user-space when the buffer is empty. I think a requirement here would also be that the ringbuffer would be SPMC, or we decide that it's acceptable for some producers to sleep indefinitely if other producers keep reading samples, and that the caller just needs to be aware of this as a possibility. My two cents are that we go with (1), and then consider our options later if we need to optimize. > As for making libbpf loop until enough space is available... I guess > that would be the only reasonable implementation, right? I wonder if > calling it "user_ring_buffer__reserve_blocking()" would be a better > name than just "poll", though? I went with user_ring_buffer__poll() to match the complementary function for the user-space consumer function for kernel-producer ringbuffers: ring_buffer__poll(). I personally prefer user_ring_buffer__reserve_blocking() because the fact that it's doing an epoll-wait is entirely an implementation detail. I'll go ahead and make that change in v3. Thanks, David
On Wed, Aug 17, 2022 at 09:02:14AM -0500, David Vernet wrote: [...] > > > > > +/* Poll for available space in the ringbuffer, and reserve a record when it > > > > > + * becomes available. > > > > > + */ > > > > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size, > > > > > + int timeout_ms) > > > > > +{ > > > > > + int cnt; > > > > > + > > > > > + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms); > > > > > + if (cnt < 0) > > > > > + return NULL; > > > > > + > > > > > + return ring_buffer_user__reserve(rb, size); > > > > > > > > it's not clear how just doing epoll_wait() guarantees that we have >= > > > > size of space available?.. Seems like some tests are missing? > > > > > > Right now, the kernel only kicks the polling writer once it's drained all > > > of the samples from the ring buffer. So at this point, if there's not > > > enough size in the buffer, there would be nothing we could do regardless. > > > This seemed like reasonable, simple behavior for the initial > > > implementation. I can make it a bit more intelligent if you'd like, and > > > return EPOLLRWNORM as soon as there is any space in the buffer, and have > > > libbpf potentially make multiple calls to epoll_wait() until enough space > > > has become available. > > > > So this "drain all samples" notion is not great: you can end drain > > prematurely and thus not really drain all the data in ringbuf.With > > multiple producers there could also be always more data coming in in > > parallel. Plus, when in the future we'll have BPF program associated > > with such ringbuf on the kernel side, we won't have a notion of > > draining queue, we'll be just submitting record and letting kernel > > handle it eventually. > > I don't disagree with any of your points. I think what we'll have to > decide-on is a trade-off between performance and usability. 
As you pointed > out, if we only kick user-space once the ringbuffer is empty, that imposes > the requirement on the kernel that it will always drain the ringbuffer. > That might not even be possible though if we have multiple producers > posting data in parallel. > > More on this below, but the TL;DR is that I agree with you, and I think > having a model where we kick user-space whenever a sample is consumed from > the buffer is a lot easier to reason about, and probably our only option if > our plan is to make the ringbuffer MPMC. I'll make this change in v3. > > > So I think yeah, you'd have to send notification when at least one > > sample gets consumed. The problem is that it's going to be a > > performance hit, potentially, if you are going to do this notification > > for each consumed sample. BPF ringbuf gets somewhat around that by > > using heuristic to avoid notification if we see that consumer is still > > behind kernel when kernel submits a new sample. > > Something perhaps worth pointing out here is that this heuristic works > because the kernel-producer ringbuffer is MPSC. If it were MPMC, we'd > potentially have the same problem you pointed out above where you'd never > wake up an epoll-waiter because other consumers would drain the buffer, and > by the time the kernel got around to posting another sample, could observe > that consumer_pos == producer_pos, and either wouldn't wake up anyone on > the waitq, or wouldn't return any events from ringbuf_map_poll(). If our > intention is to make user-space ringbuffers MPMC, it becomes more difficult > to use these nice heuristics. > > > I don't know if we can do anything clever here for waiting for some > > space to be available... Any thoughts? > > Hmmm, yeah, nothing clever is coming to mind. The problem is that we can't > make assumptions about why user-space would be epoll-waiting on the > ringbuffer because because it's a producer, and the user-space producer is > free to post variably sized samples. 
> > For example, I was initially considering whether we could do a heuristic > where we notify the producer only if the buffer was previously full / > producer_pos was ringbuf size away from consumer_pos when we drained a > sample, but that doesn't work at all because there could be space in the > ringbuffer, but user-space is epoll-waiting for *more* space to become > available for some large sample that it wants to publish. > > I think the only options we have are: > > 1. Send a notification (i.e. schedule bpf_ringbuf_notify() using > irq_work_queue(), and then return EPOLLOUT | EPOLLWRNORM if > the ringbuffer is not full), every time a sample is drained. > > 2. Keep the behavior in v1, wherein we have a contract that the kernel will > always eventually drain the ringbuffer, and will kick user-space when the > buffer is empty. I think a requirement here would also be that the > ringbuffer would be SPMC, or we decide that it's acceptable for some > producers to sleep indefinitely if other producers keep reading samples, > and that the caller just needs to be aware of this as a possibility. > > My two cents are that we go with (1), and then consider our options later > if we need to optimize. Unfortunately this causes the system to hang because of how many times we're invoking irq_work_queue() in rapid succession. So, I'm not sure that notifying the user-space producer after _every_ sample is going to work. Another option that occurred to me, however, is to augment the initial approach taken in v1. Rather than only sending a notification when the ringbuffer is empty, we could instead guarantee that we'll always send a notification when bpf_ringbuf_drain() is called and at least one message was consumed. This is slightly stronger than our earlier guarantee because we may send a notification even if the ringbuffer is only partially drained, and it avoids some of the associated pitfalls that you pointed out (e.g. 
that a BPF program is now responsible for always draining the ringbuffer in order to ensure that an epoll-waiting user-space producer will always be woken up). The down-side of this is that it still may not be aggressive enough in waking up user-space producers who could have had data to post as soon as space was available in the ringbuffer. It won't potentially cause them to block indefinitely as the first approach did, but they'll still have to wait for the end of bpf_ringbuf_drain() to be woken up, when they could potentially have been woken up sooner to start publishing samples. It's hard to say whether this will be prohibitive to the feature being usable, but in my opinion it's more useful to have this functionality (and make its behavior very clearly described in comments and the public documentation for the APIs) in an imperfect form than to not have it at all. Additionally, a plus-side to this approach is that it gives us some flexibility to send more event notifications if we want to. For example, we could add a heuristic where we always send a notification if the buffer was previously full before a sample was consumed (in contrast to always invoking irq_work_queue() _whenever_ a sample is consumed), and then always send a notification at the end of bpf_ringbuf_drain() to catch any producers who were waiting for a larger sample than what was afforded by the initial heuristic notification. I think this should address the likely common use-case of samples being statically sized (so posting an event as soon as the ringbuffer is no longer full should almost always be sufficient). We discussed this offline, so I'll go ahead and implement it for the next version. One thing we didn't discuss was the example heuristic described above, but I'm going to include that because it seems like the complementary behavior to what we do for kernel-producer ringbuffers.
If folks aren't a fan, we can remove it in favor of only sending a single notification in bpf_ringbuf_drain(). Thanks, David
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c index fc589fc8eb7c..a10558e79ec8 100644 --- a/kernel/bpf/ringbuf.c +++ b/kernel/bpf/ringbuf.c @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample, if (!atomic_try_cmpxchg(&rb->busy, &busy, 1)) return -EBUSY; - /* Synchronizes with smp_store_release() in user-space. */ + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() + * in user-space. + */ prod_pos = smp_load_acquire(&rb->producer_pos); /* Synchronizes with smp_store_release() in * __bpf_user_ringbuf_sample_release(). @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, /* To release the ringbuffer, just increment the producer position to * signal that a new sample can be consumed. The busy bit is cleared by * userspace when posting a new sample to the ringbuffer. + * + * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve() + * in user-space. */ smp_store_release(&rb->consumer_pos, rb->consumer_pos + size + BPF_RINGBUF_HDR_SZ); diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c index 9c1f2d09f44e..f7fe09dce643 100644 --- a/tools/lib/bpf/libbpf.c +++ b/tools/lib/bpf/libbpf.c @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz) return sz; } +static bool map_is_ringbuf(const struct bpf_map *map) +{ + return map->def.type == BPF_MAP_TYPE_RINGBUF || + map->def.type == BPF_MAP_TYPE_USER_RINGBUF; +} + static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def) { map->def.type = def->map_type; @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def map->btf_value_type_id = def->value_type_id; /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ - if (map->def.type == BPF_MAP_TYPE_RINGBUF) + if (map_is_ringbuf(map)) map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); if (def->parts & MAP_DEF_MAP_TYPE) @@ -4363,7 +4369,7 @@ 
int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries) map->def.max_entries = max_entries; /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ - if (map->def.type == BPF_MAP_TYPE_RINGBUF) + if (map_is_ringbuf(map)) map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); return 0; diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h index 61493c4cddac..6d1d0539b08d 100644 --- a/tools/lib/bpf/libbpf.h +++ b/tools/lib/bpf/libbpf.h @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook, /* Ring buffer APIs */ struct ring_buffer; +struct ring_buffer_user; typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); @@ -1028,6 +1029,24 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); +struct ring_buffer_user_opts { + size_t sz; /* size of this struct, for forward/backward compatibility */ +}; + +#define ring_buffer_user_opts__last_field sz + +LIBBPF_API struct ring_buffer_user * +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts); +LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb, + uint32_t size); +LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb, + uint32_t size, int timeout_ms); +LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb, + void *sample); +LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb, + void *sample); +LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb); + /* Perf buffer APIs */ struct perf_buffer; diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map index 119e6e1ea7f1..8db11040df1b 100644 --- a/tools/lib/bpf/libbpf.map +++ b/tools/lib/bpf/libbpf.map @@ -365,4 +365,10 @@ LIBBPF_1.0.0 { libbpf_bpf_map_type_str; libbpf_bpf_prog_type_str; perf_buffer__buffer; + 
ring_buffer_user__discard; + ring_buffer_user__free; + ring_buffer_user__new; + ring_buffer_user__poll; + ring_buffer_user__reserve; + ring_buffer_user__submit; }; diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c index 6d495656f554..f3a8e8e74eb8 100644 --- a/tools/lib/bpf/libbpf_probes.c +++ b/tools/lib/bpf/libbpf_probes.c @@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type) return btf_fd; break; case BPF_MAP_TYPE_RINGBUF: + case BPF_MAP_TYPE_USER_RINGBUF: key_size = 0; value_size = 0; max_entries = 4096; diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c index 8bc117bcc7bc..86e3c11d8486 100644 --- a/tools/lib/bpf/ringbuf.c +++ b/tools/lib/bpf/ringbuf.c @@ -39,6 +39,17 @@ struct ring_buffer { int ring_cnt; }; +struct ring_buffer_user { + struct epoll_event event; + unsigned long *consumer_pos; + unsigned long *producer_pos; + void *data; + unsigned long mask; + size_t page_size; + int map_fd; + int epoll_fd; +}; + static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r) { if (r->consumer_pos) { @@ -300,3 +311,208 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb) { return rb->epoll_fd; } + +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb) +{ + if (rb->consumer_pos) { + munmap(rb->consumer_pos, rb->page_size); + rb->consumer_pos = NULL; + } + if (rb->producer_pos) { + munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1)); + rb->producer_pos = NULL; + } +} + +void ring_buffer_user__free(struct ring_buffer_user *rb) +{ + if (!rb) + return; + + __user_ringbuf_unmap_ring(rb); + + if (rb->epoll_fd >= 0) + close(rb->epoll_fd); + + free(rb); +} + +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd) +{ + + struct bpf_map_info info; + __u32 len = sizeof(info); + void *tmp; + struct epoll_event *rb_epoll; + int err; + + memset(&info, 0, sizeof(info)); + + err = bpf_obj_get_info_by_fd(map_fd, &info, &len); + if (err) { + err = -errno; + 
pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + + if (info.type != BPF_MAP_TYPE_USER_RINGBUF) { + pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", + map_fd); + return libbpf_err(-EINVAL); + } + + rb->map_fd = map_fd; + rb->mask = info.max_entries - 1; + + /* Map read-only consumer page */ + tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0); + if (tmp == MAP_FAILED) { + err = -errno; + pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + rb->consumer_pos = tmp; + + /* Map read-write the producer page and data pages. We map the data + * region as twice the total size of the ringbuffer to allow the simple + * reading and writing of samples that wrap around the end of the + * buffer. See the kernel implementation for details. + */ + tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, + PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size); + if (tmp == MAP_FAILED) { + err = -errno; + pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + + rb->producer_pos = tmp; + rb->data = tmp + rb->page_size; + + rb_epoll = &rb->event; + rb_epoll->events = EPOLLOUT; + if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) { + err = -errno; + pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", + map_fd, err); + return libbpf_err(err); + } + + return 0; +} + +struct ring_buffer_user * +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts) +{ + struct ring_buffer_user *rb; + int err; + + if (!OPTS_VALID(opts, ring_buffer_opts)) + return errno = EINVAL, NULL; + + rb = calloc(1, sizeof(*rb)); + if (!rb) + return errno = ENOMEM, NULL; + + rb->page_size = getpagesize(); + + rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (rb->epoll_fd < 0) { + err = -errno; + pr_warn("user ringbuf: failed to create epoll 
instance: %d\n", + err); + goto err_out; + } + + err = __ring_buffer_user_map(rb, map_fd); + if (err) + goto err_out; + + return rb; + +err_out: + ring_buffer_user__free(rb); + return errno = -err, NULL; +} + +static void __ring_buffer_user__commit(struct ring_buffer_user *rb) +{ + uint32_t *hdr; + uint32_t total_len; + unsigned long prod_pos; + + prod_pos = *rb->producer_pos; + hdr = rb->data + (prod_pos & rb->mask); + + total_len = *hdr + BPF_RINGBUF_HDR_SZ; + + /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in + * the kernel. + */ + smp_store_release(rb->producer_pos, prod_pos + total_len); +} + +/* Discard a previously reserved sample into the ring buffer. Because the user + * ringbuffer is assumed to be single producer, this can simply be a no-op, and + * the producer pointer is left in the same place as when it was reserved. + */ +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample) +{} + +/* Submit a previously reserved sample into the ring buffer. + */ +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample) +{ + __ring_buffer_user__commit(rb); +} + +/* Reserve a pointer to a sample in the user ring buffer. This function is *not* + * thread safe, and the ring-buffer supports only a single producer. + */ +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size) +{ + uint32_t *hdr; + /* 64-bit to avoid overflow in case of extreme application behavior */ + size_t avail_size, total_size, max_size; + unsigned long cons_pos, prod_pos; + + /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in + * the kernel. 
+ */ + cons_pos = smp_load_acquire(rb->consumer_pos); + /* Synchronizes with smp_store_release() in __ring_buffer_user__commit() + */ + prod_pos = smp_load_acquire(rb->producer_pos); + + max_size = rb->mask + 1; + avail_size = max_size - (prod_pos - cons_pos); + total_size = size + BPF_RINGBUF_HDR_SZ; + + if (total_size > max_size || avail_size < total_size) + return NULL; + + hdr = rb->data + (prod_pos & rb->mask); + *hdr = size; + + /* Producer pos is updated when a sample is submitted. */ + + return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask); +} + +/* Poll for available space in the ringbuffer, and reserve a record when it + * becomes available. + */ +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size, + int timeout_ms) +{ + int cnt; + + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms); + if (cnt < 0) + return NULL; + + return ring_buffer_user__reserve(rb, size); +}
Now that all of the logic is in place in the kernel to support user-space produced ringbuffers, we can add the user-space logic to libbpf. Signed-off-by: David Vernet <void@manifault.com> --- kernel/bpf/ringbuf.c | 7 +- tools/lib/bpf/libbpf.c | 10 +- tools/lib/bpf/libbpf.h | 19 +++ tools/lib/bpf/libbpf.map | 6 + tools/lib/bpf/libbpf_probes.c | 1 + tools/lib/bpf/ringbuf.c | 216 ++++++++++++++++++++++++++++++++++ 6 files changed, 256 insertions(+), 3 deletions(-)