Message ID: 20240805173234.3542917-3-vdonnefort@google.com (mailing list archive)
State: Superseded
Series: Tracefs support for pKVM
On Mon, 5 Aug 2024 18:32:25 +0100 Vincent Donnefort <vdonnefort@google.com> wrote: > + > +#define for_each_rb_page_desc(__pdesc, __cpu, __trace_pdesc) \ > + for (__pdesc = (struct rb_page_desc *)&((__trace_pdesc)->__data[0]), __cpu = 0; \ > + __cpu < (__trace_pdesc)->nr_cpus; \ > + __cpu++, __pdesc = __next_rb_page_desc(__pdesc)) > + > +static inline > +struct rb_page_desc *rb_page_desc(struct trace_page_desc *trace_pdesc, int cpu) > +{ > + struct rb_page_desc *pdesc; > + int i; > + > + if (!trace_pdesc) > + return NULL; > + > + for_each_rb_page_desc(pdesc, i, trace_pdesc) { > + if (pdesc->cpu == cpu) Is there a reason for the linear search? Why not just: if (cpu >= trace_pdesc->nr_cpus) return NULL; len = struct_size(pdesc, page_va, pdesc->nr_page_va); pdesc = (void *)&(trace_pdesc->__data[0]); return pdesc + len * cpu; (note I don't think you need to typecast the void pointer). > + return pdesc; > + } > + > + return NULL; > +} > + > +static inline > +void *rb_page_desc_page(struct rb_page_desc *pdesc, int page_id) > +{ > + return page_id > pdesc->nr_page_va ? NULL : (void *)pdesc->page_va[page_id]; > +} > + > +struct ring_buffer_writer { > + struct trace_page_desc *pdesc; > + int (*get_reader_page)(int cpu); > +}; > + > +int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu); > + > +#define ring_buffer_reader(writer) \ > +({ \ > + static struct lock_class_key __key; \ > + __ring_buffer_alloc(0, RB_FL_OVERWRITE, &__key, writer);\ > +}) > #endif /* _LINUX_RING_BUFFER_H */ > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c > index f4f4dda28077..a07c22254cfd 100644 > --- a/kernel/trace/ring_buffer.c > +++ b/kernel/trace/ring_buffer.c > @@ -495,6 +495,8 @@ struct ring_buffer_per_cpu { > unsigned long *subbuf_ids; /* ID to subbuf VA */ > struct trace_buffer_meta *meta_page; > > + struct ring_buffer_writer *writer; > + > /* ring buffer pages to update, > 0 to add, < 0 to remove */ > long nr_pages_to_update; > struct list_head new_pages; /* new pages to add */ > @@ -517,6 +519,8 @@ struct trace_buffer { > > struct ring_buffer_per_cpu **buffers; > > + struct ring_buffer_writer *writer; > + > struct hlist_node node; > u64 (*clock)(void); > > @@ -1626,6 +1630,31 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) > > cpu_buffer->reader_page = bpage; > > + if (buffer->writer) { > + struct rb_page_desc *pdesc = rb_page_desc(buffer->writer->pdesc, cpu); > + > + if (!pdesc) > + goto fail_free_reader; > + > + cpu_buffer->writer = buffer->writer; > + cpu_buffer->meta_page = (struct trace_buffer_meta *)(void *)pdesc->meta_va; > + cpu_buffer->subbuf_ids = pdesc->page_va; > + cpu_buffer->nr_pages = pdesc->nr_page_va - 1; > + atomic_inc(&cpu_buffer->record_disabled); > + atomic_inc(&cpu_buffer->resize_disabled); > + > + bpage->page = rb_page_desc_page(pdesc, > + cpu_buffer->meta_page->reader.id); > + if (!bpage->page) > + goto fail_free_reader; > + /* > + * The meta-page can only describe which of the ring-buffer page > + * is the reader. There is no need to init the rest of the > + * ring-buffer. 
> + */ > + return cpu_buffer; > + } > + > page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO, > cpu_buffer->buffer->subbuf_order); > if (!page) > @@ -1663,6 +1692,10 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) > > irq_work_sync(&cpu_buffer->irq_work.work); > > + if (cpu_buffer->writer) > + /* the ring_buffer doesn't own the data pages */ > + cpu_buffer->reader_page->page = NULL; Note, if statements are to have brackets if it's more than one line. That even includes comments. So the above should be written either as: if (cpu_buffer->writer) { /* the ring_buffer doesn't own the data pages */ cpu_buffer->reader_page->page = NULL; } Or /* the ring_buffer doesn't own the data pages */ if (cpu_buffer->writer) cpu_buffer->reader_page->page = NULL; For the second version, you should probably add more detail: /* ring_buffers with writer set do not own the data pages */ if (cpu_buffer->writer) cpu_buffer->reader_page->page = NULL; > + > free_buffer_page(cpu_buffer->reader_page); > > if (head) { > @@ -1693,7 +1726,8 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) > * drop data when the tail hits the head. > */ > struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, > - struct lock_class_key *key) > + struct lock_class_key *key, > + struct ring_buffer_writer *writer) > { > struct trace_buffer *buffer; > long nr_pages; > @@ -1721,6 +1755,10 @@ struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, > buffer->flags = flags; > buffer->clock = trace_clock_local; > buffer->reader_lock_key = key; > + if (writer) { > + buffer->writer = writer; Should probably add a comment here: /* The writer is external and never done by the kernel */ or something like that. > + atomic_inc(&buffer->record_disabled); > + } > -- Steve > init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); > init_waitqueue_head(&buffer->irq_work.waiters); > @@ -4468,8 +4506,54 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter, > } > } > > +static bool rb_read_writer_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + local_set(&cpu_buffer->entries, READ_ONCE(cpu_buffer->meta_page->entries)); > + local_set(&cpu_buffer->overrun, READ_ONCE(cpu_buffer->meta_page->overrun)); > + local_set(&cpu_buffer->pages_touched, READ_ONCE(meta_pages_touched(cpu_buffer->meta_page))); > + local_set(&cpu_buffer->pages_lost, READ_ONCE(meta_pages_lost(cpu_buffer->meta_page))); > + /* > + * No need to get the "read" field, it can be tracked here as any > + * reader will have to go through a rign_buffer_per_cpu. 
> + */ > + > + return rb_num_of_entries(cpu_buffer); > +} > + > +static struct buffer_page * > +__rb_get_reader_page_from_writer(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + u32 prev_reader; > + > + if (!rb_read_writer_meta_page(cpu_buffer)) > + return NULL; > + > + /* More to read on the reader page */ > + if (cpu_buffer->reader_page->read < rb_page_size(cpu_buffer->reader_page)) > + return cpu_buffer->reader_page; > + > + prev_reader = cpu_buffer->meta_page->reader.id; > + > + WARN_ON(cpu_buffer->writer->get_reader_page(cpu_buffer->cpu)); > + /* nr_pages doesn't include the reader page */ > + if (cpu_buffer->meta_page->reader.id > cpu_buffer->nr_pages) { > + WARN_ON(1); > + return NULL; > + } > + > + cpu_buffer->reader_page->page = > + (void *)cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id]; > + cpu_buffer->reader_page->read = 0; > + cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; > + cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events; > + > + WARN_ON(prev_reader == cpu_buffer->meta_page->reader.id); > + > + return cpu_buffer->reader_page; > +} > + > static struct buffer_page * > -rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) > +__rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) > { > struct buffer_page *reader = NULL; > unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); > @@ -4636,6 +4720,13 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) > return reader; > } > > +static struct buffer_page * > +rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + return cpu_buffer->writer ? __rb_get_reader_page_from_writer(cpu_buffer) : > + __rb_get_reader_page(cpu_buffer); > +} > + > static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) > { > struct ring_buffer_event *event; > @@ -5040,7 +5131,7 @@ ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) > struct ring_buffer_per_cpu *cpu_buffer; > struct ring_buffer_iter *iter; > > - if (!cpumask_test_cpu(cpu, buffer->cpumask)) > + if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->writer) > return NULL; > > iter = kzalloc(sizeof(*iter), flags); > @@ -5210,6 +5301,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) > { > struct buffer_page *page; > > + if (cpu_buffer->writer) > + return; > + > rb_head_page_deactivate(cpu_buffer); > > cpu_buffer->head_page > @@ -5440,6 +5534,49 @@ bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) > } > EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); > > +int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu) > +{ > + struct ring_buffer_per_cpu *cpu_buffer; > + unsigned long flags; > + > + if (cpu != RING_BUFFER_ALL_CPUS) { > + if (!cpumask_test_cpu(cpu, buffer->cpumask)) > + return -EINVAL; > + > + cpu_buffer = buffer->buffers[cpu]; > + > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); > + if (rb_read_writer_meta_page(cpu_buffer)) > + rb_wakeups(buffer, cpu_buffer); > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); > + > + return 0; > + } > + > + /* > + * Make sure all the ring buffers are up to date before we start reading > + * them. 
> + */ > + for_each_buffer_cpu(buffer, cpu) { > + cpu_buffer = buffer->buffers[cpu]; > + > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); > + rb_read_writer_meta_page(buffer->buffers[cpu]); > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); > + } > + > + for_each_buffer_cpu(buffer, cpu) { > + cpu_buffer = buffer->buffers[cpu]; > + > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); > + if (rb_num_of_entries(cpu_buffer)) > + rb_wakeups(buffer, buffer->buffers[cpu]); > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); > + } > + > + return 0; > +} > + > #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP > /** > * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers > @@ -5691,6 +5828,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer, > unsigned int commit; > unsigned int read; > u64 save_timestamp; > + bool force_memcpy; > int ret = -1; > > if (!cpumask_test_cpu(cpu, buffer->cpumask)) > @@ -5728,6 +5866,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer, > /* Check if any events were dropped */ > missed_events = cpu_buffer->lost_events; > > + force_memcpy = cpu_buffer->mapped || cpu_buffer->writer; > + > /* > * If this page has been partially read or > * if len is not big enough to read the rest of the page or > @@ -5737,7 +5877,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer, > */ > if (read || (len < (commit - read)) || > cpu_buffer->reader_page == cpu_buffer->commit_page || > - cpu_buffer->mapped) { > + force_memcpy) { > struct buffer_data_page *rpage = cpu_buffer->reader_page->page; > unsigned int rpos = read; > unsigned int pos = 0; > @@ -6290,7 +6430,7 @@ int ring_buffer_map(struct trace_buffer *buffer, int cpu, > unsigned long flags, *subbuf_ids; > int err = 0; > > - if (!cpumask_test_cpu(cpu, buffer->cpumask)) > + if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->writer) > return -EINVAL; > > cpu_buffer = buffer->buffers[cpu];
Sorry I have been preempted before I can answer here: On Tue, Aug 06, 2024 at 04:39:53PM -0400, Steven Rostedt wrote: > On Mon, 5 Aug 2024 18:32:25 +0100 > Vincent Donnefort <vdonnefort@google.com> wrote: > > > + > > +#define for_each_rb_page_desc(__pdesc, __cpu, __trace_pdesc) \ > > + for (__pdesc = (struct rb_page_desc *)&((__trace_pdesc)->__data[0]), __cpu = 0; \ > > + __cpu < (__trace_pdesc)->nr_cpus; \ > > + __cpu++, __pdesc = __next_rb_page_desc(__pdesc)) > > + > > +static inline > > +struct rb_page_desc *rb_page_desc(struct trace_page_desc *trace_pdesc, int cpu) > > +{ > > + struct rb_page_desc *pdesc; > > + int i; > > + > > + if (!trace_pdesc) > > + return NULL; > > + > > + for_each_rb_page_desc(pdesc, i, trace_pdesc) { > > + if (pdesc->cpu == cpu) > > Is there a reason for the linear search? > > Why not just: > > if (cpu >= trace_pdesc->nr_cpus) > return NULL; > > len = struct_size(pdesc, page_va, pdesc->nr_page_va); > pdesc = (void *)&(trace_pdesc->__data[0]); > > return pdesc + len * cpu; > > (note I don't think you need to typecast the void pointer). I supposed we can't assume buffers will be allocated for each CPU, hence the need to look at each buffer. > > > + return pdesc; > > + } > > + > > + return NULL; > > +} > > + > > +static inline > > +void *rb_page_desc_page(struct rb_page_desc *pdesc, int page_id) > > +{ > > + return page_id > pdesc->nr_page_va ? NULL : (void *)pdesc->page_va[page_id]; > > +} > > + > > +struct ring_buffer_writer { > > + struct trace_page_desc *pdesc; > > + int (*get_reader_page)(int cpu); > > +}; > > + > > +int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu); > > + > > +#define ring_buffer_reader(writer) \ > > +({ \ > > + static struct lock_class_key __key; \ > > + __ring_buffer_alloc(0, RB_FL_OVERWRITE, &__key, writer);\ > > +}) > > #endif /* _LINUX_RING_BUFFER_H */ > > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c > > index f4f4dda28077..a07c22254cfd 100644 > > --- a/kernel/trace/ring_buffer.c > > +++ b/kernel/trace/ring_buffer.c > > @@ -495,6 +495,8 @@ struct ring_buffer_per_cpu { > > unsigned long *subbuf_ids; /* ID to subbuf VA */ > > struct trace_buffer_meta *meta_page; > > > > + struct ring_buffer_writer *writer; > > + > > /* ring buffer pages to update, > 0 to add, < 0 to remove */ > > long nr_pages_to_update; > > struct list_head new_pages; /* new pages to add */ > > @@ -517,6 +519,8 @@ struct trace_buffer { > > > > struct ring_buffer_per_cpu **buffers; > > > > + struct ring_buffer_writer *writer; > > + > > struct hlist_node node; > > u64 (*clock)(void); > > > > @@ -1626,6 +1630,31 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) > > > > cpu_buffer->reader_page = bpage; > > > > + if (buffer->writer) { > > + struct rb_page_desc *pdesc = rb_page_desc(buffer->writer->pdesc, cpu); > > + > > + if (!pdesc) > > + goto fail_free_reader; > > + > > + cpu_buffer->writer = buffer->writer; > > + cpu_buffer->meta_page = (struct trace_buffer_meta *)(void *)pdesc->meta_va; > > + cpu_buffer->subbuf_ids = pdesc->page_va; > > + cpu_buffer->nr_pages = pdesc->nr_page_va - 1; > > + atomic_inc(&cpu_buffer->record_disabled); > > + atomic_inc(&cpu_buffer->resize_disabled); > > + > > + bpage->page = rb_page_desc_page(pdesc, > > + cpu_buffer->meta_page->reader.id); > > + if (!bpage->page) > > + goto fail_free_reader; > > + /* > > + * The meta-page can only describe which of the ring-buffer page > > + * is the reader. There is no need to init the rest of the > > + * ring-buffer. 
> > + */ > > + return cpu_buffer; > > + } > > + > > page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO, > > cpu_buffer->buffer->subbuf_order); > > if (!page) > > @@ -1663,6 +1692,10 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) > > > > irq_work_sync(&cpu_buffer->irq_work.work); > > > > + if (cpu_buffer->writer) > > + /* the ring_buffer doesn't own the data pages */ > > + cpu_buffer->reader_page->page = NULL; > > Note, if statements are to have brackets if it's more than one line. That > even includes comments. So the above should be written either as: > > if (cpu_buffer->writer) { > /* the ring_buffer doesn't own the data pages */ > cpu_buffer->reader_page->page = NULL; > } > > Or > > /* the ring_buffer doesn't own the data pages */ > if (cpu_buffer->writer) > cpu_buffer->reader_page->page = NULL; > > For the second version, you should probably add more detail: > > /* ring_buffers with writer set do not own the data pages */ > if (cpu_buffer->writer) > cpu_buffer->reader_page->page = NULL; > > > > + > > free_buffer_page(cpu_buffer->reader_page); > > > > if (head) { > > @@ -1693,7 +1726,8 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) > > * drop data when the tail hits the head. > > */ > > struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, > > - struct lock_class_key *key) > > + struct lock_class_key *key, > > + struct ring_buffer_writer *writer) > > { > > struct trace_buffer *buffer; > > long nr_pages; > > @@ -1721,6 +1755,10 @@ struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, > > buffer->flags = flags; > > buffer->clock = trace_clock_local; > > buffer->reader_lock_key = key; > > + if (writer) { > > + buffer->writer = writer; > > Should probably add a comment here: > > /* The writer is external and never done by the kernel */ > > or something like that. > > > + atomic_inc(&buffer->record_disabled); > > + } > > > > -- Steve > [...]
On Tue, 13 Aug 2024 15:21:54 +0100 Vincent Donnefort <vdonnefort@google.com> wrote: > > > + > > > +#define for_each_rb_page_desc(__pdesc, __cpu, __trace_pdesc) \ > > > + for (__pdesc = (struct rb_page_desc *)&((__trace_pdesc)->__data[0]), __cpu = 0; \ > > > + __cpu < (__trace_pdesc)->nr_cpus; \ > > > + __cpu++, __pdesc = __next_rb_page_desc(__pdesc)) > > > + > > > +static inline > > > +struct rb_page_desc *rb_page_desc(struct trace_page_desc *trace_pdesc, int cpu) > > > +{ > > > + struct rb_page_desc *pdesc; > > > + int i; > > > + > > > + if (!trace_pdesc) > > > + return NULL; > > > + > > > + for_each_rb_page_desc(pdesc, i, trace_pdesc) { > > > + if (pdesc->cpu == cpu) > > > > Is there a reason for the linear search? > > > > Why not just: > > > > if (cpu >= trace_pdesc->nr_cpus) > > return NULL; > > > > len = struct_size(pdesc, page_va, pdesc->nr_page_va); > > pdesc = (void *)&(trace_pdesc->__data[0]); > > > > return pdesc + len * cpu; > > > > (note I don't think you need to typecast the void pointer). > > I supposed we can't assume buffers will be allocated for each CPU, hence the > need to look at each buffer. OK, but by default that should be the fast path. We could add the above and then do: pdesc += len * cpu; if (pdesc->cpu == cpu) return pdesc; /* Missing CPUs, need to do a linear search */ for_each_rb_page_desc(pdesc, i, trace_pdesc) { if (pdesc->cpu == cpu) [...] -- Steve
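Putting the two together, a sketch of rb_page_desc() with the direct-index fast path and the linear-search fallback suggested above could look like the following. It is only an illustration of that suggestion: the fast path assumes every rb_page_desc in __data has the same nr_page_va (and therefore the same size), and the offset arithmetic has to be done in bytes since struct_size() returns a byte count:

static inline
struct rb_page_desc *rb_page_desc(struct trace_page_desc *trace_pdesc, int cpu)
{
	struct rb_page_desc *pdesc;
	size_t len;
	int i;

	if (!trace_pdesc || cpu >= trace_pdesc->nr_cpus)
		return NULL;

	pdesc = (struct rb_page_desc *)&trace_pdesc->__data[0];

	/* Fast path: equally sized descriptors, one per CPU, in CPU order */
	len = struct_size(pdesc, page_va, pdesc->nr_page_va);
	pdesc = (void *)pdesc + len * cpu;
	if (pdesc->cpu == cpu)
		return pdesc;

	/* Some CPUs are missing: fall back to a linear search */
	for_each_rb_page_desc(pdesc, i, trace_pdesc) {
		if (pdesc->cpu == cpu)
			return pdesc;
	}

	return NULL;
}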
diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h index 96d2140b471e..d3ea2b40437c 100644 --- a/include/linux/ring_buffer.h +++ b/include/linux/ring_buffer.h @@ -83,21 +83,24 @@ u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, void ring_buffer_discard_commit(struct trace_buffer *buffer, struct ring_buffer_event *event); +struct ring_buffer_writer; + /* * size is in bytes for each per CPU buffer. */ struct trace_buffer * -__ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *key); +__ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *key, + struct ring_buffer_writer *writer); /* * Because the ring buffer is generic, if other users of the ring buffer get * traced by ftrace, it can produce lockdep warnings. We need to keep each * ring buffer's lock class separate. */ -#define ring_buffer_alloc(size, flags) \ -({ \ - static struct lock_class_key __key; \ - __ring_buffer_alloc((size), (flags), &__key); \ +#define ring_buffer_alloc(size, flags) \ +({ \ + static struct lock_class_key __key; \ + __ring_buffer_alloc((size), (flags), &__key, NULL); \ }) typedef bool (*ring_buffer_cond_fn)(void *data); @@ -229,4 +232,70 @@ int ring_buffer_map(struct trace_buffer *buffer, int cpu, struct vm_area_struct *vma); int ring_buffer_unmap(struct trace_buffer *buffer, int cpu); int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu); + +#define meta_pages_lost(__meta) \ + ((__meta)->Reserved1) +#define meta_pages_touched(__meta) \ + ((__meta)->Reserved2) + +struct rb_page_desc { + int cpu; + int nr_page_va; /* exclude the meta page */ + unsigned long meta_va; + unsigned long page_va[]; +}; + +struct trace_page_desc { + int nr_cpus; + char __data[]; /* list of rb_page_desc */ +}; + +static inline +struct rb_page_desc *__next_rb_page_desc(struct rb_page_desc *pdesc) +{ + size_t len = struct_size(pdesc, page_va, pdesc->nr_page_va); + + return (struct rb_page_desc *)((void *)pdesc + len); +} + +#define for_each_rb_page_desc(__pdesc, __cpu, __trace_pdesc) \ + for (__pdesc = (struct rb_page_desc *)&((__trace_pdesc)->__data[0]), __cpu = 0; \ + __cpu < (__trace_pdesc)->nr_cpus; \ + __cpu++, __pdesc = __next_rb_page_desc(__pdesc)) + +static inline +struct rb_page_desc *rb_page_desc(struct trace_page_desc *trace_pdesc, int cpu) +{ + struct rb_page_desc *pdesc; + int i; + + if (!trace_pdesc) + return NULL; + + for_each_rb_page_desc(pdesc, i, trace_pdesc) { + if (pdesc->cpu == cpu) + return pdesc; + } + + return NULL; +} + +static inline +void *rb_page_desc_page(struct rb_page_desc *pdesc, int page_id) +{ + return page_id > pdesc->nr_page_va ? 
NULL : (void *)pdesc->page_va[page_id]; +} + +struct ring_buffer_writer { + struct trace_page_desc *pdesc; + int (*get_reader_page)(int cpu); +}; + +int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu); + +#define ring_buffer_reader(writer) \ +({ \ + static struct lock_class_key __key; \ + __ring_buffer_alloc(0, RB_FL_OVERWRITE, &__key, writer);\ +}) #endif /* _LINUX_RING_BUFFER_H */ diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index f4f4dda28077..a07c22254cfd 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -495,6 +495,8 @@ struct ring_buffer_per_cpu { unsigned long *subbuf_ids; /* ID to subbuf VA */ struct trace_buffer_meta *meta_page; + struct ring_buffer_writer *writer; + /* ring buffer pages to update, > 0 to add, < 0 to remove */ long nr_pages_to_update; struct list_head new_pages; /* new pages to add */ @@ -517,6 +519,8 @@ struct trace_buffer { struct ring_buffer_per_cpu **buffers; + struct ring_buffer_writer *writer; + struct hlist_node node; u64 (*clock)(void); @@ -1626,6 +1630,31 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) cpu_buffer->reader_page = bpage; + if (buffer->writer) { + struct rb_page_desc *pdesc = rb_page_desc(buffer->writer->pdesc, cpu); + + if (!pdesc) + goto fail_free_reader; + + cpu_buffer->writer = buffer->writer; + cpu_buffer->meta_page = (struct trace_buffer_meta *)(void *)pdesc->meta_va; + cpu_buffer->subbuf_ids = pdesc->page_va; + cpu_buffer->nr_pages = pdesc->nr_page_va - 1; + atomic_inc(&cpu_buffer->record_disabled); + atomic_inc(&cpu_buffer->resize_disabled); + + bpage->page = rb_page_desc_page(pdesc, + cpu_buffer->meta_page->reader.id); + if (!bpage->page) + goto fail_free_reader; + /* + * The meta-page can only describe which of the ring-buffer page + * is the reader. There is no need to init the rest of the + * ring-buffer. + */ + return cpu_buffer; + } + page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO, cpu_buffer->buffer->subbuf_order); if (!page) @@ -1663,6 +1692,10 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) irq_work_sync(&cpu_buffer->irq_work.work); + if (cpu_buffer->writer) + /* the ring_buffer doesn't own the data pages */ + cpu_buffer->reader_page->page = NULL; + free_buffer_page(cpu_buffer->reader_page); if (head) { @@ -1693,7 +1726,8 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) * drop data when the tail hits the head. 
*/ struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, - struct lock_class_key *key) + struct lock_class_key *key, + struct ring_buffer_writer *writer) { struct trace_buffer *buffer; long nr_pages; @@ -1721,6 +1755,10 @@ struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, buffer->flags = flags; buffer->clock = trace_clock_local; buffer->reader_lock_key = key; + if (writer) { + buffer->writer = writer; + atomic_inc(&buffer->record_disabled); + } init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); init_waitqueue_head(&buffer->irq_work.waiters); @@ -4468,8 +4506,54 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter, } } +static bool rb_read_writer_meta_page(struct ring_buffer_per_cpu *cpu_buffer) +{ + local_set(&cpu_buffer->entries, READ_ONCE(cpu_buffer->meta_page->entries)); + local_set(&cpu_buffer->overrun, READ_ONCE(cpu_buffer->meta_page->overrun)); + local_set(&cpu_buffer->pages_touched, READ_ONCE(meta_pages_touched(cpu_buffer->meta_page))); + local_set(&cpu_buffer->pages_lost, READ_ONCE(meta_pages_lost(cpu_buffer->meta_page))); + /* + * No need to get the "read" field, it can be tracked here as any + * reader will have to go through a rign_buffer_per_cpu. + */ + + return rb_num_of_entries(cpu_buffer); +} + +static struct buffer_page * +__rb_get_reader_page_from_writer(struct ring_buffer_per_cpu *cpu_buffer) +{ + u32 prev_reader; + + if (!rb_read_writer_meta_page(cpu_buffer)) + return NULL; + + /* More to read on the reader page */ + if (cpu_buffer->reader_page->read < rb_page_size(cpu_buffer->reader_page)) + return cpu_buffer->reader_page; + + prev_reader = cpu_buffer->meta_page->reader.id; + + WARN_ON(cpu_buffer->writer->get_reader_page(cpu_buffer->cpu)); + /* nr_pages doesn't include the reader page */ + if (cpu_buffer->meta_page->reader.id > cpu_buffer->nr_pages) { + WARN_ON(1); + return NULL; + } + + cpu_buffer->reader_page->page = + (void *)cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id]; + cpu_buffer->reader_page->read = 0; + cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; + cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events; + + WARN_ON(prev_reader == cpu_buffer->meta_page->reader.id); + + return cpu_buffer->reader_page; +} + static struct buffer_page * -rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) +__rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) { struct buffer_page *reader = NULL; unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); @@ -4636,6 +4720,13 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) return reader; } +static struct buffer_page * +rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) +{ + return cpu_buffer->writer ? 
__rb_get_reader_page_from_writer(cpu_buffer) : + __rb_get_reader_page(cpu_buffer); +} + static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) { struct ring_buffer_event *event; @@ -5040,7 +5131,7 @@ ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) struct ring_buffer_per_cpu *cpu_buffer; struct ring_buffer_iter *iter; - if (!cpumask_test_cpu(cpu, buffer->cpumask)) + if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->writer) return NULL; iter = kzalloc(sizeof(*iter), flags); @@ -5210,6 +5301,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) { struct buffer_page *page; + if (cpu_buffer->writer) + return; + rb_head_page_deactivate(cpu_buffer); cpu_buffer->head_page @@ -5440,6 +5534,49 @@ bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) } EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); +int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + unsigned long flags; + + if (cpu != RING_BUFFER_ALL_CPUS) { + if (!cpumask_test_cpu(cpu, buffer->cpumask)) + return -EINVAL; + + cpu_buffer = buffer->buffers[cpu]; + + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); + if (rb_read_writer_meta_page(cpu_buffer)) + rb_wakeups(buffer, cpu_buffer); + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + + return 0; + } + + /* + * Make sure all the ring buffers are up to date before we start reading + * them. + */ + for_each_buffer_cpu(buffer, cpu) { + cpu_buffer = buffer->buffers[cpu]; + + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); + rb_read_writer_meta_page(buffer->buffers[cpu]); + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + } + + for_each_buffer_cpu(buffer, cpu) { + cpu_buffer = buffer->buffers[cpu]; + + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); + if (rb_num_of_entries(cpu_buffer)) + rb_wakeups(buffer, buffer->buffers[cpu]); + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + } + + return 0; +} + #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP /** * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers @@ -5691,6 +5828,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer, unsigned int commit; unsigned int read; u64 save_timestamp; + bool force_memcpy; int ret = -1; if (!cpumask_test_cpu(cpu, buffer->cpumask)) @@ -5728,6 +5866,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer, /* Check if any events were dropped */ missed_events = cpu_buffer->lost_events; + force_memcpy = cpu_buffer->mapped || cpu_buffer->writer; + /* * If this page has been partially read or * if len is not big enough to read the rest of the page or @@ -5737,7 +5877,7 @@ int ring_buffer_read_page(struct trace_buffer *buffer, */ if (read || (len < (commit - read)) || cpu_buffer->reader_page == cpu_buffer->commit_page || - cpu_buffer->mapped) { + force_memcpy) { struct buffer_data_page *rpage = cpu_buffer->reader_page->page; unsigned int rpos = read; unsigned int pos = 0; @@ -6290,7 +6430,7 @@ int ring_buffer_map(struct trace_buffer *buffer, int cpu, unsigned long flags, *subbuf_ids; int err = 0; - if (!cpumask_test_cpu(cpu, buffer->cpumask)) + if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->writer) return -EINVAL; cpu_buffer = buffer->buffers[cpu];
A ring-buffer writer is an entity outside of the kernel (most likely firmware or a hypervisor) capable of writing events into a ring buffer that follows the same format as the tracefs ring-buffer.

To set up the ring buffer on the kernel side, a description of the shared pages (struct trace_page_desc) is necessary. A get_reader_page() callback must also be provided; it is called whenever the kernel reader is done with the current reader page and needs the writer to hand over a new one. The writer is expected to keep the meta-page updated.

Signed-off-by: Vincent Donnefort <vdonnefort@google.com>
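For illustration, a reader-side backend consuming this interface might look roughly like the sketch below. Everything prefixed with my_hyp_ is hypothetical (my_hyp_swap_reader_page() stands in for whatever hypercall or firmware interface hands over a new reader page); only struct ring_buffer_writer, ring_buffer_reader() and ring_buffer_poll_writer() come from this patch:

#include <linux/ring_buffer.h>

/* Hypothetical: description of the pages shared with the external writer */
static struct trace_page_desc *my_hyp_page_desc;
static struct trace_buffer *my_hyp_trace_buffer;

/* Hypothetical call asking the writer to publish a new reader page */
int my_hyp_swap_reader_page(int cpu);

static int my_hyp_get_reader_page(int cpu)
{
	return my_hyp_swap_reader_page(cpu);
}

static struct ring_buffer_writer my_hyp_writer = {
	.get_reader_page	= my_hyp_get_reader_page,
};

static int my_hyp_setup_tracing(void)
{
	my_hyp_writer.pdesc = my_hyp_page_desc;

	/* Read-only buffer backed by the writer's pages; recording stays disabled */
	my_hyp_trace_buffer = ring_buffer_reader(&my_hyp_writer);
	if (!my_hyp_trace_buffer)
		return -ENOMEM;

	return 0;
}

/* Called periodically, or on a notification from the writer */
static void my_hyp_poll(void)
{
	ring_buffer_poll_writer(my_hyp_trace_buffer, RING_BUFFER_ALL_CPUS);
}

Consumption then goes through the usual reader paths (e.g. ring_buffer_consume() or ring_buffer_read_page()), while mapping and iterators are refused for writer-backed buffers.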