Message ID | 155905931502.7587.11705449537368497489.stgit@warthog.procyon.org.uk (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | Mount, FS, Block and Keyrings notifications | expand |
On Tue, May 28, 2019 at 05:01:55PM +0100, David Howells wrote: > Implement a misc device that implements a general notification queue as a > ring buffer that can be mmap()'d from userspace. "general" but just for filesystems, right? :( > Each entry has a 1-slot header that describes it: > > struct watch_notification { > __u32 type:24; > __u32 subtype:8; > __u32 info; > }; This doesn't match the structure definition in the documentation, so something is out of sync. > The type indicates the source (eg. mount tree changes, superblock events, > keyring changes, block layer events) and the subtype indicates the event > type (eg. mount, unmount; EIO, EDQUOT; link, unlink). The info field > indicates a number of things, including the entry length, an ID assigned to > a watchpoint contributing to this buffer, type-specific flags and meta > flags, such as an overrun indicator. > > Supplementary data, such as the key ID that generated an event, are > attached in additional slots. I'm all for a "generic" event system for the kernel (heck, Solaris has had one for decades), but it keeps getting shot down every time it comes up. What is different about this one? > --- a/drivers/misc/Kconfig > +++ b/drivers/misc/Kconfig > @@ -4,6 +4,19 @@ > > menu "Misc devices" > > +config WATCH_QUEUE > + bool "Mappable notification queue" > + default n Nit, not needed. > + depends on MMU > + help > + This is a general notification queue for the kernel to pass events to > + userspace through a mmap()'able ring buffer. It can be used in > + conjunction with watches for mount topology change notifications, > + superblock change notifications and key/keyring change notifications. > + > + Note that in theory this should work fine with NOMMU, but I'm not > + sure how to make that work. 
> + > config SENSORS_LIS3LV02D > tristate > depends on INPUT > diff --git a/drivers/misc/Makefile b/drivers/misc/Makefile > index b9affcdaa3d6..bf16acd9f8cc 100644 > --- a/drivers/misc/Makefile > +++ b/drivers/misc/Makefile > @@ -3,6 +3,7 @@ > # Makefile for misc devices that really don't fit anywhere else. > # > > +obj-$(CONFIG_WATCH_QUEUE) += watch_queue.o > obj-$(CONFIG_IBM_ASM) += ibmasm/ > obj-$(CONFIG_IBMVMC) += ibmvmc.o > obj-$(CONFIG_AD525X_DPOT) += ad525x_dpot.o > diff --git a/drivers/misc/watch_queue.c b/drivers/misc/watch_queue.c > new file mode 100644 > index 000000000000..39a09ea15d97 > --- /dev/null > +++ b/drivers/misc/watch_queue.c > @@ -0,0 +1,877 @@ > +/* User-mappable watch queue > + * > + * Copyright (C) 2018 Red Hat, Inc. All Rights Reserved. You didn't touch the code this year? > + * Written by David Howells (dhowells@redhat.com) > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public Licence > + * as published by the Free Software Foundation; either version > + * 2 of the Licence, or (at your option) any later version. Please drop the boiler plate text and use a SPDX tag, checkpatch should have caught this. I don't want to have to go and change it again. > + * > + * See Documentation/watch_queue.rst > + */ > + > +#define pr_fmt(fmt) "watchq: " fmt > +#include <linux/module.h> > +#include <linux/init.h> > +#include <linux/sched.h> > +#include <linux/slab.h> > +#include <linux/printk.h> > +#include <linux/miscdevice.h> > +#include <linux/fs.h> > +#include <linux/mm.h> > +#include <linux/pagemap.h> > +#include <linux/poll.h> > +#include <linux/uaccess.h> > +#include <linux/vmalloc.h> > +#include <linux/file.h> > +#include <linux/security.h> > +#include <linux/cred.h> > +#include <linux/watch_queue.h> > + > +#define DEBUG_WITH_WRITE /* Allow use of write() to record notifications */ debugging code left in? 
> + > +MODULE_DESCRIPTION("Watch queue"); > +MODULE_AUTHOR("Red Hat, Inc."); > +MODULE_LICENSE("GPL"); > + > +struct watch_type_filter { > + enum watch_notification_type type; > + __u32 subtype_filter[1]; /* Bitmask of subtypes to filter on */ > + __u32 info_filter; /* Filter on watch_notification::info */ > + __u32 info_mask; /* Mask of relevant bits in info_filter */ > +}; > + > +struct watch_filter { > + union { > + struct rcu_head rcu; > + unsigned long type_filter[2]; /* Bitmask of accepted types */ > + }; > + u32 nr_filters; /* Number of filters */ > + struct watch_type_filter filters[]; > +}; > + > +struct watch_queue { > + struct rcu_head rcu; > + struct address_space mapping; > + const struct cred *cred; /* Creds of the owner of the queue */ > + struct watch_filter __rcu *filter; > + wait_queue_head_t waiters; > + struct hlist_head watches; /* Contributory watches */ > + refcount_t usage; Usage of what, this structure? Or something else? > + spinlock_t lock; > + bool defunct; /* T when queues closed */ > + u8 nr_pages; /* Size of pages[] */ > + u8 flag_next; /* Flag to apply to next item */ > +#ifdef DEBUG_WITH_WRITE > + u8 debug; > +#endif > + u32 size; > + struct watch_queue_buffer *buffer; /* Pointer to first record */ > + > + /* The mappable pages. The zeroth page holds the ring pointers. */ > + struct page **pages; > +}; > +EXPORT_SYMBOL(__post_watch_notification); _GPL for new apis? (I have to ask...) 
> +static long watch_queue_ioctl(struct file *file, unsigned int cmd, unsigned long arg) > +{ > + struct watch_queue *wqueue = file->private_data; > + struct inode *inode = file_inode(file); > + long ret; > + > + switch (cmd) { > + case IOC_WATCH_QUEUE_SET_SIZE: > + if (wqueue->buffer) > + return -EBUSY; > + inode_lock(inode); > + ret = watch_queue_set_size(wqueue, arg); > + inode_unlock(inode); > + return ret; > + > + case IOC_WATCH_QUEUE_SET_FILTER: > + inode_lock(inode); > + ret = watch_queue_set_filter( > + inode, wqueue, > + (struct watch_notification_filter __user *)arg); > + inode_unlock(inode); > + return ret; > + > + default: > + return -EOPNOTSUPP; -ENOTTY is the correct "not a valid ioctl" error value, right? > + } > +} > +/** > + * put_watch_queue - Dispose of a ref on a watchqueue. > + * @wqueue: The watch queue to unref. > + */ > +void put_watch_queue(struct watch_queue *wqueue) > +{ > + if (refcount_dec_and_test(&wqueue->usage)) > + kfree_rcu(wqueue, rcu); Why not just use a kref? > +} > +EXPORT_SYMBOL(put_watch_queue); > +int add_watch_to_object(struct watch *watch, struct watch_list *wlist) > +{ > + struct watch_queue *wqueue = rcu_access_pointer(watch->queue); > + struct watch *w; > + > + hlist_for_each_entry(w, &wlist->watchers, list_node) { > + if (watch->id == w->id) > + return -EBUSY; > + } > + > + rcu_assign_pointer(watch->watch_list, wlist); > + > + spin_lock_bh(&wqueue->lock); > + refcount_inc(&wqueue->usage); > + hlist_add_head(&watch->queue_node, &wqueue->watches); > + spin_unlock_bh(&wqueue->lock); > + > + hlist_add_head(&watch->list_node, &wlist->watchers); > + return 0; > +} > +EXPORT_SYMBOL(add_watch_to_object); Naming nit, shouldn't the "prefix" all be the same for these new functions? watch_queue_add_object()? watch_queue_put()? And so on? 
> +static int __init watch_queue_init(void) > +{ > + int ret; > + > + ret = misc_register(&watch_queue_dev); > + if (ret < 0) > + pr_err("Failed to register %d\n", ret); > + return ret; > +} > +fs_initcall(watch_queue_init); > + > +static void __exit watch_queue_exit(void) > +{ > + misc_deregister(&watch_queue_dev); > +} > +module_exit(watch_queue_exit); module_misc_device()? > --- /dev/null > +++ b/include/linux/watch_queue.h > @@ -0,0 +1,86 @@ > +/* User-mappable watch queue > + * > + * Copyright (C) 2018 Red Hat, Inc. All Rights Reserved. > + * Written by David Howells (dhowells@redhat.com) > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public Licence > + * as published by the Free Software Foundation; either version > + * 2 of the Licence, or (at your option) any later version. Again, SPDX headers please. > --- /dev/null > +++ b/include/uapi/linux/watch_queue.h > @@ -0,0 +1,82 @@ > +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ Yeah!!! No copyright? 
:( > +#ifndef _UAPI_LINUX_WATCH_QUEUE_H > +#define _UAPI_LINUX_WATCH_QUEUE_H > + > +#include <linux/types.h> > +#include <linux/ioctl.h> > + > +#define IOC_WATCH_QUEUE_SET_SIZE _IO('s', 0x01) /* Set the size in pages */ > +#define IOC_WATCH_QUEUE_SET_FILTER _IO('s', 0x02) /* Set the filter */ > + > +enum watch_notification_type { > + WATCH_TYPE_META = 0, /* Special record */ > + WATCH_TYPE_MOUNT_NOTIFY = 1, /* Mount notification record */ > + WATCH_TYPE_SB_NOTIFY = 2, /* Superblock notification */ > + WATCH_TYPE_KEY_NOTIFY = 3, /* Key/keyring change notification */ > + WATCH_TYPE_BLOCK_NOTIFY = 4, /* Block layer notifications */ > +#define WATCH_TYPE___NR 5 > +}; > + > +enum watch_meta_notification_subtype { > + WATCH_META_SKIP_NOTIFICATION = 0, /* Just skip this record */ > + WATCH_META_REMOVAL_NOTIFICATION = 1, /* Watched object was removed */ > +}; > + > +/* > + * Notification record > + */ > +struct watch_notification { > + __u32 type:24; /* enum watch_notification_type */ > + __u32 subtype:8; /* Type-specific subtype (filterable) */ > + __u32 info; > +#define WATCH_INFO_OVERRUN 0x00000001 /* Event(s) lost due to overrun */ > +#define WATCH_INFO_ENOMEM 0x00000002 /* Event(s) lost due to ENOMEM */ > +#define WATCH_INFO_RECURSIVE 0x00000004 /* Change was recursive */ > +#define WATCH_INFO_LENGTH 0x000001f8 /* Length of record / sizeof(watch_notification) */ > +#define WATCH_INFO_IN_SUBTREE 0x00000200 /* Change was not at watched root */ > +#define WATCH_INFO_TYPE_FLAGS 0x00ff0000 /* Type-specific flags */ > +#define WATCH_INFO_FLAG_0 0x00010000 > +#define WATCH_INFO_FLAG_1 0x00020000 > +#define WATCH_INFO_FLAG_2 0x00040000 > +#define WATCH_INFO_FLAG_3 0x00080000 > +#define WATCH_INFO_FLAG_4 0x00100000 > +#define WATCH_INFO_FLAG_5 0x00200000 > +#define WATCH_INFO_FLAG_6 0x00400000 > +#define WATCH_INFO_FLAG_7 0x00800000 > +#define WATCH_INFO_ID 0xff000000 /* ID of watchpoint */ > +}; > + > +#define WATCH_LENGTH_SHIFT 3 > + > +struct watch_queue_buffer { > + union 
{ > + /* The first few entries are special, containing the > + * ring management variables. > + */ > + struct { > + struct watch_notification watch; /* WATCH_TYPE_SKIP */ > + volatile __u32 head; /* Ring head index */ > + volatile __u32 tail; /* Ring tail index */ A uapi structure that has volatile in it? Are you _SURE_ this is correct? That feels wrong to me... This is not a backing-hardware register, it's "just memory" and slapping volatile on it shouldn't be the correct solution for telling the compiler to not to optimize away reads/flushes, right? You need a proper memory access type primitive for that to work correctly everywhere I thought. We only have 2 users of volatile in include/uapi, one for WMI structures that are backed by firmware (seems correct), and one for DRM which I have no idea how it works as it claims to be a lock. Why is this new addition the correct way to do this that no other ring-buffer that was mmapped has needed to? thanks, greg k-h
Greg KH <gregkh@linuxfoundation.org> wrote: > > Implement a misc device that implements a general notification queue as a > > ring buffer that can be mmap()'d from userspace. > > "general" but just for filesystems, right? :( Whatever gave you that idea? You can watch keyrings events, for example - they're not exactly filesystems. I've added the ability to watch for mount topology changes and superblock events because those are something I've been asked to do. I've added something for block events because I've recently had a problem with trying to recover data from a dodgy disk in that every time the disk goes offline, the ddrecover goes "wheeeee!" as it just sees a lot of EIO/ENODATA at a great rate of knots because it doesn't know the driver is now ignoring the disk. I don't know what else people might want to watch, but I've tried to make it as generic as possible so as not to exclude it if possible. > This doesn't match the structure definition in the documentation, so > something is out of sync. Ah, yes - I need to update that doc, thanks. > I'm all for a "generic" event system for the kernel (heck, Solaris has > had one for decades), but it keeps getting shot down every time it comes > up. What is different about this one? Without studying all the other ones, I can't say - however, I need to add something for keyrings and I would prefer to make something generic. > > +#define DEBUG_WITH_WRITE /* Allow use of write() to record notifications */ > > debugging code left in? I'll switch it to #undef. I want to leave the code in there for testing purposes. Possibly I should make it a Kconfig option. > > + refcount_t usage; > > Usage of what, this structure? Or something else? This is the number of usages of this struct (references to if you prefer). I can add a comment to this effect. > > +EXPORT_SYMBOL(__post_watch_notification); > > _GPL for new apis? (I have to ask...) No. 
> > + return -EOPNOTSUPP; > > -ENOTTY is the correct "not a valid ioctl" error value, right? fs/ioctl.c does both, but I can switch it if it makes you happier. > > +void put_watch_queue(struct watch_queue *wqueue) > > +{ > > + if (refcount_dec_and_test(&wqueue->usage)) > > + kfree_rcu(wqueue, rcu); > > Why not just use a kref? Why use a kref? It seems like an effort to be a C++ base class, but without the C++ inheritance bit. Using kref doesn't seem to gain anything. It's just a wrapper around refcount_t - so why not just use a refcount_t? kref_put() could potentially add an unnecessary extra stack frame and would seem to be best avoided, though an optimising compiler ought to be able to inline if it can. Are you now on the convert all refcounts to krefs path? > > +EXPORT_SYMBOL(add_watch_to_object); > > Naming nit, shouldn't the "prefix" all be the same for these new > functions? > > watch_queue_add_object()? watch_queue_put()? And so on? Naming is fun. watch_queue_add_object - that suggests something different to what the function actually does. I'll think about adjusting the names. > > +module_exit(watch_queue_exit); > > module_misc_device()? warthog>git grep module_misc_device -- Documentation/ warthog1> > > +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ > > Yeah!!! Blech. > > + struct { > > + struct watch_notification watch; /* WATCH_TYPE_SKIP */ > > + volatile __u32 head; /* Ring head index */ > > + volatile __u32 tail; /* Ring tail index */ > > A uapi structure that has volatile in it? Are you _SURE_ this is > correct? > > That feels wrong to me... This is not a backing-hardware register, it's > "just memory" and slapping volatile on it shouldn't be the correct > solution for telling the compiler to not to optimize away reads/flushes, > right? You need a proper memory access type primitive for that to work > correctly everywhere I thought. 
> > We only have 2 users of volatile in include/uapi, one for WMI structures > that are backed by firmware (seems correct), and one for DRM which I > have no idea how it works as it claims to be a lock. Why is this new > addition the correct way to do this that no other ring-buffer that was > mmapped has needed to? Yeah, I understand your concern with this. The reason I put the volatiles in is that the kernel may be modifying the head pointer on one CPU simultaneously with userspace modifying the tail pointer on another CPU. Note that userspace does not need to enter the kernel to find out if there's anything in the buffer or to read stuff out of the buffer. Userspace only needs to enter the kernel, using poll() or similar, to wait for something to appear in the buffer. David
On Tue, May 28, 2019 at 6:03 PM David Howells <dhowells@redhat.com> wrote: > Implement a misc device that implements a general notification queue as a > ring buffer that can be mmap()'d from userspace. [...] > +receive notifications from the kernel. This is can be used in conjunction typo: s/is can/can/ [...] > +Overview > +======== > + > +This facility appears as a misc device file that is opened and then mapped and > +polled. Each time it is opened, it creates a new buffer specific to the > +returned file descriptor. Then, when the opening process sets watches, it > +indicates that particular buffer it wants notifications from that watch to be > +written into. Note that there are no read() and write() methods (except for s/that particular buffer/the particular buffer/ > +debugging). The user is expected to access the ring directly and to use poll > +to wait for new data. [...] > +/** > + * __post_watch_notification - Post an event notification > + * @wlist: The watch list to post the event to. > + * @n: The notification record to post. > + * @cred: The creds of the process that triggered the notification. > + * @id: The ID to match on the watch. > + * > + * Post a notification of an event into a set of watch queues and let the users > + * know. > + * > + * If @n is NULL then WATCH_INFO_LENGTH will be set on the next event posted. > + * > + * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and > + * should be in units of sizeof(*n). 
> + */ > +void __post_watch_notification(struct watch_list *wlist, > + struct watch_notification *n, > + const struct cred *cred, > + u64 id) > +{ > + const struct watch_filter *wf; > + struct watch_queue *wqueue; > + struct watch *watch; > + > + rcu_read_lock(); > + > + hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) { > + if (watch->id != id) > + continue; > + n->info &= ~(WATCH_INFO_ID | WATCH_INFO_OVERRUN); > + n->info |= watch->info_id; > + > + wqueue = rcu_dereference(watch->queue); > + wf = rcu_dereference(wqueue->filter); > + if (wf && !filter_watch_notification(wf, n)) > + continue; > + > + post_one_notification(wqueue, n, cred); > + } > + > + rcu_read_unlock(); > +} > +EXPORT_SYMBOL(__post_watch_notification); [...] > +static vm_fault_t watch_queue_fault(struct vm_fault *vmf) > +{ > + struct watch_queue *wqueue = vmf->vma->vm_file->private_data; > + struct page *page; > + > + page = wqueue->pages[vmf->pgoff]; I don't see you setting any special properties on the VMA that would prevent userspace from extending its size via mremap() - no VM_DONTEXPAND or VM_PFNMAP. So I think you might get an out-of-bounds access here? > + get_page(page); > + if (!lock_page_or_retry(page, vmf->vma->vm_mm, vmf->flags)) { > + put_page(page); > + return VM_FAULT_RETRY; > + } > + vmf->page = page; > + return VM_FAULT_LOCKED; > +} > + > +static void watch_queue_map_pages(struct vm_fault *vmf, > + pgoff_t start_pgoff, pgoff_t end_pgoff) > +{ > + struct watch_queue *wqueue = vmf->vma->vm_file->private_data; > + struct page *page; > + > + rcu_read_lock(); > + > + do { > + page = wqueue->pages[start_pgoff]; Same as above. > + if (trylock_page(page)) { > + vm_fault_t ret; > + get_page(page); > + ret = alloc_set_pte(vmf, NULL, page); > + if (ret != 0) > + put_page(page); > + > + unlock_page(page); > + } > + } while (++start_pgoff < end_pgoff); > + > + rcu_read_unlock(); > +} [...] 
> +static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma) > +{ > + struct watch_queue *wqueue = file->private_data; > + > + if (vma->vm_pgoff != 0 || > + vma->vm_end - vma->vm_start > wqueue->nr_pages * PAGE_SIZE || > + !(pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED))) > + return -EINVAL; This thing should probably have locking against concurrent watch_queue_set_size()? > + vma->vm_ops = &watch_queue_vm_ops; > + > + vma_interval_tree_insert(vma, &wqueue->mapping.i_mmap); > + return 0; > +} > + > +/* > + * Allocate the required number of pages. > + */ > +static long watch_queue_set_size(struct watch_queue *wqueue, unsigned long nr_pages) > +{ > + struct watch_queue_buffer *buf; > + u32 len; > + int i; > + > + if (nr_pages == 0 || > + nr_pages > 16 || /* TODO: choose a better hard limit */ > + !is_power_of_2(nr_pages)) > + return -EINVAL; > + > + wqueue->pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL); > + if (!wqueue->pages) > + goto err; > + > + for (i = 0; i < nr_pages; i++) { > + wqueue->pages[i] = alloc_page(GFP_KERNEL | __GFP_ZERO); > + if (!wqueue->pages[i]) > + goto err_some_pages; > + wqueue->pages[i]->mapping = &wqueue->mapping; > + SetPageUptodate(wqueue->pages[i]); > + } > + > + buf = vmap(wqueue->pages, nr_pages, VM_MAP, PAGE_SHARED); > + if (!buf) > + goto err_some_pages; > + > + wqueue->buffer = buf; > + wqueue->nr_pages = nr_pages; > + wqueue->size = ((nr_pages * PAGE_SIZE) / sizeof(struct watch_notification)); > + > + /* The first four slots in the buffer contain metadata about the ring, > + * including the head and tail indices and mask. > + */ > + len = sizeof(buf->meta) / sizeof(buf->slots[0]); > + buf->meta.watch.info = len << WATCH_LENGTH_SHIFT; > + buf->meta.watch.type = WATCH_TYPE_META; > + buf->meta.watch.subtype = WATCH_META_SKIP_NOTIFICATION; > + buf->meta.tail = len; > + buf->meta.mask = wqueue->size - 1; > + smp_store_release(&buf->meta.head, len); Why is this an smp_store_release()? 
The entire buffer should not be visible to userspace before this setup is complete, right? > + return 0; > + > +err_some_pages: > + for (i--; i >= 0; i--) { > + ClearPageUptodate(wqueue->pages[i]); > + wqueue->pages[i]->mapping = NULL; > + put_page(wqueue->pages[i]); > + } > + > + kfree(wqueue->pages); > + wqueue->pages = NULL; > +err: > + return -ENOMEM; > +} [...] > + > +/* > + * Set parameters. > + */ > +static long watch_queue_ioctl(struct file *file, unsigned int cmd, unsigned long arg) > +{ > + struct watch_queue *wqueue = file->private_data; > + struct inode *inode = file_inode(file); > + long ret; > + > + switch (cmd) { > + case IOC_WATCH_QUEUE_SET_SIZE: > + if (wqueue->buffer) > + return -EBUSY; The preceding check occurs without any locks held and therefore has no reliable effect. It should probably be moved below the inode_lock(...). > + inode_lock(inode); > + ret = watch_queue_set_size(wqueue, arg); > + inode_unlock(inode); > + return ret; > + > + case IOC_WATCH_QUEUE_SET_FILTER: > + inode_lock(inode); > + ret = watch_queue_set_filter( > + inode, wqueue, > + (struct watch_notification_filter __user *)arg); > + inode_unlock(inode); > + return ret; > + > + default: > + return -EOPNOTSUPP; > + } > +} [...] > +static void free_watch(struct rcu_head *rcu) > +{ > + struct watch *watch = container_of(rcu, struct watch, rcu); > + > + put_watch_queue(rcu_access_pointer(watch->queue)); This should be rcu_dereference_protected(..., 1). > +/** > + * remove_watch_from_object - Remove a watch or all watches from an object. > + * @wlist: The watch list to remove from > + * @wq: The watch queue of interest (ignored if @all is true) > + * @id: The ID of the watch to remove (ignored if @all is true) > + * @all: True to remove all objects > + * > + * Remove a specific watch or all watches from an object. A notification is > + * sent to the watcher to tell them that this happened. 
> + */ > +int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq, > + u64 id, bool all) > +{ > + struct watch_notification n; > + struct watch_queue *wqueue; > + struct watch *watch; > + int ret = -EBADSLT; > + > + rcu_read_lock(); > + > +again: > + spin_lock(&wlist->lock); > + hlist_for_each_entry(watch, &wlist->watchers, list_node) { > + if (all || > + (watch->id == id && rcu_access_pointer(watch->queue) == wq)) > + goto found; > + } > + spin_unlock(&wlist->lock); > + goto out; > + > +found: > + ret = 0; > + hlist_del_init_rcu(&watch->list_node); > + rcu_assign_pointer(watch->watch_list, NULL); > + spin_unlock(&wlist->lock); > + > + n.type = WATCH_TYPE_META; > + n.subtype = WATCH_META_REMOVAL_NOTIFICATION; > + n.info = watch->info_id | sizeof(n); > + > + wqueue = rcu_dereference(watch->queue); > + post_one_notification(wqueue, &n, wq ? wq->cred : NULL); > + > + /* We don't need the watch list lock for the next bit as RCU is > + * protecting everything from being deallocated. Does "everything" mean "the wqueue" or more than that? > + */ > + if (wqueue) { > + spin_lock_bh(&wqueue->lock); > + > + if (!hlist_unhashed(&watch->queue_node)) { > + hlist_del_init_rcu(&watch->queue_node); > + put_watch(watch); > + } > + > + spin_unlock_bh(&wqueue->lock); > + } > + > + if (wlist->release_watch) { > + rcu_read_unlock(); > + wlist->release_watch(wlist, watch); > + rcu_read_lock(); > + } > + put_watch(watch); > + > + if (all && !hlist_empty(&wlist->watchers)) > + goto again; > +out: > + rcu_read_unlock(); > + return ret; > +} > +EXPORT_SYMBOL(remove_watch_from_object); > + > +/* > + * Remove all the watches that are contributory to a queue. This will > + * potentially race with removal of the watches by the destruction of the > + * objects being watched or the distribution of notifications. 
> + */ > +static void watch_queue_clear(struct watch_queue *wqueue) > +{ > + struct watch_list *wlist; > + struct watch *watch; > + bool release; > + > + rcu_read_lock(); > + spin_lock_bh(&wqueue->lock); > + > + /* Prevent new additions and prevent notifications from happening */ > + wqueue->defunct = true; > + > + while (!hlist_empty(&wqueue->watches)) { > + watch = hlist_entry(wqueue->watches.first, struct watch, queue_node); > + hlist_del_init_rcu(&watch->queue_node); > + spin_unlock_bh(&wqueue->lock); > + > + /* We can't do the next bit under the queue lock as we need to > + * get the list lock - which would cause a deadlock if someone > + * was removing from the opposite direction at the same time or > + * posting a notification. > + */ > + wlist = rcu_dereference(watch->watch_list); > + if (wlist) { > + spin_lock(&wlist->lock); > + > + release = !hlist_unhashed(&watch->list_node); > + if (release) { > + hlist_del_init_rcu(&watch->list_node); > + rcu_assign_pointer(watch->watch_list, NULL); > + } > + > + spin_unlock(&wlist->lock); > + > + if (release) { > + if (wlist->release_watch) { > + rcu_read_unlock(); > + /* This might need to call dput(), so > + * we have to drop all the locks. > + */ > + wlist->release_watch(wlist, watch); How are you holding a reference to `wlist` here? You got the reference through rcu_dereference(), you've dropped the RCU read lock, and I don't see anything that stabilizes the reference. > + rcu_read_lock(); > + } > + put_watch(watch); > + } > + } > + > + put_watch(watch); > + spin_lock_bh(&wqueue->lock); > + } > + > + spin_unlock_bh(&wqueue->lock); > + rcu_read_unlock(); > +} > + > +/* > + * Release the file. 
> + */ > +static int watch_queue_release(struct inode *inode, struct file *file) > +{ > + struct watch_filter *wfilter; > + struct watch_queue *wqueue = file->private_data; > + int i, pgref; > + > + watch_queue_clear(wqueue); > + > + if (wqueue->pages && wqueue->pages[0]) > + WARN_ON(page_ref_count(wqueue->pages[0]) != 1); Is there a reason why there couldn't still be references to the pages from get_user_pages()/get_user_pages_fast()? > + if (wqueue->buffer) > + vfree(wqueue->buffer); > + for (i = 0; i < wqueue->nr_pages; i++) { > + ClearPageUptodate(wqueue->pages[i]); > + wqueue->pages[i]->mapping = NULL; > + pgref = page_ref_count(wqueue->pages[i]); > + WARN(pgref != 1, > + "FREE PAGE[%d] refcount %d\n", i, page_ref_count(wqueue->pages[i])); > + __free_page(wqueue->pages[i]); > + } > + > + wfilter = rcu_access_pointer(wqueue->filter); Again, rcu_dereference_protected(..., 1). > + if (wfilter) > + kfree_rcu(wfilter, rcu); > + kfree(wqueue->pages); > + put_cred(wqueue->cred); > + put_watch_queue(wqueue); > + return 0; > +} > + > +#ifdef DEBUG_WITH_WRITE > +static ssize_t watch_queue_write(struct file *file, > + const char __user *_buf, size_t len, loff_t *pos) > +{ > + struct watch_notification *n; > + struct watch_queue *wqueue = file->private_data; > + ssize_t ret; > + > + if (!wqueue->buffer) > + return -ENOBUFS; > + > + if (len & ~WATCH_INFO_LENGTH || len == 0 || !_buf) > + return -EINVAL; > + > + n = memdup_user(_buf, len); > + if (IS_ERR(n)) > + return PTR_ERR(n); > + > + ret = -EINVAL; > + if ((n->info & WATCH_INFO_LENGTH) != len) > + goto error; > + n->info &= (WATCH_INFO_LENGTH | WATCH_INFO_TYPE_FLAGS | WATCH_INFO_ID); Should the non-atomic modification of n->info (and perhaps also the following uses of ->debug) be protected by some lock? 
> + if (post_one_notification(wqueue, n, file->f_cred)) > + wqueue->debug = 0; > + else > + wqueue->debug++; > + ret = len; > + if (wqueue->debug > 20) > + ret = -EIO; > + > +error: > + kfree(n); > + return ret; > +} > +#endif [...] > +#define IOC_WATCH_QUEUE_SET_SIZE _IO('s', 0x01) /* Set the size in pages */ > +#define IOC_WATCH_QUEUE_SET_FILTER _IO('s', 0x02) /* Set the filter */ Should these ioctl numbers be registered in Documentation/ioctl/ioctl-number.txt?
Jann Horn <jannh@google.com> wrote: > I don't see you setting any special properties on the VMA that would > prevent userspace from extending its size via mremap() - no > VM_DONTEXPAND or VM_PFNMAP. So I think you might get an out-of-bounds > access here? Should I just set VM_DONTEXPAND in watch_queue_mmap()? Like so: vma->vm_flags |= VM_DONTEXPAND; > > +static void watch_queue_map_pages(struct vm_fault *vmf, > > + pgoff_t start_pgoff, pgoff_t end_pgoff) > ... > > Same as above. Same solution as above? Or do I need to check start/end_pgoff too? > > +static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma) > > +{ > > + struct watch_queue *wqueue = file->private_data; > > + > > + if (vma->vm_pgoff != 0 || > > + vma->vm_end - vma->vm_start > wqueue->nr_pages * PAGE_SIZE || > > + !(pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED))) > > + return -EINVAL; > > This thing should probably have locking against concurrent > watch_queue_set_size()? Yeah. static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma) { struct watch_queue *wqueue = file->private_data; struct inode *inode = file_inode(file); u8 nr_pages; inode_lock(inode); nr_pages = wqueue->nr_pages; inode_unlock(inode); if (nr_pages == 0 || ... return -EINVAL; > > + smp_store_release(&buf->meta.head, len); > > Why is this an smp_store_release()? The entire buffer should not be visible to > userspace before this setup is complete, right? Yes - if I put the above locking in the mmap handler. OTOH, it's a one-off barrier... > > + if (wqueue->buffer) > > + return -EBUSY; > > The preceding check occurs without any locks held and therefore has no > reliable effect. It should probably be moved below the > inode_lock(...). Yeah, it can race. I'll move it into watch_queue_set_size(). 
> > +static void free_watch(struct rcu_head *rcu) > > +{ > > + struct watch *watch = container_of(rcu, struct watch, rcu); > > + > > + put_watch_queue(rcu_access_pointer(watch->queue)); > > This should be rcu_dereference_protected(..., 1). That shouldn't be necessary. rcu_access_pointer()'s description says: * It is also permissible to use rcu_access_pointer() when read-side * access to the pointer was removed at least one grace period ago, as * is the case in the context of the RCU callback that is freeing up * the data, ... It's in an rcu callback function, so accessing the __rcu pointers in the RCU'd struct should be fine with rcu_access_pointer(). > > + /* We don't need the watch list lock for the next bit as RCU is > > + * protecting everything from being deallocated. > > Does "everything" mean "the wqueue" or more than that? Actually, just 'wqueue' and its buffer. 'watch' is held by us once we've dequeued it as we now own the ref 'wlist' had on it. 'wlist' and 'wq' must be pinned by the caller. > > + if (release) { > > + if (wlist->release_watch) { > > + rcu_read_unlock(); > > + /* This might need to call dput(), so > > + * we have to drop all the locks. > > + */ > > + wlist->release_watch(wlist, watch); > > How are you holding a reference to `wlist` here? You got the reference through > rcu_dereference(), you've dropped the RCU read lock, and I don't see anything > that stabilizes the reference. The watch record must hold a ref on the watched object if the watch_list has a ->release_watch() method. In the code snippet above, the watch record now belongs to us because we unlinked it under the wlist->lock some lines prior. However, you raise a good point, and I think the thing to do is to cache ->release_watch from it and not pass wlist into (*release_watch)(). We don't need to concern ourselves with cleaning up *wlist as it will be cleaned up when the target object is removed. 
Keyrings don't have a ->release_watch method and neither does the block-layer notification stuff. > > + if (wqueue->pages && wqueue->pages[0]) > > + WARN_ON(page_ref_count(wqueue->pages[0]) != 1); > > Is there a reason why there couldn't still be references to the pages > from get_user_pages()/get_user_pages_fast()? I'm not sure. I'm not sure what to do if there are. What do you suggest? > > + n->info &= (WATCH_INFO_LENGTH | WATCH_INFO_TYPE_FLAGS | WATCH_INFO_ID); > > Should the non-atomic modification of n->info n's an unpublished copy of some userspace data that's local to the function instance. There shouldn't be any way to race with it at this point. > (and perhaps also the > following uses of ->debug) be protected by some lock? > > > + if (post_one_notification(wqueue, n, file->f_cred)) > > + wqueue->debug = 0; > > + else > > + wqueue->debug++; > > + ret = len; > > + if (wqueue->debug > 20) > > + ret = -EIO; It's for debugging purposes, so the non-atomiticity doesn't matter. I'll #undef the symbol to disable the code. > > +#define IOC_WATCH_QUEUE_SET_SIZE _IO('s', 0x01) /* Set the size in pages */ > > +#define IOC_WATCH_QUEUE_SET_FILTER _IO('s', 0x02) /* Set the filter */ > > Should these ioctl numbers be registered in > Documentation/ioctl/ioctl-number.txt? Quite possibly. I'm not sure where's best to actually allocate it. I picked 's' out of the air. David
On Tue, May 28, 2019 at 06:30:20PM +0100, David Howells wrote: > Greg KH <gregkh@linuxfoundation.org> wrote: > > > > Implement a misc device that implements a general notification queue as a > > > ring buffer that can be mmap()'d from userspace. > > > > "general" but just for filesystems, right? :( > > Whatever gave you that idea? You can watch keyrings events, for example - > they're not exactly filesystems. I've added the ability to watch for mount > topology changes and superblock events because those are something I've been > asked to do. I've added something for block events because I've recently had > a problem with trying to recover data from a dodgy disk in that every time the > disk goes offline, the ddrecover goes "wheeeee!" as it just sees a lot of > EIO/ENODATA at a great rate of knots because it doesn't know the driver is now > ignoring the disk. > > I don't know what else people might want to watch, but I've tried to make it > as generic as possible so as not to exclude it if possible. Ok, let me try to dig up some older proposals to see if this fits into the same model to work with them as well. > > > + refcount_t usage; > > > > Usage of what, this structure? Or something else? > > This is the number of usages of this struct (references to if you prefer). I > can add a comment to this effect. I think you answer this later on with the kref comment :) > > > + return -EOPNOTSUPP; > > > > -ENOTTY is the correct "not a valid ioctl" error value, right? > > fs/ioctl.c does both, but I can switch it if it makes you happier. It does. > > > +void put_watch_queue(struct watch_queue *wqueue) > > > +{ > > > + if (refcount_dec_and_test(&wqueue->usage)) > > > + kfree_rcu(wqueue, rcu); > > > > Why not just use a kref? > > Why use a kref? It seems like an effort to be a C++ base class, but without > the C++ inheritance bit. Using kref doesn't seem to gain anything. It's just > a wrapper around refcount_t - so why not just use a refcount_t? 
> > kref_put() could potentially add an unnecessary extra stack frame and would > seem to be best avoided, though an optimising compiler ought to be able to > inline if it can. If kref_put() is on your fast path, you have worse problems (kfree isn't fast, right?) Anyway, it's an inline function, how can it add an extra stack frame? Don't try to optimize something that isn't needed yet. > Are you now on the convert all refcounts to krefs path? "now"? Remember, I wrote kref all those years ago, everyone should use it. It saves us having to audit the same pattern over and over again. And, even nicer, it uses a refcount now, and as you are trying to reference count an object, it is exactly what this was written for. So yes, I do think it should be used here, unless it is deemed to not fit the pattern/usage model. > > > +EXPORT_SYMBOL(add_watch_to_object); > > > > Naming nit, shouldn't the "prefix" all be the same for these new > > functions? > > > > watch_queue_add_object()? watch_queue_put()? And so on? > > Naming is fun. watch_queue_add_object - that suggests something different to > what the function actually does. I'll think about adjusting the names. Ok, just had to say something. It's your call, and yes, naming is hard. > > > +module_exit(watch_queue_exit); > > > > module_misc_device()? > > warthog>git grep module_misc_device -- Documentation/ > warthog1> Do I have to document all helper macros? Anyway, it saves you boilerplate code, but if built in, it's at the module init level, not the fs init level, like you are asking for here. So that might not work, it's your call. > > > + struct { > > > + struct watch_notification watch; /* WATCH_TYPE_SKIP */ > > > + volatile __u32 head; /* Ring head index */ > > > + volatile __u32 tail; /* Ring tail index */ > > > > A uapi structure that has volatile in it? Are you _SURE_ this is > > correct? > > > > That feels wrong to me... 
This is not a backing-hardware register, it's > > "just memory" and slapping volatile on it shouldn't be the correct > > solution for telling the compiler to not to optimize away reads/flushes, > > right? You need a proper memory access type primitive for that to work > > correctly everywhere I thought. > > > > We only have 2 users of volatile in include/uapi, one for WMI structures > > that are backed by firmware (seems correct), and one for DRM which I > > have no idea how it works as it claims to be a lock. Why is this new > > addition the correct way to do this that no other ring-buffer that was > > mmapped has needed to? > > Yeah, I understand your concern with this. > > The reason I put the volatiles in is that the kernel may be modifying the head > pointer on one CPU simultaneously with userspace modifying the tail pointer on > another CPU. > > Note that userspace does not need to enter the kernel to find out if there's > anything in the buffer or to read stuff out of the buffer. Userspace only > needs to enter the kernel, using poll() or similar, to wait for something to > appear in the buffer. And how does the tracing and perf ring buffers do this without needing volatile? Why not use the same type of interface they provide, as it's always good to share code that has already had all of the nasty corner cases worked out. Anyway, I don't want you to think I don't like this code/idea overall, I do. I just want to see it be something that everyone can use, and use easily, as it has been something lots of people have been asking for for a long time. thanks, greg k-h
On Wed, May 29, 2019 at 12:28 AM David Howells <dhowells@redhat.com> wrote: > Jann Horn <jannh@google.com> wrote: > > I don't see you setting any special properties on the VMA that would > > prevent userspace from extending its size via mremap() - no > > VM_DONTEXPAND or VM_PFNMAP. So I think you might get an out-of-bounds > > access here? > > Should I just set VM_DONTEXPAND in watch_queue_mmap()? Like so: > > vma->vm_flags |= VM_DONTEXPAND; Yeah, that should work. > > > +static void watch_queue_map_pages(struct vm_fault *vmf, > > > + pgoff_t start_pgoff, pgoff_t end_pgoff) > > ... > > > > Same as above. > > Same solution as above? Or do I need ot check start/end_pgoff too? Same solution. > > > +static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma) > > > +{ > > > + struct watch_queue *wqueue = file->private_data; > > > + > > > + if (vma->vm_pgoff != 0 || > > > + vma->vm_end - vma->vm_start > wqueue->nr_pages * PAGE_SIZE || > > > + !(pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED))) > > > + return -EINVAL; > > > > This thing should probably have locking against concurrent > > watch_queue_set_size()? > > Yeah. > > static int watch_queue_mmap(struct file *file, > struct vm_area_struct *vma) > { > struct watch_queue *wqueue = file->private_data; > struct inode *inode = file_inode(file); > u8 nr_pages; > > inode_lock(inode); > nr_pages = wqueue->nr_pages; > inode_unlock(inode); > > if (nr_pages == 0 || > ... > return -EINVAL; Looks reasonable. > > > + smp_store_release(&buf->meta.head, len); > > > > Why is this an smp_store_release()? The entire buffer should not be visible to > > userspace before this setup is complete, right? > > Yes - if I put the above locking in the mmap handler. OTOH, it's a one-off > barrier... > > > > + if (wqueue->buffer) > > > + return -EBUSY; > > > > The preceding check occurs without any locks held and therefore has no > > reliable effect. It should probably be moved below the > > inode_lock(...). 
> > Yeah, it can race. I'll move it into watch_queue_set_size(). Sounds good. > > > +static void free_watch(struct rcu_head *rcu) > > > +{ > > > + struct watch *watch = container_of(rcu, struct watch, rcu); > > > + > > > + put_watch_queue(rcu_access_pointer(watch->queue)); > > > > This should be rcu_dereference_protected(..., 1). > > That shouldn't be necessary. rcu_access_pointer()'s description says: > > * It is also permissible to use rcu_access_pointer() when read-side > * access to the pointer was removed at least one grace period ago, as > * is the case in the context of the RCU callback that is freeing up > * the data, ... > > It's in an rcu callback function, so accessing the __rcu pointers in the RCU'd > struct should be fine with rcu_access_pointer(). Aah, whoops, you're right, I missed that paragraph in the documentation of rcu_access_pointer(). > > > + /* We don't need the watch list lock for the next bit as RCU is > > > + * protecting everything from being deallocated. > > > > Does "everything" mean "the wqueue" or more than that? > > Actually, just 'wqueue' and its buffer. 'watch' is held by us once we've > dequeued it as we now own the ref 'wlist' had on it. 'wlist' and 'wq' must be > pinned by the caller. > > > > + if (release) { > > > + if (wlist->release_watch) { > > > + rcu_read_unlock(); > > > + /* This might need to call dput(), so > > > + * we have to drop all the locks. > > > + */ > > > + wlist->release_watch(wlist, watch); > > > > How are you holding a reference to `wlist` here? You got the reference through > > rcu_dereference(), you've dropped the RCU read lock, and I don't see anything > > that stabilizes the reference. > > The watch record must hold a ref on the watched object if the watch_list has a > ->release_watch() method. In the code snippet above, the watch record now > belongs to us because we unlinked it under the wlist->lock some lines prior. Ah, of course. 
> However, you raise a good point, and I think the thing to do is to cache > ->release_watch from it and not pass wlist into (*release_watch)(). We don't > need to concern ourselves with cleaning up *wlist as it will be cleaned up > when the target object is removed. > > Keyrings don't have a ->release_watch method and neither does the block-layer > notification stuff. > > > > + if (wqueue->pages && wqueue->pages[0]) > > > + WARN_ON(page_ref_count(wqueue->pages[0]) != 1); > > > > Is there a reason why there couldn't still be references to the pages > > from get_user_pages()/get_user_pages_fast()? > > I'm not sure. I'm not sure what to do if there are. What do you suggest? I would use put_page() instead of manually freeing it; I think that should be enough? I'm not entirely sure though. > > > + n->info &= (WATCH_INFO_LENGTH | WATCH_INFO_TYPE_FLAGS | WATCH_INFO_ID); > > > > Should the non-atomic modification of n->info > > n's an unpublished copy of some userspace data that's local to the function > instance. There shouldn't be any way to race with it at this point. Ah, derp. Yes.
Greg KH <gregkh@linuxfoundation.org> wrote: > > kref_put() could potentially add an unnecessary extra stack frame and would > > seem to be best avoided, though an optimising compiler ought to be able to > > inline if it can. > > If kref_put() is on your fast path, you have worse problems (kfree isn't > fast, right?) > > Anyway, it's an inline function, how can it add an extra stack frame? The call to the function pointer. Hopefully the compiler will optimise that away for an inlineable function. > > Are you now on the convert all refcounts to krefs path? > > "now"? Remember, I wrote kref all those years ago, Yes - and I thought it wasn't a good idea at the time. But this is the first time you've mentioned it to me, let alone pushed to change to it, that I recall. > everyone should use > it. It saves us having to audit the same pattern over and over again. > And, even nicer, it uses a refcount now, and as you are trying to > reference count an object, it is exactly what this was written for. > > So yes, I do think it should be used here, unless it is deemed to not > fit the pattern/usage model. kref_put() enforces a very specific destructor signature. I know of places where that doesn't work because the destructor takes more than one argument (granted that this is not the case here). So why does kref_put() exist at all? Why not kref_dec_and_test()? Why doesn't refcount_t get merged into kref, or vice versa? Having both would seem redundant. Mind you, I've been gradually reverting atomic_t-to-refcount_t conversions because it seems I'm not allowed refcount_inc/dec_return() and I want to get at the point refcount for tracing purposes. > > > > +module_exit(watch_queue_exit); > > > > > > module_misc_device()? > > > > warthog>git grep module_misc_device -- Documentation/ > > warthog1> > > Do I have to document all helper macros? If you add an API, documenting it is your privilege ;-) It's an important test of the API - if you can't describe it, it's probably wrong. 
Now I will grant that you didn't add that function... > Anyway, it saves you boilerplate code, but if built in, it's at the module > init level, not the fs init level, like you are asking for here. So that > might not work, it's your call. Actually, I probably shouldn't have a module exit function. It can't be a module as it's called by core code. I'll switch to builtin_misc_device(). > And how does the tracing and perf ring buffers do this without needing > volatile? Why not use the same type of interface they provide, as it's > always good to share code that has already had all of the nasty corner > cases worked out. I've no idea how trace does it - or even where - or even if. As far as I can see, grepping for mmap in kernel/trace/*, there's no mmap support. Reading Documentation/trace/ring-buffer-design.txt the trace subsystem has some sort of transient page fifo which is a lot more complicated than what I want and doesn't look like it'll be mmap'able. Looking at the perf ring buffer, there appears to be a missing barrier in perf_aux_output_end(): rb->user_page->aux_head = rb->aux_head; should be: smp_store_release(&rb->user_page->aux_head, rb->aux_head); It should also be using smp_load_acquire(). See Documentation/core-api/circular-buffers.rst And a (partial) patch has been proposed: https://lkml.org/lkml/2018/5/10/249 David
On Wed, May 29, 2019 at 6:07 PM David Howells <dhowells@redhat.com> wrote: > Greg KH <gregkh@linuxfoundation.org> wrote: > > everyone should use > > it. It saves us having to audit the same pattern over and over again. > > And, even nicer, it uses a refcount now, and as you are trying to > > reference count an object, it is exactly what this was written for. > > > > So yes, I do think it should be used here, unless it is deemed to not > > fit the pattern/usage model. > > kref_put() enforces a very specific destructor signature. I know of places > where that doesn't work because the destructor takes more than one argument > (granted that this is not the case here). So why does kref_put() exist at > all? Why not kref_dec_and_test()? > > Why doesn't refcount_t get merged into kref, or vice versa? Having both would > seem redundant. > > Mind you, I've been gradually reverting atomic_t-to-refcount_t conversions > because it seems I'm not allowed refcount_inc/dec_return() and I want to get > at the point refcount for tracing purposes. Yeeech, that's horrible, please don't do that. Does this mean that refcount_read() isn't sufficient for what you want to do with tracing (because for some reason you actually need to know the values atomically at the time of increment/decrement)?
Jann Horn <jannh@google.com> wrote: > Does this mean that refcount_read() isn't sufficient for what you want > to do with tracing (because for some reason you actually need to know > the values atomically at the time of increment/decrement)? Correct. There's a gap and if an interrupt or something occurs, it's sufficiently big for the refcount trace to go weird. I've seen it in afs/rxrpc where the incoming network packets that are part of the rxrpc call flow disrupt the refcounts noted in trace lines. David
On Wed, May 29, 2019 at 05:06:40PM +0100, David Howells wrote: > Greg KH <gregkh@linuxfoundation.org> wrote: > > > > kref_put() could potentially add an unnecessary extra stack frame and would > > > seem to be best avoided, though an optimising compiler ought to be able to > > > inline if it can. > > > > If kref_put() is on your fast path, you have worse problems (kfree isn't > > fast, right?) > > > > Anyway, it's an inline function, how can it add an extra stack frame? > > The call to the function pointer. Hopefully the compiler will optimise that > away for an inlineable function. The function pointer only gets called for the last "put", and then kfree will be called so you should not have to worry about speed/stack frames at that point in time. > > > Are you now on the convert all refcounts to krefs path? > > > > "now"? Remember, I wrote kref all those years ago, > > Yes - and I thought it wasn't a good idea at the time. But this is the first > time you've mentioned it to me, let alone pushed to change to it, that I > recall. I bring up using a kref any time I see a usage that could use it as it makes it easier for people to understand and "know" you are doing your reference counting for your object "correctly". It's an abstraction that is used to make it easier for us developers to understand. Otherwise you have to hand-roll the same logic here. Yes, refcounts have made it easier to do it in your own (which was their goal), but you still don't have to do it "on your own". Anyway, I'll not push the issue here, if you want to stick to a refcount_t, that's enough for now. We can worry about changing this later after you have debugged all the corner conditions :) > > everyone should use > > it. It saves us having to audit the same pattern over and over again. > > And, even nicer, it uses a refcount now, and as you are trying to > > reference count an object, it is exactly what this was written for. 
> > > > So yes, I do think it should be used here, unless it is deemed to not > > fit the pattern/usage model. > > kref_put() enforces a very specific destructor signature. I know of places > where that doesn't work because the destructor takes more than one argument > (granted that this is not the case here). So why does kref_put() exist at > all? Why not kref_dec_and_test()? The destructor only takes one object pointer as you are finally freeing that object. What more do you need/want to "know" at that point in time? What would kref_dec_and_test() be needed for? > Why doesn't refcount_t get merged into kref, or vice versa? Having both would > seem redundant. kref uses refcount_t and provides a different functionality on top of it. Not all uses of a refcount in the kernel is for object lifecycle reference counting, as you know :) > Mind you, I've been gradually reverting atomic_t-to-refcount_t conversions > because it seems I'm not allowed refcount_inc/dec_return() and I want to get > at the point refcount for tracing purposes. That's not good, we should address that independently as you are loosing functionality/protection when doing that. thanks, greg k-h
On Wed, May 29, 2019 at 05:06:40PM +0100, David Howells wrote: > Greg KH <gregkh@linuxfoundation.org> wrote: > > And how does the tracing and perf ring buffers do this without needing > > volatile? Why not use the same type of interface they provide, as it's > > always good to share code that has already had all of the nasty corner > > cases worked out. > > I've no idea how trace does it - or even where - or even if. As far as I can > see, grepping for mmap in kernel/trace/*, there's no mmap support. > > Reading Documentation/trace/ring-buffer-design.txt the trace subsystem has > some sort of transient page fifo which is a lot more complicated than what I > want and doesn't look like it'll be mmap'able. > > Looking at the perf ring buffer, there appears to be a missing barrier in > perf_aux_output_end(): > > rb->user_page->aux_head = rb->aux_head; > > should be: > > smp_store_release(&rb->user_page->aux_head, rb->aux_head); > > It should also be using smp_load_acquire(). See > Documentation/core-api/circular-buffers.rst > > And a (partial) patch has been proposed: https://lkml.org/lkml/2018/5/10/249 So, if that's all that needs to be fixed, can you use the same buffer/code if that patch is merged? thanks, greg k-h
> > Looking at the perf ring buffer, there appears to be a missing barrier in > > perf_aux_output_end(): > > > > rb->user_page->aux_head = rb->aux_head; > > > > should be: > > > > smp_store_release(&rb->user_page->aux_head, rb->aux_head); > > > > It should also be using smp_load_acquire(). See > > Documentation/core-api/circular-buffers.rst > > > > And a (partial) patch has been proposed: https://lkml.org/lkml/2018/5/10/249 > > So, if that's all that needs to be fixed, can you use the same > buffer/code if that patch is merged? That's about one year old...: let me add the usual suspects in Cc: ;-) since I'm not sure what the plan was (or if I'm missing something) ... Speaking of ring buffer implementations (and relatively "old" patches), here's another quite interesting: https://lkml.kernel.org/r/20181211034032.32338-1-yuleixzhang@tencent.com Thanks, Andrea
On Thu, May 30, 2019 at 11:50:39AM +0200, Andrea Parri wrote: > > > Looking at the perf ring buffer, there appears to be a missing barrier in > > > perf_aux_output_end(): > > > > > > rb->user_page->aux_head = rb->aux_head; > > > > > > should be: > > > > > > smp_store_release(&rb->user_page->aux_head, rb->aux_head); > > > > > > It should also be using smp_load_acquire(). See > > > Documentation/core-api/circular-buffers.rst > > > > > > And a (partial) patch has been proposed: https://lkml.org/lkml/2018/5/10/249 > > > > So, if that's all that needs to be fixed, can you use the same > > buffer/code if that patch is merged? > > That's about one year old...: let me add the usual suspects in Cc: ;-) > since I'm not sure what the plan was (or if I'm missing something) ... The AUX crud is 'special' and smp_store_release() doesn't really help in many cases. Notably, AUX is typically used in combination with a hardware writer. The driver is in charge of ordering here; the generic code doesn't know what the appropriate barrier (if any) is and would have to resort to the most expensive/heavy one available. Also see the comment right above this function: "It is the pmu driver's responsibility to observe ordering rules of the hardware, so that all the data is externally visible before this is called."
On Wed, May 29, 2019 at 05:06:40PM +0100, David Howells wrote: > Looking at the perf ring buffer, there appears to be a missing barrier in > perf_aux_output_end(): > > rb->user_page->aux_head = rb->aux_head; > > should be: > > smp_store_release(&rb->user_page->aux_head, rb->aux_head); I've answered that in another email; the aux bit is 'magic'. > It should also be using smp_load_acquire(). See > Documentation/core-api/circular-buffers.rst We use the control dependency instead, as described in the comment of perf_output_put_handle(): * kernel user * * if (LOAD ->data_tail) { LOAD ->data_head * (A) smp_rmb() (C) * STORE $data LOAD $data * smp_wmb() (B) smp_mb() (D) * STORE ->data_head STORE ->data_tail * } * * Where A pairs with D, and B pairs with C. * * In our case (A) is a control dependency that separates the load of * the ->data_tail and the stores of $data. In case ->data_tail * indicates there is no room in the buffer to store $data we do not. * * D needs to be a full barrier since it separates the data READ * from the tail WRITE. * * For B a WMB is sufficient since it separates two WRITEs, and for C * an RMB is sufficient since it separates two READs. Userspace can choose to use smp_load_acquire() over the first smp_rmb() if that is efficient for the architecture (for a whole bunch of archs load-acquire would end up using mb() while rmb() is adequate and cheaper).
On Wed, May 29, 2019 at 10:02:43PM +0100, David Howells wrote: > Jann Horn <jannh@google.com> wrote: > > > Does this mean that refcount_read() isn't sufficient for what you want > > to do with tracing (because for some reason you actually need to know > > the values atomically at the time of increment/decrement)? > > Correct. There's a gap and if an interrupt or something occurs, it's > sufficiently big for the refcount trace to go weird. > > I've seen it in afs/rxrpc where the incoming network packets that are part of > the rxrpc call flow disrupt the refcounts noted in trace lines. Can you re-iterate the exact problem? I know we talked about this in the past, but I seem to have misplaced those memories :/ FWIW I agree that kref is useless fluff, but I've long ago given up on that fight.
Peter Zijlstra <peterz@infradead.org> wrote: > Can you re-iterate the exact problem? I konw we talked about this in the > past, but I seem to have misplaced those memories :/ Take this for example: void afs_put_call(struct afs_call *call) { struct afs_net *net = call->net; int n = atomic_dec_return(&call->usage); int o = atomic_read(&net->nr_outstanding_calls); trace_afs_call(call, afs_call_trace_put, n + 1, o, __builtin_return_address(0)); ASSERTCMP(n, >=, 0); if (n == 0) { ... } } I am printing the usage count in the afs_call tracepoint so that I can use it to debug refcount bugs. If I do it like this: void afs_put_call(struct afs_call *call) { int n = refcount_read(&call->usage); int o = atomic_read(&net->nr_outstanding_calls); trace_afs_call(call, afs_call_trace_put, n, o, __builtin_return_address(0)); if (refcount_dec_and_test(&call->usage)) { ... } } then there's a temporal gap between the usage count being read and the actual atomic decrement in which another CPU can alter the count. This can be exacerbated by an interrupt occurring, a softirq occurring or someone enabling the tracepoint. I can't do the tracepoint after the decrement if refcount_dec_and_test() returns false unless I save all the values from the object that I might need as the object could be destroyed any time from that point on. In this particular case, that's just call->debug_id, but it could be other things in other cases. Note that I also can't touch the afs_net object in that situation either, and the outstanding calls count that I record will potentially be out of date - but there's not a lot I can do about that. David
Greg KH <gregkh@linuxfoundation.org> wrote: > > kref_put() enforces a very specific destructor signature. I know of places > > where that doesn't work because the destructor takes more than one argument > > (granted that this is not the case here). So why does kref_put() exist at > > all? Why not kref_dec_and_test()? > > The destructor only takes one object pointer as you are finally freeing > that object. What more do you need/want to "know" at that point in > time? Imagine that I have an object that's on a list rooted in a namespace and that I have a lot of these objects. Imagine further that any time I want to put a ref on one of these objects, it's in a context that has the namespace pinned. I therefore don't need to store a pointer to the namespace in every object because I can pass that in to the put function. Indeed, I can still access the namespace even after the decrement didn't reduce the usage count to 0 - say for doing statistics. > What would kref_dec_and_test() be needed for? Why do you need kref_put() to take a destructor function pointer? Why cannot that be replaced with, say: static inline bool __kref_put(struct kref *k) { return refcount_dec_and_test(&k->refcount); } and then one could do: void put_foo(struct foo_net *ns, struct foo *f) { if (__kref_put(&f->refcount)) { // destroy foo } } that way the destruction code does not have to be offloaded into its own function and you still have your pattern to look for.
For tracing purposes, I could live with something like: static inline bool __kref_put_return(struct kref *k, unsigned int *_usage) { return refcount_dec_and_test_return(&k->refcount, _usage); } and then I could do: void put_foo(struct foo_net *ns, struct foo *f) { unsigned int u; bool is_zero = __kref_put_return(&f->refcount, &u); trace_foo_refcount(f, u); if (is_zero) { // destroy foo } } then it could be made such that you can disable the ability of refcount_dec_and_test_return() to pass back a useful refcount value if you want a bit of extra speed. Or even if refcount_dec_return() is guaranteed to return 0 if the count hits the floor and non-zero otherwise and there's a config switch to impose a stronger guarantee that it will return a value that's appropriately transformed to look as if I was using atomic_dec_return(). Similarly for refcount_inc_return() - it could just return gibberish unless the same config switch is enabled. Question for AMD/Intel guys: I'm curious if LOCK DECL faster than LOCK XADD -1 on x86_64? > > Why doesn't refcount_t get merged into kref, or vice versa? Having both > > would seem redundant. > > kref uses refcount_t and provides a different functionality on top of > it. Not all uses of a refcount in the kernel is for object lifecycle > reference counting, as you know :) I do? I can't think of one offhand. Not that I'm saying you're wrong on that - there's an awful lot of kernel. David
On Fri, May 31, 2019 at 01:02:15PM +0100, David Howells wrote: > Peter Zijlstra <peterz@infradead.org> wrote: > > > Can you re-iterate the exact problem? I konw we talked about this in the > > past, but I seem to have misplaced those memories :/ > > Take this for example: > > void afs_put_call(struct afs_call *call) > { > struct afs_net *net = call->net; > int n = atomic_dec_return(&call->usage); > int o = atomic_read(&net->nr_outstanding_calls); > > trace_afs_call(call, afs_call_trace_put, n + 1, o, > __builtin_return_address(0)); > > ASSERTCMP(n, >=, 0); > if (n == 0) { > ... > } > } > > I am printing the usage count in the afs_call tracepoint so that I can use it > to debug refcount bugs. If I do it like this: > > void afs_put_call(struct afs_call *call) > { > int n = refcount_read(&call->usage); > int o = atomic_read(&net->nr_outstanding_calls); > > trace_afs_call(call, afs_call_trace_put, n, o, > __builtin_return_address(0)); > > if (refcount_dec_and_test(&call->usage)) { > ... > } > } > > then there's a temporal gap between the usage count being read and the actual > atomic decrement in which another CPU can alter the count. This can be > exacerbated by an interrupt occurring, a softirq occurring or someone enabling > the tracepoint. > > I can't do the tracepoint after the decrement if refcount_dec_and_test() > returns false unless I save all the values from the object that I might need > as the object could be destroyed any time from that point on. Is it not the responsibility of the task that effects the 1->0 transition to actually free the memory? That is, I'm expecting the '...' in both cases above to include the actual freeing of the object. If this is not the case, then @usage is not a reference count. (and it has already been established that refcount_t doesn't work for usage count scenarios) Aside from that, is the problem that refcount_dec_and_test() returns a boolean (true - last put, false - not last) instead of the refcount value?
This does indeed make it hard to print the exact count value for the event.
Peter Zijlstra <peterz@infradead.org> wrote: > Is it not the responsibility of the task that affects the 1->0 > transition to actually free the memory? > > That is, I'm expecting the '...' in both cases above the include the > actual freeing of the object. If this is not the case, then @usage is > not a reference count. Yes. The '...' does the freeing. It seemed unnecessary to include the code ellipsised there since it's not the point of the discussion, but if you want the full function: void afs_put_call(struct afs_call *call) { struct afs_net *net = call->net; int n = atomic_dec_return(&call->usage); int o = atomic_read(&net->nr_outstanding_calls); trace_afs_call(call, afs_call_trace_put, n + 1, o, __builtin_return_address(0)); ASSERTCMP(n, >=, 0); if (n == 0) { ASSERT(!work_pending(&call->async_work)); ASSERT(call->type->name != NULL); if (call->rxcall) { rxrpc_kernel_end_call(net->socket, call->rxcall); call->rxcall = NULL; } if (call->type->destructor) call->type->destructor(call); afs_put_server(call->net, call->server); afs_put_cb_interest(call->net, call->cbi); afs_put_addrlist(call->alist); kfree(call->request); trace_afs_call(call, afs_call_trace_free, 0, o, __builtin_return_address(0)); kfree(call); o = atomic_dec_return(&net->nr_outstanding_calls); if (o == 0) wake_up_var(&net->nr_outstanding_calls); } } You can see the kfree(call) in there. Peter Zijlstra <peterz@infradead.org> wrote: > (and it has already been established that refcount_t doesn't work for > usage count scenarios) ? Does that mean struct kref doesn't either? > Aside from that, is the problem that refcount_dec_and_test() returns a > boolean (true - last put, false - not last) instead of the refcount > value? This does indeed make it hard to print the exact count value for > the event. That is the problem, yes - well, one of them: refcount_inc() doesn't either. David
Greg KH <gregkh@linuxfoundation.org> wrote: > So, if that's all that needs to be fixed, can you use the same > buffer/code if that patch is merged? I really don't know. The perf code is complex, partially in hardware drivers and is tricky to understand - though a chunk of that is the "aux" buffer part; PeterZ used words like "special" and "magic" and the comments in the code talk about the hardware writing into the buffer. __perf_output_begin() does not appear to be SMP safe. It uses local_cmpxchg() and local_add() which on x86 lack the LOCK prefix. stracing the perf command on my test machine, it calls perf_event_open(2) four times and mmap's each fd it gets back. I'm guessing that each one maps a separate buffer for each CPU. So to use watch_queue based on perf's buffering, you would have to have a (2^N)+1 pages-sized buffer for each CPU. So that would be a minimum of 64K of unswappable memory for my desktop machine, say). Multiply that by each process that wants to listen for events... What I'm aiming for is something that has a single buffer used by all CPUs for each instance of /dev/watch_queue opened and I'd also like to avoid having to allocate the metadata page and the aux buffer to save space. This is locked memory and cannot be swapped. Also, perf has to leave a gap in the ring because it uses CIRC_SPACE(), though that's a minor detail that I guess can't be fixed now. I'm also slightly concerned that __perf_output_begin() doesn't check if rb->user->tail has got ahead of rb->user->head or that it's lagging too far behind. I doubt it's a serious problem for the kernel since it won't write outside of the buffer, but userspace might screw up. I think the worst that will happen is that userspace will get confused. One thing I would like is to waive the 2^N size requirement. I understand *why* we do that, but I wonder how expensive DIV instructions are for relatively small divisors. David
On Fri, May 31, 2019 at 03:20:12PM +0100, David Howells wrote: > Peter Zijlstra <peterz@infradead.org> wrote: > > (and it has already been established that refcount_t doesn't work for > > usage count scenarios) > > ? > > Does that mean struct kref doesn't either? Indeed, since kref is just a pointless wrapper around refcount_t it does not either. The main distinction between a reference count and a usage count is that 0 means different things. For a refcount 0 means dead. For a usage count 0 is merely unused but valid. Incrementing a 0 refcount is a serious bug -- use-after-free (and hence refcount_t will refuse this and splat), for a usage count this is no problem. Now, it is sort-of possible to merge the two, by basically stating something like: usage = refcount - 1. But that can get tricky and people have not really liked the result much for the few times I tried.
Peter Zijlstra <peterz@infradead.org> wrote: > > > (and it has already been established that refcount_t doesn't work for > > > usage count scenarios) > > > > ? > > > > Does that mean struct kref doesn't either? > > Indeed, since kref is just a pointless wrapper around refcount_t it does > not either. > > The main distinction between a reference count and a usage count is that > 0 means different things. For a refcount 0 means dead. For a usage count > 0 is merely unused but valid. Ah - I consider the terms interchangeable. Take Documentation/filesystems/vfs.txt for instance: dget: open a new handle for an existing dentry (this just increments the usage count) dput: close a handle for a dentry (decrements the usage count). ... ... d_lookup: look up a dentry given its parent and path name component It looks up the child of that given name from the dcache hash table. If it is found, the reference count is incremented and the dentry is returned. The caller must use dput() to free the dentry when it finishes using it. Here we interchange the terms. Or https://www.kernel.org/doc/gorman/html/understand/understand013.html which seems to interchange the terms in reference to struct page. David
On Fri, May 31, 2019 at 06:12:42PM +0100, David Howells wrote: > Peter Zijlstra <peterz@infradead.org> wrote: > > > > > (and it has already been established that refcount_t doesn't work for > > > > usage count scenarios) > > > > > > ? > > > > > > Does that mean struct kref doesn't either? > > > > Indeed, since kref is just a pointless wrapper around refcount_t it does > > not either. > > > > The main distinction between a reference count and a usage count is that > > 0 means different things. For a refcount 0 means dead. For a usage count > > 0 is merely unused but valid. > > Ah - I consider the terms interchangeable. > > Take Documentation/filesystems/vfs.txt for instance: > > dget: open a new handle for an existing dentry (this just increments > the usage count) > > dput: close a handle for a dentry (decrements the usage count). ... > > ... > > d_lookup: look up a dentry given its parent and path name component > It looks up the child of that given name from the dcache > hash table. If it is found, the reference count is incremented > and the dentry is returned. The caller must use dput() > to free the dentry when it finishes using it. > > Here we interchange the terms. > > Or https://www.kernel.org/doc/gorman/html/understand/understand013.html > which seems to interchange the terms in reference to struct page. Right, but we have two distinct set of semantics, I figured it makes sense to have two different names for them. Do you have an alternative naming scheme we could use? Or should we better document our distinction between reference and usage count?
diff --git a/Documentation/watch_queue.rst b/Documentation/watch_queue.rst new file mode 100644 index 000000000000..01fe937092d6 --- /dev/null +++ b/Documentation/watch_queue.rst @@ -0,0 +1,311 @@ +============================ +Mappable notifications queue +============================ + +This is a misc device that acts as a mapped ring buffer by which userspace can +receive notifications from the kernel. This can be used in conjunction +with:: + + * Key/keyring notifications + + * Mount topology change notifications + + * Superblock event notifications + + +The notifications buffers can be enabled by: + + "Device Drivers"/"Misc devices"/"Mappable notification queue" + (CONFIG_WATCH_QUEUE) + +This document has the following sections: + +.. contents:: :local: + + +Overview +======== + +This facility appears as a misc device file that is opened and then mapped and +polled. Each time it is opened, it creates a new buffer specific to the +returned file descriptor. Then, when the opening process sets watches, it +indicates the particular buffer it wants notifications from that watch to be +written into. Note that there are no read() and write() methods (except for +debugging). The user is expected to access the ring directly and to use poll +to wait for new data. + +If a watch is in place, notifications are only written into the buffer if the +filter criteria are passed and if there's sufficient space available in the +ring. If either of those is not so, a notification will be discarded. In the +latter case, an overrun indicator will also be set. + +Note that when producing a notification, the kernel does not wait for the +consumers to collect it, but rather just continues on. This means that +notifications can be generated whilst spinlocks are held and also protects the +kernel from being held up indefinitely by a userspace malfunction. + +As far as the ring goes, the head index belongs to the kernel and the tail +index belongs to userspace. 
The kernel will refuse to write anything if the +tail index becomes invalid. Userspace *must* use appropriate memory barriers +between reading or updating the tail index and reading the ring. + + +Record Structure +================ + +Notification records in the ring may occupy a variable number of slots within +the buffer, beginning with a 1-slot header:: + + struct watch_notification { + __u16 type; + __u16 subtype; + __u32 info; + }; + +"type" indicates the source of the notification record and "subtype" indicates +the type of record from that source (see the Watch Sources section below). The +type may also be "WATCH_TYPE_META". This is a special record type generated +internally by the watch queue driver itself. There are two subtypes, one of +which indicates records that should be just skipped (padding or metadata): + + * WATCH_META_SKIP_NOTIFICATION + * WATCH_META_REMOVAL_NOTIFICATION + +The former indicates a record that should just be skipped and the latter +indicates that an object on which a watchpoint was installed was removed or +destroyed. + +"info" indicates a bunch of things, including: + + * The length of the record (mask with WATCH_INFO_LENGTH). This indicates the + size of the record, which may be between 1 and 63 slots. Note that this is + placed appropriately within the info value so that no shifting is required + to convert number of occupied slots to byte length. + + * The watchpoint ID (mask with WATCH_INFO_ID). This indicates the caller's + ID of the watchpoint, which may be between 0 and 255. Multiple watchpoints + may share a queue, and this provides a means to distinguish them. + + * A buffer overrun flag (WATCH_INFO_OVERRUN flag). If this is set in a + notification record, some of the preceding records were discarded. + + * An ENOMEM-loss flag (WATCH_INFO_ENOMEM flag). This is set to indicate that + an event was lost to ENOMEM. + + * A recursive-change flag (WATCH_INFO_RECURSIVE flag). 
This is set to + indicate that the change that happened was recursive - for instance + changing the attributes on an entire mount subtree. + + * An in-subtree flag (WATCH_INFO_IN_SUBTREE flag). This is set if the event + didn't happen exactly at the watchpoint, but rather somewhere in the + subtree thereunder. + + * Some type-specific flags (WATCH_INFO_TYPE_FLAGS). These are set by the + notification producer to indicate some meaning to the kernel. + +Everything in info apart from the length can be used for filtering. + + +Ring Structure +============== + +The ring is divided into 8-byte slots. The caller uses an ioctl() to set the +size of the ring after opening and this must be a power-of-2 multiple of the +system page size (so that the mask can be used with AND). + +The head and tail indices are stored in the first two slots in the ring, which +are marked out as a skippable entry:: + + struct watch_queue_buffer { + union { + struct { + struct watch_notification watch; + volatile __u32 head; + volatile __u32 tail; + __u32 mask; + } meta; + struct watch_notification slots[0]; + }; + }; + +In "meta.watch", type will be set to WATCH_TYPE_META and subtype to +WATCH_META_SKIP_NOTIFICATION so that anyone processing the buffer will just +skip this record. Also, because this record is here, records cannot wrap round +the end of the buffer, so a skippable padding element will be inserted at the +end of the buffer if needed. Thus the contents of a notification record in the +buffer are always contiguous. + +"meta.mask" is an AND'able mask to turn the index counters into slots array +indices. + +The buffer is empty if "meta.head" == "meta.tail". + +[!] NOTE that the ring indices "meta.head" and "meta.tail" are indices into +"slots[]" not byte offsets into the buffer. + +[!] NOTE that userspace must never change the head pointer. This belongs to +the kernel and will be updated by that. The kernel will never change the tail +pointer. + +[!] 
NOTE that userspace must never AND-off the tail pointer before updating it, +but should just keep adding to it and letting it wrap naturally. The value +*should* be masked off when used as an index into slots[]. + +[!] NOTE that if the distance between head and tail becomes too great, the +kernel will assume the buffer is full and write no more until the issue is +resolved. + + +Watch Sources +============= + +Any particular buffer can be fed from multiple sources. Sources include: + + * WATCH_TYPE_MOUNT_NOTIFY + + Notifications of this type indicate mount tree topology changes and mount + attribute changes. A watchpoint can be set on a particular file or + directory and notifications from the path subtree rooted at that point will + be intercepted. + + * WATCH_TYPE_SB_NOTIFY + + Notifications of this type indicate superblock events, such as quota limits + being hit, I/O errors being produced or network server loss/reconnection. + Watchpoints of this type are set directly on superblocks. + + * WATCH_TYPE_KEY_NOTIFY + + Notifications of this type indicate changes to keys and keyrings, including + the changes of keyring contents or the attributes of keys. + + See Documentation/security/keys/core.rst for more information. + + * WATCH_TYPE_BLOCK_NOTIFY + + Notifications of this type indicate block layer events, such as I/O errors + or temporary link loss. Watchpoints of this type are set on a global + queue. + + +Configuring Watchpoints +======================= + +When a watchpoint is set up, the caller assigns an ID and can set filtering +parameters. The following structure is filled out and passed to the +watchpoint creation system call:: + + struct watch_notification_filter { + __u64 subtype_filter[4]; + __u32 info_filter; + __u32 info_mask; + __u32 info_id; + __u32 __reserved; + }; + +"subtype_filter" is a bitmask indicating the subtypes that are of interest. In +this version of the structure, only the first 256 subtypes are supported. 
Bit +0 of subtype_filter[0] corresponds to subtype 0, bit 1 to subtype 1, and so on. + +"info_filter" and "info_mask" act as a filter on the info field of the +notification record. The notification is only written into the buffer if:: + + (watch.info & info_mask) == info_filter + +This can be used, for example, to ignore events that are not exactly on the +watched point in a mount tree by specifying WATCH_INFO_IN_SUBTREE must be 0. + +"info_id" is OR'd into watch.info. This indicates the watchpoint ID in the top +8 bits. All bits outside of WATCH_INFO_ID must be 0. + +"__reserved" must be 0. + +If the pointer to this structure is NULL, this indicates to the system call +that the watchpoint should be removed. + + +Polling +======= + +The file descriptor that holds the buffer may be used with poll() and similar. +POLLIN and POLLRDNORM are set if the buffer indices differ. POLLERR is set if +the buffer indices are further apart than the size of the buffer. Wake-up +events are only generated if the buffer is transitioned from an empty state. 
+ + +Example +======= + +A buffer is created with something like the following:: + + fd = open("/dev/watch_queue", O_RDWR); + + #define BUF_SIZE 4 + ioctl(fd, IOC_WATCH_QUEUE_SET_SIZE, BUF_SIZE); + + page_size = sysconf(_SC_PAGESIZE); + buf = mmap(NULL, BUF_SIZE * page_size, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + +It can then be set to receive mount topology change notifications, keyring +change notifications and superblock notifications:: + + memset(&filter, 0, sizeof(filter)); + filter.subtype_filter[0] = ~0ULL; + filter.info_mask = WATCH_INFO_IN_SUBTREE; + filter.info_filter = 0; + filter.info_id = 0x01000000; + + keyctl(KEYCTL_WATCH_KEY, KEY_SPEC_SESSION_KEYRING, fd, &filter); + + mount_notify(AT_FDCWD, "/", 0, fd, &filter); + + sb_notify(AT_FDCWD, "/", 0, fd, &filter); + +The notifications can then be consumed by something like the following:: + + extern void saw_mount_change(struct watch_notification *n); + extern void saw_key_change(struct watch_notification *n); + + static int consumer(int fd, struct watch_queue_buffer *buf) + { + struct watch_notification *n; + struct pollfd p[1]; + unsigned int head, tail, mask = buf->meta.mask; + + for (;;) { + p[0].fd = fd; + p[0].events = POLLIN | POLLERR; + p[0].revents = 0; + + if (poll(p, 1, -1) == -1 || p[0].revents & POLLERR) + goto went_wrong; + + while (head = _atomic_load_acquire(buf->meta.head), + tail = buf->meta.tail, + tail != head + ) { + n = &buf->slots[tail & mask]; + if ((n->info & WATCH_INFO_LENGTH) == 0) + goto went_wrong; + + switch (n->type) { + case WATCH_TYPE_MOUNT_NOTIFY: + saw_mount_change(n); + break; + case WATCH_TYPE_KEY_NOTIFY: + saw_key_change(n); + break; + } + + tail += (n->info & WATCH_INFO_LENGTH) >> WATCH_LENGTH_SHIFT; + _atomic_store_release(buf->meta.tail, tail); + } + } + + went_wrong: + return 0; + } + +Note the memory barriers when loading the head pointer and storing the tail +pointer! 
diff --git a/drivers/misc/Kconfig b/drivers/misc/Kconfig index 6a0365b2332c..19668c0ebe03 100644 --- a/drivers/misc/Kconfig +++ b/drivers/misc/Kconfig @@ -4,6 +4,19 @@ menu "Misc devices" +config WATCH_QUEUE + bool "Mappable notification queue" + default n + depends on MMU + help + This is a general notification queue for the kernel to pass events to + userspace through a mmap()'able ring buffer. It can be used in + conjunction with watches for mount topology change notifications, + superblock change notifications and key/keyring change notifications. + + Note that in theory this should work fine with NOMMU, but I'm not + sure how to make that work. + config SENSORS_LIS3LV02D tristate depends on INPUT diff --git a/drivers/misc/Makefile b/drivers/misc/Makefile index b9affcdaa3d6..bf16acd9f8cc 100644 --- a/drivers/misc/Makefile +++ b/drivers/misc/Makefile @@ -3,6 +3,7 @@ # Makefile for misc devices that really don't fit anywhere else. # +obj-$(CONFIG_WATCH_QUEUE) += watch_queue.o obj-$(CONFIG_IBM_ASM) += ibmasm/ obj-$(CONFIG_IBMVMC) += ibmvmc.o obj-$(CONFIG_AD525X_DPOT) += ad525x_dpot.o diff --git a/drivers/misc/watch_queue.c b/drivers/misc/watch_queue.c new file mode 100644 index 000000000000..39a09ea15d97 --- /dev/null +++ b/drivers/misc/watch_queue.c @@ -0,0 +1,877 @@ +/* User-mappable watch queue + * + * Copyright (C) 2018 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. 
+ * + * See Documentation/watch_queue.rst + */ + +#define pr_fmt(fmt) "watchq: " fmt +#include <linux/module.h> +#include <linux/init.h> +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/printk.h> +#include <linux/miscdevice.h> +#include <linux/fs.h> +#include <linux/mm.h> +#include <linux/pagemap.h> +#include <linux/poll.h> +#include <linux/uaccess.h> +#include <linux/vmalloc.h> +#include <linux/file.h> +#include <linux/security.h> +#include <linux/cred.h> +#include <linux/watch_queue.h> + +#define DEBUG_WITH_WRITE /* Allow use of write() to record notifications */ + +MODULE_DESCRIPTION("Watch queue"); +MODULE_AUTHOR("Red Hat, Inc."); +MODULE_LICENSE("GPL"); + +struct watch_type_filter { + enum watch_notification_type type; + __u32 subtype_filter[1]; /* Bitmask of subtypes to filter on */ + __u32 info_filter; /* Filter on watch_notification::info */ + __u32 info_mask; /* Mask of relevant bits in info_filter */ +}; + +struct watch_filter { + union { + struct rcu_head rcu; + unsigned long type_filter[2]; /* Bitmask of accepted types */ + }; + u32 nr_filters; /* Number of filters */ + struct watch_type_filter filters[]; +}; + +struct watch_queue { + struct rcu_head rcu; + struct address_space mapping; + const struct cred *cred; /* Creds of the owner of the queue */ + struct watch_filter __rcu *filter; + wait_queue_head_t waiters; + struct hlist_head watches; /* Contributory watches */ + refcount_t usage; + spinlock_t lock; + bool defunct; /* T when queues closed */ + u8 nr_pages; /* Size of pages[] */ + u8 flag_next; /* Flag to apply to next item */ +#ifdef DEBUG_WITH_WRITE + u8 debug; +#endif + u32 size; + struct watch_queue_buffer *buffer; /* Pointer to first record */ + + /* The mappable pages. The zeroth page holds the ring pointers. */ + struct page **pages; +}; + +/** + * post_one_notification - Post an event notification to one queue + * @wqueue: The watch queue to add the event to. + * @n: The notification record to post. 
+ * @cred: The credentials to use in security checks. + * + * Post a notification of an event into an mmap'd queue and let the user know. + * Returns true if successful and false on failure (eg. buffer overrun or + * userspace mucked up the ring indices). + * + * + * The size of the notification should be set in n->flags & WATCH_LENGTH and + * should be in units of sizeof(*n). + */ +static bool post_one_notification(struct watch_queue *wqueue, + struct watch_notification *n, + const struct cred *cred) +{ + struct watch_queue_buffer *buf = wqueue->buffer; + unsigned int metalen = sizeof(buf->meta) / sizeof(buf->slots[0]); + unsigned int size = wqueue->size, mask = size - 1; + unsigned int len; + unsigned int ring_tail, tail, head, used, segment, h; + + if (!buf) + return false; + + len = (n->info & WATCH_INFO_LENGTH) >> WATCH_LENGTH_SHIFT; + if (len == 0) + return false; + + spin_lock_bh(&wqueue->lock); /* Protect head pointer */ + + if (wqueue->defunct || + security_post_notification(wqueue->cred, cred, n) < 0) + goto out; + + ring_tail = READ_ONCE(buf->meta.tail); + head = READ_ONCE(buf->meta.head); + used = head - ring_tail; + + /* Check to see if userspace mucked up the pointers */ + if (used >= size) + goto overrun; + tail = ring_tail & mask; + if (tail > 0 && tail < metalen) + goto overrun; + + h = head & mask; + if (h >= tail) { + /* Head is at or after tail in the buffer. There may then be + * two segments: one to the end of buffer and one at the + * beginning of the buffer between the metadata block and the + * tail pointer. + */ + segment = size - h; + if (len > segment) { + /* Not enough space in the post-head segment; we need + * to wrap. When wrapping, we will have to skip the + * metadata at the beginning of the buffer. 
+ */ + if (len > tail - metalen) + goto overrun; + + /* Fill the space at the end of the page */ + buf->slots[h].type = WATCH_TYPE_META; + buf->slots[h].subtype = WATCH_META_SKIP_NOTIFICATION; + buf->slots[h].info = segment << WATCH_LENGTH_SHIFT; + head += segment; + h = 0; + if (h >= tail) + goto overrun; + } + } + + if (h == 0) { + /* Reset and skip the header metadata */ + buf->meta.watch.type = WATCH_TYPE_META; + buf->meta.watch.subtype = WATCH_META_SKIP_NOTIFICATION; + buf->meta.watch.info = metalen << WATCH_LENGTH_SHIFT; + head += metalen; + h = metalen; + if (h >= tail) + goto overrun; + } + + if (h < tail) { + /* Head is before tail in the buffer. There may be one segment + * between the two, but we may need to skip the metadata block. + */ + segment = tail - h; + if (len > segment) + goto overrun; + } + + n->info |= wqueue->flag_next; + wqueue->flag_next = 0; + memcpy(buf->slots + h, n, len * sizeof(buf->slots[0])); + head += len; + + smp_store_release(&buf->meta.head, head); + spin_unlock_bh(&wqueue->lock); + if (used == 0) + wake_up(&wqueue->waiters); + return true; + +overrun: + wqueue->flag_next = WATCH_INFO_OVERRUN; +out: + spin_unlock_bh(&wqueue->lock); + return false; +} + +/* + * Apply filter rules to a notification. + */ +static bool filter_watch_notification(const struct watch_filter *wf, + const struct watch_notification *n) +{ + const struct watch_type_filter *wt; + int i; + + if (!test_bit(n->type, wf->type_filter)) + return false; + + for (i = 0; i < wf->nr_filters; i++) { + wt = &wf->filters[i]; + if (n->type == wt->type && + ((1U << n->subtype) & wt->subtype_filter[0]) && + (n->info & wt->info_mask) == wt->info_filter) + return true; + } + + return false; /* If there is a filter, the default is to reject. */ +} + +/** + * __post_watch_notification - Post an event notification + * @wlist: The watch list to post the event to. + * @n: The notification record to post. + * @cred: The creds of the process that triggered the notification. 
+ * @id: The ID to match on the watch. + * + * Post a notification of an event into a set of watch queues and let the users + * know. + * + * If @n is NULL then WATCH_INFO_LENGTH will be set on the next event posted. + * + * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and + * should be in units of sizeof(*n). + */ +void __post_watch_notification(struct watch_list *wlist, + struct watch_notification *n, + const struct cred *cred, + u64 id) +{ + const struct watch_filter *wf; + struct watch_queue *wqueue; + struct watch *watch; + + rcu_read_lock(); + + hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) { + if (watch->id != id) + continue; + n->info &= ~(WATCH_INFO_ID | WATCH_INFO_OVERRUN); + n->info |= watch->info_id; + + wqueue = rcu_dereference(watch->queue); + wf = rcu_dereference(wqueue->filter); + if (wf && !filter_watch_notification(wf, n)) + continue; + + post_one_notification(wqueue, n, cred); + } + + rcu_read_unlock(); +} +EXPORT_SYMBOL(__post_watch_notification); + +/* + * Allow the queue to be polled. 
+ */ +static __poll_t watch_queue_poll(struct file *file, poll_table *wait) +{ + struct watch_queue *wqueue = file->private_data; + struct watch_queue_buffer *buf = wqueue->buffer; + unsigned int head, tail; + __poll_t mask = 0; + + poll_wait(file, &wqueue->waiters, wait); + + head = READ_ONCE(buf->meta.head); + tail = READ_ONCE(buf->meta.tail); + if (head != tail) + mask |= EPOLLIN | EPOLLRDNORM; + if (head - tail > wqueue->size) + mask |= EPOLLERR; + return mask; +} + +static int watch_queue_set_page_dirty(struct page *page) +{ + SetPageDirty(page); + return 0; +} + +static const struct address_space_operations watch_queue_aops = { + .set_page_dirty = watch_queue_set_page_dirty, +}; + +static vm_fault_t watch_queue_fault(struct vm_fault *vmf) +{ + struct watch_queue *wqueue = vmf->vma->vm_file->private_data; + struct page *page; + + page = wqueue->pages[vmf->pgoff]; + get_page(page); + if (!lock_page_or_retry(page, vmf->vma->vm_mm, vmf->flags)) { + put_page(page); + return VM_FAULT_RETRY; + } + vmf->page = page; + return VM_FAULT_LOCKED; +} + +static void watch_queue_map_pages(struct vm_fault *vmf, + pgoff_t start_pgoff, pgoff_t end_pgoff) +{ + struct watch_queue *wqueue = vmf->vma->vm_file->private_data; + struct page *page; + + rcu_read_lock(); + + do { + page = wqueue->pages[start_pgoff]; + if (trylock_page(page)) { + vm_fault_t ret; + get_page(page); + ret = alloc_set_pte(vmf, NULL, page); + if (ret != 0) + put_page(page); + + unlock_page(page); + } + } while (++start_pgoff < end_pgoff); + + rcu_read_unlock(); +} + +static const struct vm_operations_struct watch_queue_vm_ops = { + .fault = watch_queue_fault, + .map_pages = watch_queue_map_pages, +}; + +/* + * Map the buffer. 
+ */ +static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma) +{ + struct watch_queue *wqueue = file->private_data; + + if (vma->vm_pgoff != 0 || + vma->vm_end - vma->vm_start > wqueue->nr_pages * PAGE_SIZE || + !(pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED))) + return -EINVAL; + + vma->vm_ops = &watch_queue_vm_ops; + + vma_interval_tree_insert(vma, &wqueue->mapping.i_mmap); + return 0; +} + +/* + * Allocate the required number of pages. + */ +static long watch_queue_set_size(struct watch_queue *wqueue, unsigned long nr_pages) +{ + struct watch_queue_buffer *buf; + u32 len; + int i; + + if (nr_pages == 0 || + nr_pages > 16 || /* TODO: choose a better hard limit */ + !is_power_of_2(nr_pages)) + return -EINVAL; + + wqueue->pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL); + if (!wqueue->pages) + goto err; + + for (i = 0; i < nr_pages; i++) { + wqueue->pages[i] = alloc_page(GFP_KERNEL | __GFP_ZERO); + if (!wqueue->pages[i]) + goto err_some_pages; + wqueue->pages[i]->mapping = &wqueue->mapping; + SetPageUptodate(wqueue->pages[i]); + } + + buf = vmap(wqueue->pages, nr_pages, VM_MAP, PAGE_SHARED); + if (!buf) + goto err_some_pages; + + wqueue->buffer = buf; + wqueue->nr_pages = nr_pages; + wqueue->size = ((nr_pages * PAGE_SIZE) / sizeof(struct watch_notification)); + + /* The first four slots in the buffer contain metadata about the ring, + * including the head and tail indices and mask. 
+ */ + len = sizeof(buf->meta) / sizeof(buf->slots[0]); + buf->meta.watch.info = len << WATCH_LENGTH_SHIFT; + buf->meta.watch.type = WATCH_TYPE_META; + buf->meta.watch.subtype = WATCH_META_SKIP_NOTIFICATION; + buf->meta.tail = len; + buf->meta.mask = wqueue->size - 1; + smp_store_release(&buf->meta.head, len); + return 0; + +err_some_pages: + for (i--; i >= 0; i--) { + ClearPageUptodate(wqueue->pages[i]); + wqueue->pages[i]->mapping = NULL; + put_page(wqueue->pages[i]); + } + + kfree(wqueue->pages); + wqueue->pages = NULL; +err: + return -ENOMEM; +} + +/* + * Set the filter on a watch queue. + */ +static long watch_queue_set_filter(struct inode *inode, + struct watch_queue *wqueue, + struct watch_notification_filter __user *_filter) +{ + struct watch_notification_type_filter *tf; + struct watch_notification_filter filter; + struct watch_type_filter *q; + struct watch_filter *wfilter; + int ret, nr_filter = 0, i; + + if (!_filter) { + /* Remove the old filter */ + wfilter = NULL; + goto set; + } + + /* Grab the user's filter specification */ + if (copy_from_user(&filter, _filter, sizeof(filter)) != 0) + return -EFAULT; + if (filter.nr_filters == 0 || + filter.nr_filters > 16 || + filter.__reserved != 0) + return -EINVAL; + + tf = memdup_user(_filter->filters, filter.nr_filters * sizeof(*tf)); + if (IS_ERR(tf)) + return PTR_ERR(tf); + + ret = -EINVAL; + for (i = 0; i < filter.nr_filters; i++) { + if ((tf[i].info_filter & ~tf[i].info_mask) || + tf[i].info_mask & WATCH_INFO_LENGTH) + goto err_filter; + /* Ignore any unknown types */ + if (tf[i].type >= sizeof(wfilter->type_filter) * 8) + continue; + nr_filter++; + } + + /* Now we need to build the internal filter from only the relevant + * user-specified filters. 
+ */ + ret = -ENOMEM; + wfilter = kzalloc(struct_size(wfilter, filters, nr_filter), GFP_KERNEL); + if (!wfilter) + goto err_filter; + wfilter->nr_filters = nr_filter; + + q = wfilter->filters; + for (i = 0; i < filter.nr_filters; i++) { + if (tf[i].type >= sizeof(wfilter->type_filter) * BITS_PER_LONG) + continue; + + q->type = tf[i].type; + q->info_filter = tf[i].info_filter; + q->info_mask = tf[i].info_mask; + q->subtype_filter[0] = tf[i].subtype_filter[0]; + __set_bit(q->type, wfilter->type_filter); + q++; + } + + kfree(tf); +set: + rcu_swap_protected(wqueue->filter, wfilter, + lockdep_is_held(&inode->i_rwsem)); + if (wfilter) + kfree_rcu(wfilter, rcu); + return 0; + +err_filter: + kfree(tf); + return ret; +} + +/* + * Set parameters. + */ +static long watch_queue_ioctl(struct file *file, unsigned int cmd, unsigned long arg) +{ + struct watch_queue *wqueue = file->private_data; + struct inode *inode = file_inode(file); + long ret; + + switch (cmd) { + case IOC_WATCH_QUEUE_SET_SIZE: + if (wqueue->buffer) + return -EBUSY; + inode_lock(inode); + ret = watch_queue_set_size(wqueue, arg); + inode_unlock(inode); + return ret; + + case IOC_WATCH_QUEUE_SET_FILTER: + inode_lock(inode); + ret = watch_queue_set_filter( + inode, wqueue, + (struct watch_notification_filter __user *)arg); + inode_unlock(inode); + return ret; + + default: + return -EOPNOTSUPP; + } +} + +/* + * Open the file. 
+ */
+static int watch_queue_open(struct inode *inode, struct file *file)
+{
+	struct watch_queue *wqueue;
+
+	wqueue = kzalloc(sizeof(*wqueue), GFP_KERNEL);
+	if (!wqueue)
+		return -ENOMEM;
+
+	wqueue->mapping.a_ops = &watch_queue_aops;
+	wqueue->mapping.i_mmap = RB_ROOT_CACHED;
+	init_rwsem(&wqueue->mapping.i_mmap_rwsem);
+	spin_lock_init(&wqueue->mapping.private_lock);
+
+	refcount_set(&wqueue->usage, 1);	/* Dropped by put_watch_queue() in watch_queue_release() */
+	spin_lock_init(&wqueue->lock);
+	init_waitqueue_head(&wqueue->waiters);
+	wqueue->cred = get_cred(file->f_cred);	/* Dropped by put_cred() in watch_queue_release() */
+
+	file->private_data = wqueue;
+	return 0;
+}
+
+/**
+ * put_watch_queue - Dispose of a ref on a watchqueue.
+ * @wqueue: The watch queue to unref.
+ *
+ * When the last reference is dropped, the queue is freed with kfree_rcu().
+ */
+void put_watch_queue(struct watch_queue *wqueue)
+{
+	if (refcount_dec_and_test(&wqueue->usage))
+		kfree_rcu(wqueue, rcu);
+}
+EXPORT_SYMBOL(put_watch_queue);
+
+static void free_watch(struct rcu_head *rcu)	/* RCU callback queued by put_watch() */
+{
+	struct watch *watch = container_of(rcu, struct watch, rcu);
+
+	put_watch_queue(rcu_access_pointer(watch->queue));
+}
+
+/*
+ * Discard a watch.
+ *
+ * Dropping the last reference queues free_watch() through call_rcu(); that
+ * callback then drops a reference on the watch's queue.
+ *
+ * NOTE(review): free_watch() does not free the watch structure itself -
+ * confirm that whoever allocated the watch is responsible for freeing it.
+ */
+static void put_watch(struct watch *watch)
+{
+	if (refcount_dec_and_test(&watch->usage))
+		call_rcu(&watch->rcu, free_watch);
+}
+
+/**
+ * init_watch - Initialise a watch
+ * @watch: The watch to initialise.
+ * @wqueue: The queue to assign.
+ *
+ * Initialise a watch and set the watch queue.  The usage count is set to 1
+ * and both list nodes are initialised as unhashed.  No reference is taken on
+ * @wqueue here; add_watch_to_object() increments the queue's usage count.
+ */
+void init_watch(struct watch *watch, struct watch_queue *wqueue)
+{
+	refcount_set(&watch->usage, 1);
+	INIT_HLIST_NODE(&watch->list_node);
+	INIT_HLIST_NODE(&watch->queue_node);
+	rcu_assign_pointer(watch->queue, wqueue);
+}
+
+/**
+ * add_watch_to_object - Add a watch on an object to a watch list
+ * @watch: The watch to add
+ * @wlist: The watch list to add to
+ *
+ * @watch->queue must have been set to point to the queue to post notifications
+ * to and the watch list of the object to be watched.
+ *
+ * The caller must pin the queue and the list both and must hold the list
+ * locked against racing watch additions/removals.
+ */
+int add_watch_to_object(struct watch *watch, struct watch_list *wlist)
+{
+	struct watch_queue *wqueue = rcu_access_pointer(watch->queue);
+	struct watch *w;
+
+	/* Refuse a second watch with the same ID on this object.
+	 * NOTE(review): only the ID is compared here, whereas removal matches
+	 * on both the ID and the queue - confirm which is intended.
+	 */
+	hlist_for_each_entry(w, &wlist->watchers, list_node) {
+		if (watch->id == w->id)
+			return -EBUSY;
+	}
+
+	rcu_assign_pointer(watch->watch_list, wlist);
+
+	/* The queue's list of watches is protected by the queue lock; the
+	 * extra ref on the queue is the one that free_watch() drops.
+	 */
+	spin_lock_bh(&wqueue->lock);
+	refcount_inc(&wqueue->usage);
+	hlist_add_head(&watch->queue_node, &wqueue->watches);
+	spin_unlock_bh(&wqueue->lock);
+
+	hlist_add_head(&watch->list_node, &wlist->watchers);
+	return 0;
+}
+EXPORT_SYMBOL(add_watch_to_object);
+
+/**
+ * remove_watch_from_object - Remove a watch or all watches from an object.
+ * @wlist: The watch list to remove from
+ * @wq: The watch queue of interest (ignored if @all is true)
+ * @id: The ID of the watch to remove (ignored if @all is true)
+ * @all: True to remove all objects
+ *
+ * Remove a specific watch or all watches from an object.  A notification is
+ * sent to the watcher to tell them that this happened, provided that the watch
+ * still has a queue to post it to.
+ *
+ * Returns 0 if at least one watch was removed and -EBADSLT if no matching
+ * watch was found.
+ */
+int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
+			     u64 id, bool all)
+{
+	struct watch_notification n;
+	struct watch_queue *wqueue;
+	struct watch *watch;
+	int ret = -EBADSLT;
+
+	rcu_read_lock();
+
+again:
+	spin_lock(&wlist->lock);
+	hlist_for_each_entry(watch, &wlist->watchers, list_node) {
+		if (all ||
+		    (watch->id == id && rcu_access_pointer(watch->queue) == wq))
+			goto found;
+	}
+	spin_unlock(&wlist->lock);
+	goto out;
+
+found:
+	ret = 0;
+	hlist_del_init_rcu(&watch->list_node);
+	rcu_assign_pointer(watch->watch_list, NULL);
+	spin_unlock(&wlist->lock);
+
+	/* A removal record is a single slot long: sizeof(n) is the length of
+	 * one slot expressed in the units of the info length field.
+	 */
+	n.type = WATCH_TYPE_META;
+	n.subtype = WATCH_META_REMOVAL_NOTIFICATION;
+	n.info = watch->info_id | sizeof(n);
+
+	wqueue = rcu_dereference(watch->queue);
+
+	/* We don't need the watch list lock for the next bit as RCU is
+	 * protecting everything from being deallocated.
+	 */
+	if (wqueue) {
+		/* Only post once we know there's a queue to post to. */
+		post_one_notification(wqueue, &n, wq ? wq->cred : NULL);
+
+		spin_lock_bh(&wqueue->lock);
+
+		if (!hlist_unhashed(&watch->queue_node)) {
+			hlist_del_init_rcu(&watch->queue_node);
+			put_watch(watch);
+		}
+
+		spin_unlock_bh(&wqueue->lock);
+	}
+
+	/* The release hook is called with the RCU read lock dropped. */
+	if (wlist->release_watch) {
+		rcu_read_unlock();
+		wlist->release_watch(wlist, watch);
+		rcu_read_lock();
+	}
+	put_watch(watch);
+
+	if (all && !hlist_empty(&wlist->watchers))
+		goto again;
+out:
+	rcu_read_unlock();
+	return ret;
+}
+EXPORT_SYMBOL(remove_watch_from_object);
+
+/*
+ * Remove all the watches that are contributory to a queue.  This will
+ * potentially race with removal of the watches by the destruction of the
+ * objects being watched or the distribution of notifications.
+ */
+static void watch_queue_clear(struct watch_queue *wqueue)
+{
+	struct watch_list *wlist;
+	struct watch *watch;
+	bool release;
+
+	rcu_read_lock();
+	spin_lock_bh(&wqueue->lock);
+
+	/* Prevent new additions and prevent notifications from happening */
+	wqueue->defunct = true;
+
+	while (!hlist_empty(&wqueue->watches)) {
+		watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
+		hlist_del_init_rcu(&watch->queue_node);
+		spin_unlock_bh(&wqueue->lock);
+
+		/* We can't do the next bit under the queue lock as we need to
+		 * get the list lock - which would cause a deadlock if someone
+		 * was removing from the opposite direction at the same time or
+		 * posting a notification.
+		 */
+		wlist = rcu_dereference(watch->watch_list);
+		if (wlist) {
+			spin_lock(&wlist->lock);
+
+			/* Only the party that unhashes the watch from the
+			 * object's list gets to release it.
+			 */
+			release = !hlist_unhashed(&watch->list_node);
+			if (release) {
+				hlist_del_init_rcu(&watch->list_node);
+				rcu_assign_pointer(watch->watch_list, NULL);
+			}
+
+			spin_unlock(&wlist->lock);
+
+			if (release) {
+				if (wlist->release_watch) {
+					rcu_read_unlock();
+					/* This might need to call dput(), so
+					 * we have to drop all the locks.
+					 */
+					wlist->release_watch(wlist, watch);
+					rcu_read_lock();
+				}
+				put_watch(watch);
+			}
+		}
+
+		put_watch(watch);
+		spin_lock_bh(&wqueue->lock);
+	}
+
+	spin_unlock_bh(&wqueue->lock);
+	rcu_read_unlock();
+}
+
+/*
+ * Release the file.
+ *
+ * Detach all the watches, free the buffer, its pages and the filter, and then
+ * drop the credentials and the queue ref that watch_queue_open() took.
+ */
+static int watch_queue_release(struct inode *inode, struct file *file)
+{
+	struct watch_filter *wfilter;
+	struct watch_queue *wqueue = file->private_data;
+	int i, pgref;
+
+	watch_queue_clear(wqueue);
+
+	if (wqueue->pages && wqueue->pages[0])
+		WARN_ON(page_ref_count(wqueue->pages[0]) != 1);
+
+	/* vfree() ignores a NULL pointer, so no guard is needed. */
+	vfree(wqueue->buffer);
+	for (i = 0; i < wqueue->nr_pages; i++) {
+		ClearPageUptodate(wqueue->pages[i]);
+		wqueue->pages[i]->mapping = NULL;
+		/* Report the very value that was tested rather than sampling
+		 * the refcount a second time.
+		 */
+		pgref = page_ref_count(wqueue->pages[i]);
+		WARN(pgref != 1,
+		     "FREE PAGE[%d] refcount %d\n", i, pgref);
+		__free_page(wqueue->pages[i]);
+	}
+
+	wfilter = rcu_access_pointer(wqueue->filter);
+	if (wfilter)
+		kfree_rcu(wfilter, rcu);
+	kfree(wqueue->pages);
+	put_cred(wqueue->cred);
+	put_watch_queue(wqueue);
+	return 0;
+}
+
+#ifdef DEBUG_WITH_WRITE
+/*
+ * Debugging aid: post a notification record supplied by userspace through
+ * write().  The length written must equal the length encoded in the record's
+ * info field.  A run of more than 20 consecutive posts for which
+ * post_one_notification() returned false gives -EIO.
+ */
+static ssize_t watch_queue_write(struct file *file,
+				 const char __user *_buf, size_t len, loff_t *pos)
+{
+	struct watch_notification *n;
+	struct watch_queue *wqueue = file->private_data;
+	ssize_t ret;
+
+	if (!wqueue->buffer)
+		return -ENOBUFS;
+
+	if (len & ~WATCH_INFO_LENGTH || len == 0 || !_buf)
+		return -EINVAL;
+
+	n = memdup_user(_buf, len);
+	if (IS_ERR(n))
+		return PTR_ERR(n);
+
+	ret = -EINVAL;
+	if ((n->info & WATCH_INFO_LENGTH) != len)
+		goto error;
+	n->info &= (WATCH_INFO_LENGTH | WATCH_INFO_TYPE_FLAGS | WATCH_INFO_ID);
+
+	if (post_one_notification(wqueue, n, file->f_cred))
+		wqueue->debug = 0;
+	else
+		wqueue->debug++;
+	ret = len;
+	if (wqueue->debug > 20)
+		ret = -EIO;
+
+error:
+	kfree(n);
+	return ret;
+}
+#endif
+
+static const struct file_operations watch_queue_fops = {
+	.owner		= THIS_MODULE,
+	.open		= watch_queue_open,
+	.release	= watch_queue_release,
+	.unlocked_ioctl	= watch_queue_ioctl,
+	.poll		= watch_queue_poll,
+	.mmap		= watch_queue_mmap,
+#ifdef DEBUG_WITH_WRITE
+	.write		= watch_queue_write,
+#endif
+	.llseek		= no_llseek,
+};
+
+/**
+ * get_watch_queue - Get a watch queue from its file descriptor.
+ * @fd: The fd to query.
+ *
+ * Returns the queue with its usage count incremented (drop it with
+ * put_watch_queue()), ERR_PTR(-EBADF) if @fd is not an open file or
+ * ERR_PTR(-EINVAL) if it is not a watch queue file.
+ */
+struct watch_queue *get_watch_queue(int fd)
+{
+	struct watch_queue *wqueue = ERR_PTR(-EBADF);
+	struct fd f;
+
+	f = fdget(fd);
+	if (f.file) {
+		wqueue = ERR_PTR(-EINVAL);
+		if (f.file->f_op == &watch_queue_fops) {
+			wqueue = f.file->private_data;
+			refcount_inc(&wqueue->usage);
+		}
+		fdput(f);
+	}
+
+	return wqueue;
+}
+EXPORT_SYMBOL(get_watch_queue);
+
+static struct miscdevice watch_queue_dev = {
+	.minor	= MISC_DYNAMIC_MINOR,
+	.name	= "watch_queue",
+	.fops	= &watch_queue_fops,
+	.mode	= 0666,
+};
+
+/*
+ * Register the misc device.
+ */
+static int __init watch_queue_init(void)
+{
+	int ret;
+
+	ret = misc_register(&watch_queue_dev);
+	if (ret < 0)
+		pr_err("Failed to register %d\n", ret);
+	return ret;
+}
+fs_initcall(watch_queue_init);
+
+static void __exit watch_queue_exit(void)
+{
+	misc_deregister(&watch_queue_dev);
+}
+module_exit(watch_queue_exit);
diff --git a/include/linux/lsm_hooks.h b/include/linux/lsm_hooks.h
index 2474c3f785ca..2f72ea80d4fe 100644
--- a/include/linux/lsm_hooks.h
+++ b/include/linux/lsm_hooks.h
@@ -1420,6 +1420,13 @@
  *	@ctx is a pointer in which to place the allocated security context.
  *	@ctxlen points to the place to put the length of @ctx.
  *
