@@ -834,8 +834,11 @@ void free_pipe_info(struct pipe_inode_info *pipe)
unsigned int i;
#ifdef CONFIG_WATCH_QUEUE
- if (pipe->watch_queue)
+ if (pipe->watch_queue) {
watch_queue_clear(pipe->watch_queue);
+ smp_cond_load_relaxed(&pipe->watch_queue->state,
+ (VAL & WATCH_QUEUE_POST_CNT_MASK) == 0);
+ }
#endif
(void) account_pipe_buffers(pipe->user, pipe->nr_accounted, 0);
@@ -35,6 +35,7 @@ struct watch_filter {
struct watch_type_filter filters[];
};
+#define WATCH_QUEUE_POST_CNT_MASK GENMASK(30, 0)
struct watch_queue {
struct rcu_head rcu;
struct watch_filter __rcu *filter;
@@ -46,7 +47,18 @@ struct watch_queue {
spinlock_t lock;
unsigned int nr_notes; /* Number of notes */
unsigned int nr_pages; /* Number of pages in notes[] */
- bool defunct; /* T when queues closed */
+ union {
+ struct {
+#ifdef __LITTLE_ENDIAN
+ u32 post_cnt:31; /* How many threads are posting notification */
+ u32 defunct:1; /* T when queues closed */
+#else
+ u32 defunct:1; /* T when queues closed */
+ u32 post_cnt:31; /* How many threads are posting notification */
+#endif
+ };
+ u32 state;
+ };
};
/*
@@ -33,6 +33,8 @@ MODULE_AUTHOR("Red Hat, Inc.");
#define WATCH_QUEUE_NOTE_SIZE 128
#define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)
+static void put_watch(struct watch *watch);
+
/*
* This must be called under the RCU read-lock, which makes
* sure that the wqueue still exists. It can then take the lock,
@@ -88,24 +90,40 @@ static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
};
/*
- * Post a notification to a watch queue.
- *
- * Must be called with the RCU lock for reading, and the
- * watch_queue lock held, which guarantees that the pipe
- * hasn't been released.
+ * Post a notification to a watch queue with RCU lock held.
*/
-static bool post_one_notification(struct watch_queue *wqueue,
+static bool post_one_notification(struct watch *watch,
struct watch_notification *n)
{
void *p;
- struct pipe_inode_info *pipe = wqueue->pipe;
+ struct watch_queue *wqueue;
+ struct pipe_inode_info *pipe;
struct pipe_buffer *buf;
struct page *page;
unsigned int head, tail, mask, note, offset, len;
bool done = false;
+ u32 state;
+
+ if (!kref_get_unless_zero(&watch->usage))
+ return false;
+ wqueue = rcu_dereference(watch->queue);
+
+ pipe = wqueue->pipe;
- if (!pipe)
+ if (!pipe) {
+ put_watch(watch);
return false;
+ }
+
+ do {
+ if (wqueue->defunct) {
+ put_watch(watch);
+ return false;
+ }
+ state = wqueue->state;
+ } while (cmpxchg(&wqueue->state, state, state + 1) != state);
+
+ rcu_read_unlock();
spin_lock_irq(&pipe->rd_wait.lock);
@@ -145,6 +163,12 @@ static bool post_one_notification(struct watch_queue *wqueue,
out:
spin_unlock_irq(&pipe->rd_wait.lock);
+ do {
+ state = wqueue->state;
+ } while (cmpxchg(&wqueue->state, state, state - 1) != state);
+
+ rcu_read_lock();
+ put_watch(watch);
if (done)
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
return done;
@@ -224,10 +248,7 @@ void __post_watch_notification(struct watch_list *wlist,
if (security_post_notification(watch->cred, cred, n) < 0)
continue;
- if (lock_wqueue(wqueue)) {
- post_one_notification(wqueue, n);
- unlock_wqueue(wqueue);
- }
+ post_one_notification(watch, n);
}
rcu_read_unlock();
@@ -560,8 +581,8 @@ int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
wqueue = rcu_dereference(watch->queue);
+ post_one_notification(watch, &n.watch);
if (lock_wqueue(wqueue)) {
- post_one_notification(wqueue, &n.watch);
if (!hlist_unhashed(&watch->queue_node)) {
hlist_del_init_rcu(&watch->queue_node);
Refactor post_one_notification so that we can lock pipe using sleepable lock. Signed-off-by: Hongchen Zhang <zhanghongchen@loongson.cn> --- fs/pipe.c | 5 +++- include/linux/watch_queue.h | 14 ++++++++++- kernel/watch_queue.c | 47 +++++++++++++++++++++++++++---------- 3 files changed, 51 insertions(+), 15 deletions(-) base-commit: 6995e2de6891c724bfeb2db33d7b87775f913ad1