diff mbox series

ring-buffer: Limit time with disabled interrupts in rb_check_pages()

Message ID 20240621150956.24814-1-petr.pavlu@suse.com (mailing list archive)
State Superseded
Headers show
Series ring-buffer: Limit time with disabled interrupts in rb_check_pages() | expand

Commit Message

Petr Pavlu June 21, 2024, 3:09 p.m. UTC
Function rb_check_pages() validates the integrity of a specified per-CPU
tracing ring buffer. It does so by walking the underlying linked
list and checking its next and prev links.

To guarantee that the list doesn't get modified during the check,
a caller typically needs to take cpu_buffer->reader_lock. This prevents
the check from running concurrently with a potential reader which can
make the list temporarily inconsistent when swapping its old reader page
into the buffer.

A problem is that the time when interrupts are disabled is
non-deterministic, dependent on the ring buffer size. This particularly
affects PREEMPT_RT because the reader_lock is a raw spinlock which
doesn't become sleepable on PREEMPT_RT kernels.

Modify the check so it still tries to walk the whole list but gives up
the reader_lock between checking individual pages.

Signed-off-by: Petr Pavlu <petr.pavlu@suse.com>
---

This is a follow-up to the discussion in
https://lore.kernel.org/linux-trace-kernel/20240517134008.24529-1-petr.pavlu@suse.com/

 kernel/trace/ring_buffer.c | 96 +++++++++++++++++++++++++++-----------
 1 file changed, 70 insertions(+), 26 deletions(-)


base-commit: 50736169ecc8387247fe6a00932852ce7b057083

Comments

Petr Pavlu June 23, 2024, 2:54 p.m. UTC | #1
On 6/21/24 17:09, Petr Pavlu wrote:
> Function rb_check_pages() validates the integrity of a specified per-CPU
> tracing ring buffer. It does so by walking the underlying linked
> list and checking its next and prev links.
> 
> To guarantee that the list doesn't get modified during the check,
> a caller typically needs to take cpu_buffer->reader_lock. This prevents
> the check from running concurrently with a potential reader which can
> make the list temporarily inconsistent when swapping its old reader page
> into the buffer.
> 
> A problem is that the time when interrupts are disabled is
> non-deterministic, dependent on the ring buffer size. This particularly
> affects PREEMPT_RT because the reader_lock is a raw spinlock which
> doesn't become sleepable on PREEMPT_RT kernels.
> 
> Modify the check so it still tries to walk the whole list but gives up
> the reader_lock between checking individual pages.

Sorry, I'm afraid this patch is actually buggy. It considers only the
case when rb_check_pages() is called by ring_buffer_resize() and the
ring-buffer list is potentially modified by a reader swapping its page
into the buffer from rb_get_reader_page(). However, I suspect the
opposite can happen, rb_check_pages() called by a completing reader from
ring_buffer_read_finish() and rb_remove_pages()/rb_insert_pages()
concurrently modifying the ring-buffer pages. This needs to be handled
as well.

I'm at a conference next week, I'll have a closer look afterwards.
diff mbox series

Patch

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 28853966aa9a..3aefaf8a4d58 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -1454,40 +1454,91 @@  static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
 	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
 }
 
+static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
+			   struct list_head *list)
+{
+	if (RB_WARN_ON(cpu_buffer,
+		       rb_list_head(rb_list_head(list->next)->prev) != list))
+		return false;
+
+	if (RB_WARN_ON(cpu_buffer,
+		       rb_list_head(rb_list_head(list->prev)->next) != list))
+		return false;
+
+	return true;
+}
+
 /**
  * rb_check_pages - integrity check of buffer pages
  * @cpu_buffer: CPU buffer with pages to test
  *
  * As a safety measure we check to make sure the data pages have not
  * been corrupted.
- *
- * Callers of this function need to guarantee that the list of pages doesn't get
- * modified during the check. In particular, if it's possible that the function
- * is invoked with concurrent readers which can swap in a new reader page then
- * the caller should take cpu_buffer->reader_lock.
  */
 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
 {
-	struct list_head *head = rb_list_head(cpu_buffer->pages);
-	struct list_head *tmp;
+	struct list_head *head, *tmp;
+	unsigned long flags;
+	size_t pages_read;
+	int nr_loops = 0;
 
-	if (RB_WARN_ON(cpu_buffer,
-			rb_list_head(rb_list_head(head->next)->prev) != head))
+	/*
+	 * Walk the linked list underpinning the ring buffer and validate all
+	 * its next and prev links.
+	 *
+	 * The check acquires the reader_lock in order to avoid concurrent
+	 * processing with a potential reader which can make the list
+	 * temporarily inconsistent when swapping its old reader page into the
+	 * buffer and obtaining a new page. However, the lock cannot be held for
+	 * the whole duration of the walk, as this would make the time when
+	 * interrupts are disabled non-deterministic, dependent on the ring
+	 * buffer size. Therefore, the code releases and re-acquires the lock
+	 * after checking each page. The pages_read variable is then used to
+	 * detect if a reader changed the list while the lock was not held, in
+	 * which case the check needs to be restarted.
+	 *
+	 * The code attempts to perform the check at most three times before
+	 * giving up. That itself is ok because this is only a self-validation
+	 * to detect problems early on. In practice, if it even happens that
+	 * this code runs concurrently with a reader getting a new page from the
+	 * buffer, it should take the reader a bit to process the obtained page
+	 * before coming back for more data and so this check typically succeeds
+	 * at most on the second try.
+	 */
+again:
+	if (++nr_loops > 3)
 		return;
 
-	if (RB_WARN_ON(cpu_buffer,
-			rb_list_head(rb_list_head(head->prev)->next) != head))
-		return;
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+	head = rb_list_head(cpu_buffer->pages);
+	if (!rb_check_links(cpu_buffer, head))
+		goto out_locked;
+	pages_read = local_read(&cpu_buffer->pages_read);
+	tmp = head;
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
-	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
-		if (RB_WARN_ON(cpu_buffer,
-				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
-			return;
+	while (true) {
+		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
-		if (RB_WARN_ON(cpu_buffer,
-				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
-			return;
+		if (pages_read != local_read(&cpu_buffer->pages_read)) {
+			/* A reader updated the list, try again. */
+			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+			goto again;
+		}
+
+		tmp = rb_list_head(tmp->next);
+		if (tmp == head)
+			/* The iteration circled back, all is done. */
+			goto out_locked;
+
+		if (!rb_check_links(cpu_buffer, tmp))
+			goto out_locked;
+
+		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 	}
+
+out_locked:
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 }
 
 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
@@ -2215,12 +2266,8 @@  int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
 		 */
 		synchronize_rcu();
 		for_each_buffer_cpu(buffer, cpu) {
-			unsigned long flags;
-
 			cpu_buffer = buffer->buffers[cpu];
-			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 			rb_check_pages(cpu_buffer);
-			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 		}
 		atomic_dec(&buffer->record_disabled);
 	}
@@ -5150,12 +5197,9 @@  void
 ring_buffer_read_finish(struct ring_buffer_iter *iter)
 {
 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
-	unsigned long flags;
 
 	/* Use this opportunity to check the integrity of the ring buffer. */
-	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 	rb_check_pages(cpu_buffer);
-	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
 	atomic_dec(&cpu_buffer->resize_disabled);
 	kfree(iter->event);