+ * @post_notification:
+ *	Check to see if a watch notification can be posted to a particular
+ *	queue.
+ *	@q_cred: The credentials of the target watch queue.
+ *	@cred: The event-triggerer's credentials
+ *	@n: The notification being posted
+ *
  * Security hooks for using the eBPF maps and programs functionalities through
  * eBPF syscalls.
  *
@@ -1698,6 +1705,11 @@ union security_list_options {
 	int (*inode_notifysecctx)(struct inode *inode, void *ctx, u32 ctxlen);
 	int (*inode_setsecctx)(struct dentry *dentry, void *ctx, u32 ctxlen);
 	int (*inode_getsecctx)(struct inode *inode, void **ctx, u32 *ctxlen);
+#ifdef CONFIG_WATCH_QUEUE
+	int (*post_notification)(const struct cred *q_cred,
+				 const struct cred *cred,
+				 struct watch_notification *n);
+#endif
 
 #ifdef CONFIG_SECURITY_NETWORK
 	int (*unix_stream_connect)(struct sock *sock, struct sock *other,
@@ -1977,6 +1989,9 @@ struct security_hook_heads {
 	struct hlist_head inode_notifysecctx;
 	struct hlist_head inode_setsecctx;
 	struct hlist_head inode_getsecctx;
+#ifdef CONFIG_WATCH_QUEUE
+	struct hlist_head post_notification;
+#endif
 #ifdef CONFIG_SECURITY_NETWORK
 	struct hlist_head unix_stream_connect;
 	struct hlist_head unix_may_send;
diff --git a/include/linux/security.h b/include/linux/security.h
index 23c8b602c0ab..1df8d55de8da 100644
--- a/include/linux/security.h
+++ b/include/linux/security.h
@@ -58,6 +58,7 @@ struct fs_context;
 struct fs_parameter;
 enum fs_value_type;
 struct fsinfo_kparams;
+struct watch_notification;
 
 /* Default (no) options for the capable function */
 #define CAP_OPT_NONE 0x0
@@ -396,6 +397,11 @@ void security_inode_invalidate_secctx(struct inode *inode);
 int security_inode_notifysecctx(struct inode *inode, void *ctx, u32 ctxlen);
 int security_inode_setsecctx(struct dentry *dentry, void *ctx, u32 ctxlen);
 int security_inode_getsecctx(struct inode *inode, void **ctx, u32 *ctxlen);
+#ifdef CONFIG_WATCH_QUEUE
+int security_post_notification(const struct cred *q_cred,
+			       const struct cred *cred,
+			       struct watch_notification *n);
+#endif
 #else /* CONFIG_SECURITY */
 
 static inline int call_lsm_notifier(enum lsm_event event, void *data)
@@ -1215,6 +1221,14 @@ static inline int security_inode_getsecctx(struct inode *inode, void **ctx, u32
 {
 	return -EOPNOTSUPP;
 }
+#ifdef CONFIG_WATCH_QUEUE
+static inline int security_post_notification(const struct cred *q_cred,
+					     const struct cred *cred,
+					     struct watch_notification *n)
+{
+	return 0;
+}
+#endif
 #endif /* CONFIG_SECURITY */
 
 #ifdef CONFIG_SECURITY_NETWORK
diff --git a/include/linux/watch_queue.h b/include/linux/watch_queue.h
new file mode 100644
index 000000000000..f200b68c799e
--- /dev/null
+++ b/include/linux/watch_queue.h
@@ -0,0 +1,86 @@
+/* User-mappable watch queue
+ *
+ * Copyright (C) 2018 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ *
+ * See Documentation/watch_queue.rst
+ */
+
+#ifndef _LINUX_WATCH_QUEUE_H
+#define _LINUX_WATCH_QUEUE_H
+
+#include <uapi/linux/watch_queue.h>
+
+#ifdef CONFIG_WATCH_QUEUE
+
+struct watch_queue;
+
+/*
+ * Representation of a watch on an object.
+ */
+struct watch {
+	union {					/* rcu is handed to call_rcu() by put_watch() on the final put */
+		struct rcu_head	rcu;
+		u32		info_id;	/* ID to be OR'd in to info field */
+	};
+	struct watch_queue __rcu *queue;	/* Queue to post events to */
+	struct hlist_node	queue_node;	/* Link in queue->watches */
+	struct watch_list __rcu	*watch_list;	/* List this watch is on; NULL once removed */
+	struct hlist_node	list_node;	/* Link in watch_list->watchers */
+	void			*private;	/* Private data for the watched object */
+	u64			id;		/* Internal identifier */
+	refcount_t		usage;		/* Set to 1 by init_watch() */
+};
+
+/*
+ * List of watches on an object.
+ */
+struct watch_list {
+	struct rcu_head		rcu;		/* Used by kfree_rcu() in remove_watch_list() */
+	struct hlist_head	watchers;	/* Watches on the object, linked through watch::list_node */
+	void (*release_watch)(struct watch_list *, struct watch *);	/* Optional; called with no locks held */
+	spinlock_t		lock;		/* Taken around changes to watchers */
+};
+
+extern void __post_watch_notification(struct watch_list *,
+				      struct watch_notification *,
+				      const struct cred *,
+				      u64);
+extern struct watch_queue *get_watch_queue(int);
+extern void put_watch_queue(struct watch_queue *);
+extern void put_watch_list(struct watch_list *);
+extern void init_watch(struct watch *, struct watch_queue *);
+extern int add_watch_to_object(struct watch *, struct watch_list *);
+extern int remove_watch_from_object(struct watch_list *, struct watch_queue *, u64, bool);
+
+static inline void init_watch_list(struct watch_list *wlist)
+{
+	INIT_HLIST_HEAD(&wlist->watchers);
+	spin_lock_init(&wlist->lock);	/* NOTE(review): release_watch is not set here - presumably left to the caller */
+}
+
+static inline void post_watch_notification(struct watch_list *wlist,
+					   struct watch_notification *n,
+					   const struct cred *cred,
+					   u64 id)
+{
+	if (unlikely(wlist))	/* Objects with no watch list are the expected case */
+		__post_watch_notification(wlist, n, cred, id);
+}
+
+static inline void remove_watch_list(struct watch_list *wlist)
+{
+	if (wlist) {
+		remove_watch_from_object(wlist, NULL, 0, true);	/* all == true: queue and ID are ignored */
+		kfree_rcu(wlist, rcu);
+	}
+}
+
+#endif /* CONFIG_WATCH_QUEUE */
+
+#endif /* _LINUX_WATCH_QUEUE_H */
diff --git a/include/uapi/linux/watch_queue.h b/include/uapi/linux/watch_queue.h
new file mode 100644
index 000000000000..01746982c2ba
--- /dev/null
+++ b/include/uapi/linux/watch_queue.h
@@ -0,0 +1,82 @@
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _UAPI_LINUX_WATCH_QUEUE_H
+#define _UAPI_LINUX_WATCH_QUEUE_H
+
+#include <linux/types.h>
+#include <linux/ioctl.h>
+
+#define IOC_WATCH_QUEUE_SET_SIZE	_IO('s', 0x01)	/* Set the size in pages */
+#define IOC_WATCH_QUEUE_SET_FILTER	_IO('s', 0x02)	/* Set the filter */
+
+enum watch_notification_type {
+	WATCH_TYPE_META		= 0,	/* Special record */
+	WATCH_TYPE_MOUNT_NOTIFY	= 1,	/* Mount notification record */
+	WATCH_TYPE_SB_NOTIFY	= 2,	/* Superblock notification */
+	WATCH_TYPE_KEY_NOTIFY	= 3,	/* Key/keyring change notification */
+	WATCH_TYPE_BLOCK_NOTIFY	= 4,	/* Block layer notifications */
+#define WATCH_TYPE___NR 5		/* Number of types defined above */
+};
+
+enum watch_meta_notification_subtype {
+	WATCH_META_SKIP_NOTIFICATION	= 0,	/* Just skip this record */
+	WATCH_META_REMOVAL_NOTIFICATION	= 1,	/* Watched object was removed */
+};
+
+/*
+ * Notification record
+ */
+struct watch_notification {
+	__u32			type:24;	/* enum watch_notification_type */
+	__u32			subtype:8;	/* Type-specific subtype (filterable) */
+	__u32			info;		/* Length, watchpoint ID and flags, as per the masks below */
+#define WATCH_INFO_OVERRUN	0x00000001	/* Event(s) lost due to overrun */
+#define WATCH_INFO_ENOMEM	0x00000002	/* Event(s) lost due to ENOMEM */
+#define WATCH_INFO_RECURSIVE	0x00000004	/* Change was recursive */
+#define WATCH_INFO_LENGTH	0x000001f8	/* Length of record / sizeof(watch_notification) */
+#define WATCH_INFO_IN_SUBTREE	0x00000200	/* Change was not at watched root */
+#define WATCH_INFO_TYPE_FLAGS	0x00ff0000	/* Type-specific flags */
+#define WATCH_INFO_FLAG_0	0x00010000
+#define WATCH_INFO_FLAG_1	0x00020000
+#define WATCH_INFO_FLAG_2	0x00040000
+#define WATCH_INFO_FLAG_3	0x00080000
+#define WATCH_INFO_FLAG_4	0x00100000
+#define WATCH_INFO_FLAG_5	0x00200000
+#define WATCH_INFO_FLAG_6	0x00400000
+#define WATCH_INFO_FLAG_7	0x00800000
+#define WATCH_INFO_ID		0xff000000	/* ID of watchpoint */
+};
+
+#define WATCH_LENGTH_SHIFT	3	/* Bit position of the WATCH_INFO_LENGTH field in info */
+
+struct watch_queue_buffer {
+	union {
+		/* The first few entries are special, containing the
+		 * ring management variables.
+		 *
+		 * NOTE(review): assuming the two bitfields share one __u32,
+		 * meta appears to be 20 bytes whereas a slot is 8, so the
+		 * driver's sizeof(meta) / sizeof(slots[0]) comes to 2 and
+		 * mask sits at the same offset as slots[2].  Confirm that
+		 * the first record written cannot overlay mask.
+		 */
+		struct {
+			struct watch_notification watch; /* WATCH_TYPE_META, WATCH_META_SKIP_NOTIFICATION */
+			volatile __u32	head;		/* Ring head index */
+			volatile __u32	tail;		/* Ring tail index */
+			__u32		mask;		/* Ring index mask */
+		} meta;
+		struct watch_notification slots[0];
+	};
+};
+
+/*
+ * Notification filtering rules (IOC_WATCH_QUEUE_SET_FILTER).
+ */
+struct watch_notification_type_filter {
+	__u32	type;			/* Type to apply filter to */
+	__u32	info_filter;		/* Filter on watch_notification::info; no bits outside info_mask */
+	__u32	info_mask;		/* Mask of relevant bits in info_filter; may not include WATCH_INFO_LENGTH */
+	__u32	subtype_filter[8];	/* Bitmask of subtypes to filter on; NOTE(review): the driver copies only word [0] - confirm */
+};
+
+struct watch_notification_filter {
+	__u32	nr_filters;		/* Number of filters (the driver accepts 1 to 16) */
+	__u32	__reserved;		/* Must be 0 */
+	struct watch_notification_type_filter filters[];
+};
+
+#endif /* _UAPI_LINUX_WATCH_QUEUE_H */
diff --git a/mm/interval_tree.c b/mm/interval_tree.c
index 27ddfd29112a..9a53ddf4bd62 100644
--- a/mm/interval_tree.c
+++ b/mm/interval_tree.c
@@ -25,6 +25,8 @@ INTERVAL_TREE_DEFINE(struct vm_area_struct, shared.rb,
 		     unsigned long, shared.rb_subtree_last,
 		     vma_start_pgoff, vma_last_pgoff,, vma_interval_tree)
 
+EXPORT_SYMBOL_GPL(vma_interval_tree_insert);
+
 /* Insert node immediately after prev in the interval tree */
 void vma_interval_tree_insert_after(struct vm_area_struct *node,
 				    struct vm_area_struct *prev,
diff --git a/mm/memory.c b/mm/memory.c
index 96f1d473c89a..9f2fa2138287 100644
--- a/mm/memory.c
+++ b/mm/memory.c
@@ -3360,6 +3360,7 @@ vm_fault_t alloc_set_pte(struct vm_fault *vmf, struct mem_cgroup *memcg,
 
 	return 0;
 }
+EXPORT_SYMBOL_GPL(alloc_set_pte);
 
 
 /**
diff --git a/security/security.c b/security/security.c
index 3af886e8fced..af758dc71e24 100644
--- a/security/security.c
+++ b/security/security.c
@@ -1929,6 +1929,15 @@ int security_inode_getsecctx(struct inode *inode, void **ctx, u32 *ctxlen)
 }
 EXPORT_SYMBOL(security_inode_getsecctx);
 
+#ifdef CONFIG_WATCH_QUEUE
+int security_post_notification(const struct cred *q_cred,
+			       const struct cred *cred,
+			       struct watch_notification *n)
+{
+	return call_int_hook(post_notification, 0, q_cred, cred, n);	/* 0 is the result when no LSM provides the hook */
+}
+#endif
+
 #ifdef CONFIG_SECURITY_NETWORK
 int security_unix_stream_connect(struct sock *sock, struct sock *other,
 				 struct sock *newsk)
Add a misc device that implements a general notification queue as a ring buffer that can be mmap()'d from userspace. The way this is done is: (1) An application opens the device and indicates the size of the ring buffer that it wants to reserve in pages (this can only be set once): fd = open("/dev/watch_queue", O_RDWR); ioctl(fd, IOC_WATCH_QUEUE_SET_SIZE, nr_of_pages); (2) The application should then map the pages that the device has reserved. Each instance of the device created by open() allocates separate pages so that maps of different fds don't interfere with one another. Multiple mmap() calls on the same fd, however, will all work together. page_size = sysconf(_SC_PAGESIZE); mapping_size = nr_of_pages * page_size; char *buf = mmap(NULL, mapping_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); The ring is divided into 8-byte slots. Entries written into the ring are variable size and can use between 1 and 63 slots. A special entry is maintained in the first two slots of the ring that contains the head and tail pointers. This is skipped when the ring wraps round. Note that multislot entries, therefore, aren't allowed to be broken over the end of the ring, but instead "skip" entries are inserted to pad out the buffer. Each entry has a 1-slot header that describes it: struct watch_notification { __u32 type:24; __u32 subtype:8; __u32 info; }; The type indicates the source (e.g. mount tree changes, superblock events, keyring changes, block layer events) and the subtype indicates the event type (e.g. mount, unmount; EIO, EDQUOT; link, unlink). The info field indicates a number of things, including the entry length, an ID assigned to a watchpoint contributing to this buffer, type-specific flags and meta flags, such as an overrun indicator. Supplementary data, such as the key ID that generated an event, are attached in additional slots.
Signed-off-by: David Howells <dhowells@redhat.com> --- Documentation/watch_queue.rst | 311 +++++++++++++ drivers/misc/Kconfig | 13 + drivers/misc/Makefile | 1 drivers/misc/watch_queue.c | 877 ++++++++++++++++++++++++++++++++++++++ include/linux/lsm_hooks.h | 15 + include/linux/security.h | 14 + include/linux/watch_queue.h | 86 ++++ include/uapi/linux/watch_queue.h | 82 ++++ mm/interval_tree.c | 2 mm/memory.c | 1 security/security.c | 9 11 files changed, 1411 insertions(+) create mode 100644 Documentation/watch_queue.rst create mode 100644 drivers/misc/watch_queue.c create mode 100644 include/linux/watch_queue.h create mode 100644 include/uapi/linux/watch_queue.h