
PATCH? avoid the unnecessary wakeups in pipe_read()

Message ID 20241229135737.GA3293@redhat.com (mailing list archive)
State New

Commit Message

Oleg Nesterov Dec. 29, 2024, 1:57 p.m. UTC
The previous discussion was very confusing, so let me start another thread.
This is orthogonal to the possible wq_has_sleeper() optimizations in fs/pipe.c
we discussed before.

Let me quote one of my previous emails. Consider

	#include <unistd.h>

	int main(void)
	{
		int fd[2], cnt;
		char c;

		pipe(fd);

		if (!fork()) {
			// child: wait until the parent blocks in pipe_write() ->
			// wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
			sleep(1);

			// drain one page worth of data, one byte at a time
			for (cnt = 0; cnt < 4096; ++cnt)
				read(fd[0], &c, 1);
			return 0;
		}

		// parent: fill the pipe, then block in pipe_write()
		for (;;)
			write(fd[1], &c, 1);
	}

If I read this code correctly, in this case the child will wake the parent up
4095 times for no reason: pipe_writable() == !pipe_full() will still be true
until the last read(fd[0], &c, 1) does

	if (!buf->len)
		tail = pipe_update_tail(pipe, buf, tail);

and after that the parent can write the next char.
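
For reference, pipe_full() counts occupied pipe slots, not bytes, which is why
the one-byte reads above do not change it until a slot is actually freed. The
helpers in include/linux/pipe_fs_i.h are roughly:

	/* head/tail are free-running counters, occupancy is their distance */
	static inline unsigned int pipe_occupancy(unsigned int head, unsigned int tail)
	{
		return head - tail;
	}

	static inline bool pipe_full(unsigned int head, unsigned int tail,
				     unsigned int limit)
	{
		return pipe_occupancy(head, tail) >= limit;
	}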

Does the patch below make sense? With this patch, pipe_read() wakes the
writer up only when pipe_full() changes from T to F.

Still incomplete, obviously not for inclusion. But is it correct or not?
I am not sure I understand this nontrivial logic...

Oleg.
---

Comments

Linus Torvalds Dec. 29, 2024, 5:27 p.m. UTC | #1
On Sun, 29 Dec 2024 at 05:58, Oleg Nesterov <oleg@redhat.com> wrote:
>
> If I read this code correctly, in this case the child will wake the parent up
> 4095 times for no reason: pipe_writable() == !pipe_full() will still be true
> until the last read(fd[0], &c, 1) does

Ack, that patch looks sane to me.

Only wake the writer if we actually released a pipe slot, and the pipe was
full before we did so.

Makes sense.

                Linus
Oleg Nesterov Jan. 2, 2025, 4:33 p.m. UTC | #2
I was going to send a one-liner patch which adds mb() into pipe_poll()
but then I decided to make even more spam and ask some questions first.

	static void wakeup_pipe_readers(struct pipe_inode_info *pipe)
	{
		smp_mb();
		if (waitqueue_active(&pipe->rd_wait))
			wake_up_interruptible(&pipe->rd_wait);
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	}

I think that wq_has_sleeper() + wake_up_interruptible_poll(POLLIN) would make
more sense, but this is minor.
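
For comparison, that variant might look like this (just a sketch of the
alternative, not a posted patch):

	static void wakeup_pipe_readers(struct pipe_inode_info *pipe)
	{
		/* wq_has_sleeper() does the smp_mb() before checking the list */
		if (wq_has_sleeper(&pipe->rd_wait))
			wake_up_interruptible_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	}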

Either way the waitqueue_active() check is only correct if the waiter has a
barrier between __add_wait_queue() and "check the condition". wait_event()
is fine, but pipe_poll() does:

	// poll_wait()
	__pollwait() -> add_wait_queue(pipe->rd_wait) -> list_add()

	READ_ONCE(pipe->head);
	READ_ONCE(pipe->tail);

In theory these loads can leak into the critical section in add_wait_queue()
and happen before list_add(entry, rd_wait.head).
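
Concretely, the missed-wakeup window would look like this (an illustrative
interleaving, assuming a waker that uses the waitqueue_active() pattern above):

	CPU 0 (pipe_poll)                       CPU 1 (pipe_write)
	-----------------                       ------------------
	head = READ_ONCE(pipe->head);
	tail = READ_ONCE(pipe->tail);
	  /* loads done before list_add() */    pipe->head++;  /* new data */
	                                        smp_mb();
	                                        waitqueue_active() == false,
	                                        wakeup skipped
	list_add(entry, &rd_wait.head);

	CPU 0 reports the pipe empty, the caller sleeps, and the wakeup is lost.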

So I think we need the trivial

	--- a/fs/pipe.c
	+++ b/fs/pipe.c
	@@ -680,6 +680,7 @@ pipe_poll(struct file *filp, poll_table *wait)
		 * if something changes and you got it wrong, the poll
		 * table entry will wake you up and fix it.
		 */
	+	smp_mb();
		head = READ_ONCE(pipe->head);
		tail = READ_ONCE(pipe->tail);

and after that pipe_read/pipe_write can use the wq_has_sleeper() check too
(this is what the patch from WangYuli did).
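
Assuming that barrier is in place, the writer-side wakeup in pipe_read() could
then become something like (again, only a sketch):

	if (unlikely(wake_writer) && wq_has_sleeper(&pipe->wr_wait))
		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);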

-------------------------------------------------------------------------------
But perhaps this mb() should go into __pollwait() ? We can have more
waitqueue_active() users which do not take .poll() into account...
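
If it did, the placement might be (a sketch against fs/select.c; the elided
setup code is unchanged):

	static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
			       poll_table *p)
	{
		...
		add_wait_queue(wait_address, &entry->wait);
		/* order the list_add() above vs the condition checks in ->poll() */
		smp_mb();
	}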

There are more init_poll_funcptr()'s, but at least epoll looks fine:
epi_fget() in ep_item_poll() provides a full barrier before vfs_poll().

-------------------------------------------------------------------------------
Or should the mb() really go into __add_wait_queue()/__add_wait_queue_entry_tail(),
as Manfred suggests? Somehow I am not sure about that change.
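
For completeness, Manfred's variant would put the barrier into the waitqueue
code itself, something like the sketch below; note it pays the barrier cost
for every waiter, not just for pollers:

	static inline void __add_wait_queue(struct wait_queue_head *wq_head,
					    struct wait_queue_entry *wq_entry)
	{
		list_add(&wq_entry->entry, &wq_head->head);
		/* order list_add() vs the caller's subsequent condition check */
		smp_mb();
	}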

Oleg.

Patch

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..27ffb650f131 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	size_t total_len = iov_iter_count(to);
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool wake_writer = false, wake_next_reader = false;
 	ssize_t ret;
 
 	/* Null read succeeds. */
@@ -271,7 +271,6 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	 * (WF_SYNC), because we want them to get going and generate more
 	 * data for us.
 	 */
-	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 	for (;;) {
 		/* Read ->head with a barrier vs post_one_notification() */
 		unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 				buf->len = 0;
 			}
 
-			if (!buf->len)
+			if (!buf->len) {
+				wake_writer |= pipe_full(head, tail, pipe->max_usage);
 				tail = pipe_update_tail(pipe, buf, tail);
+			}
 			total_len -= chars;
 			if (!total_len)
 				break;	/* common path: read succeeded */
@@ -377,7 +378,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		 * _very_ unlikely case that the pipe was full, but we got
 		 * no data.
 		 */
-		if (unlikely(was_full))
+		if (unlikely(wake_writer))
 			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 
@@ -391,14 +392,14 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			return -ERESTARTSYS;
 
 		mutex_lock(&pipe->mutex);
-		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 		wake_next_reader = true;
+		wake_writer = false;
 	}
 	if (pipe_empty(pipe->head, pipe->tail))
 		wake_next_reader = false;
 	mutex_unlock(&pipe->mutex);
 
-	if (was_full)
+	if (wake_writer)
 		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 	if (wake_next_reader)
 		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);