Message ID | 20241229135737.GA3293@redhat.com (mailing list archive)
---|---
State | New
Series | PATCH? avoid the unnecessary wakeups in pipe_read()
On Sun, 29 Dec 2024 at 05:58, Oleg Nesterov <oleg@redhat.com> wrote:
>
> If I read this code correctly, in this case the child will wakeup the parent
> 4095 times for no reason, pipe_writable() == !pipe_full() will still be true
> until the last read(fd[0], &c, 1) does

Ack, that patch looks sane to me. Only wake the writer if we actually released
a pipe slot, and it was full before we did so. Makes sense.

              Linus
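For readers following the thread, here is a minimal userspace sketch of the pattern under discussion (illustrative only, not code from the thread): the parent fills the pipe and then blocks in write(), while the child drains it one byte at a time. Before the patch below, each 1-byte read woke the blocked writer even though no slot had been released; the writer merely re-checked pipe_full() and went back to sleep, so the extra wakeups are only visible via tracing (e.g. the sched:sched_wakeup tracepoint).

	/* Illustrative reproducer sketch (not from the thread).  Assumes the
	 * default pipe size: 16 slots of up to one page (4096 bytes) each. */
	#include <fcntl.h>
	#include <string.h>
	#include <sys/wait.h>
	#include <unistd.h>

	int main(void)
	{
		int fd[2];
		char page[4096];
		char c = 0;

		if (pipe(fd))
			return 1;

		/* fill the pipe completely without blocking */
		fcntl(fd[1], F_SETFL, O_NONBLOCK);
		memset(page, 'x', sizeof(page));
		while (write(fd[1], page, sizeof(page)) > 0)
			;
		fcntl(fd[1], F_SETFL, 0);

		if (fork() == 0) {
			/* child: drain one byte at a time; before the patch each
			 * read here woke the parent even though pipe_full() stayed
			 * true until the whole 4096-byte slot was consumed */
			for (int i = 0; i < 4096; i++)
				read(fd[0], &c, 1);
			_exit(0);
		}

		/* parent: blocks here until the child finally frees a slot */
		write(fd[1], &c, 1);
		wait(NULL);
		return 0;
	}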
I was going to send a one-liner patch which adds mb() into pipe_poll(),
but then I decided to make even more spam and ask some questions first.

	static void wakeup_pipe_readers(struct pipe_inode_info *pipe)
	{
		smp_mb();
		if (waitqueue_active(&pipe->rd_wait))
			wake_up_interruptible(&pipe->rd_wait);
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	}

I think that wq_has_sleeper() + wake_up_interruptible_poll(POLLIN) would make
more sense, but this is minor.

Either way, the waitqueue_active() check is only correct if the waiter has a
barrier between __add_wait_queue() and "check the condition". wait_event() is
fine, but pipe_poll() does:

	// poll_wait()
	__pollwait() -> add_wait_queue(pipe->rd_wait) -> list_add()

	READ_ONCE(pipe->head);
	READ_ONCE(pipe->tail);

In theory these LOADs can leak into the critical section in add_wait_queue()
and can happen before list_add(entry, rd_wait.head). So I think we need the
trivial

--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -680,6 +680,7 @@ pipe_poll(struct file *filp, poll_table *wait)
 	 * if something changes and you got it wrong, the poll
 	 * table entry will wake you up and fix it.
 	 */
+	smp_mb();
 	head = READ_ONCE(pipe->head);
 	tail = READ_ONCE(pipe->tail);

and after that pipe_read/pipe_write can use the wq_has_sleeper() check too
(this is what the patch from WangYuli did).

-------------------------------------------------------------------------------

But perhaps this mb() should go into __pollwait()? We can have more
waitqueue_active() users which do not take .poll() into account...

There are more init_poll_funcptr()'s, but at least epoll looks fine:
epi_fget() in ep_item_poll() provides a full barrier before vfs_poll().

-------------------------------------------------------------------------------

Or should we really add mb() into __add_wait_queue/__add_wait_queue_entry_tail,
as Manfred suggests? Somehow I am not sure about this change.

Oleg.
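To make the ordering argument concrete, here is a userspace C11-atomics analogue of the waiter/waker pairing described above (a sketch with made-up names, not kernel code): the waiter must order its "I am on the queue" store before its load of the pipe state, which is what the smp_mb() added to pipe_poll() provides, and the waker must order its update of the pipe state before its "is anyone queued?" check, which is the smp_mb() already present in wakeup_pipe_readers() / wq_has_sleeper().

	/* Userspace analogue (sketch only) of the waiter/waker pairing.
	 * Both fences below stand in for smp_mb(). */
	#include <stdatomic.h>
	#include <stdbool.h>

	static atomic_bool on_waitqueue;   /* "I added myself to rd_wait" */
	static atomic_int  pipe_bytes;     /* stand-in for the pipe state */

	/* waiter side, like pipe_poll(): queue first, then check the state */
	static bool waiter_sees_data(void)
	{
		atomic_store_explicit(&on_waitqueue, true, memory_order_relaxed);
		/* without this fence the load below may be reordered before the
		 * store above -- the window the added smp_mb() closes */
		atomic_thread_fence(memory_order_seq_cst);
		return atomic_load_explicit(&pipe_bytes, memory_order_relaxed) > 0;
	}

	/* waker side, like wakeup_pipe_readers(): publish the data, then check
	 * whether anyone is queued before issuing a wakeup */
	static bool waker_must_wake(void)
	{
		atomic_fetch_add_explicit(&pipe_bytes, 1, memory_order_relaxed);
		atomic_thread_fence(memory_order_seq_cst);
		return atomic_load_explicit(&on_waitqueue, memory_order_relaxed);
	}

If either fence is missing, the two sides can miss each other: the waker loads on_waitqueue == false while the waiter still sees the stale pipe state, and the wakeup is lost; this is the lost-wakeup race the waitqueue_active() documentation warns about.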
diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..27ffb650f131 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	size_t total_len = iov_iter_count(to);
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool wake_writer = false, wake_next_reader = false;
 	ssize_t ret;
 
 	/* Null read succeeds. */
@@ -271,7 +271,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	 * (WF_SYNC), because we want them to get going and generate more
 	 * data for us.
 	 */
-	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 	for (;;) {
 		/* Read ->head with a barrier vs post_one_notification() */
 		unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 				buf->len = 0;
 			}
 
-			if (!buf->len)
+			if (!buf->len) {
+				wake_writer |= pipe_full(head, tail, pipe->max_usage);
 				tail = pipe_update_tail(pipe, buf, tail);
+			}
 			total_len -= chars;
 			if (!total_len)
 				break;	/* common path: read succeeded */
@@ -377,7 +378,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		 * _very_ unlikely case that the pipe was full, but we got
 		 * no data.
 		 */
-		if (unlikely(was_full))
+		if (unlikely(wake_writer))
 			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 
@@ -391,14 +392,14 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			return -ERESTARTSYS;
 
 		mutex_lock(&pipe->mutex);
-		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 		wake_next_reader = true;
+		wake_writer = false;
 	}
 	if (pipe_empty(pipe->head, pipe->tail))
 		wake_next_reader = false;
 	mutex_unlock(&pipe->mutex);
 
-	if (was_full)
+	if (wake_writer)
 		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 	if (wake_next_reader)
 		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
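Tying this back to the wq_has_sleeper() remark above: once pipe_poll() carries the barrier, the wakeups at the tail of pipe_read() in this patch could in principle be guarded so that the relatively expensive wake_up is only issued when someone is actually queued. A rough sketch of that direction (my illustration of the idea, not the actual patch from WangYuli):

	/* Sketch only: wq_has_sleeper() is smp_mb() + waitqueue_active(), so it
	 * pairs with the barrier the waiter executes between queueing itself
	 * and re-reading pipe->head/pipe->tail. */
	if (wake_writer && wq_has_sleeper(&pipe->wr_wait))
		wake_up_interruptible_sync_poll(&pipe->wr_wait,
						EPOLLOUT | EPOLLWRNORM);
	if (wake_next_reader && wq_has_sleeper(&pipe->rd_wait))
		wake_up_interruptible_sync_poll(&pipe->rd_wait,
						EPOLLIN | EPOLLRDNORM);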