
[12/23] fsmonitor--daemon: create token-based changed path cache

Message ID f1fa803ebe9c9f78608c22e55ec590f8c6775c94.1617291666.git.gitgitgadget@gmail.com (mailing list archive)
State New, archived
Series Builtin FSMonitor Feature

Commit Message

Jeff Hostetler April 1, 2021, 3:40 p.m. UTC
From: Jeff Hostetler <jeffhost@microsoft.com>

Teach fsmonitor--daemon to build lists of changed paths and associate
them with a token-id.  This will be used by the platform-specific
backends to accumulate changed paths in response to filesystem events.

The platform-specific event loops receive batches containing one or
more changed paths.  Their fs listener thread will accumulate them in
a `fsmonitor_batch` (and without locking) and then "publish" them to
associate them with the current token and to make them visible to the
client worker threads.

Signed-off-by: Jeff Hostetler <jeffhost@microsoft.com>
---
 builtin/fsmonitor--daemon.c | 192 ++++++++++++++++++++++++++++++++++++
 fsmonitor--daemon.h         |  40 ++++++++
 2 files changed, 232 insertions(+)

Comments

Derrick Stolee April 26, 2021, 8:22 p.m. UTC | #1
On 4/1/2021 11:40 AM, Jeff Hostetler via GitGitGadget wrote:
> From: Jeff Hostetler <jeffhost@microsoft.com>
> 
> Teach fsmonitor--daemon to build lists of changed paths and associate
> them with a token-id.  This will be used by the platform-specific
> backends to accumulate changed paths in response to filesystem events.
> 
> The platform-specific event loops receive batches containing one or
> more changed paths.  Their fs listener thread will accumulate them in

I think the lowercase "fs" here is strange. "Their listener thread"
could be interpreted as the IPC listener, so it's probably best to
spell it out: "Their filesystem listener thread".

> a `fsmonitor_batch` (and without locking) and then "publish" them to
> associate them with the current token and to make them visible to the
> client worker threads.
...
> +struct fsmonitor_batch {
> +	struct fsmonitor_batch *next;
> +	uint64_t batch_seq_nr;
> +	const char **interned_paths;
> +	size_t nr, alloc;
> +	time_t pinned_time;
> +};

A linked list to help with adding while consuming it, but also
batching for efficiency. I can see how this will work out
nicely.

> +struct fsmonitor_batch *fsmonitor_batch__new(void)
> +{
> +	struct fsmonitor_batch *batch = xcalloc(1, sizeof(*batch));

I mentioned earlier that I think `CALLOC_ARRAY(batch, 1)` is the
typical pattern here.
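
For reference, here is how that would look (a minimal sketch, assuming
Git's CALLOC_ARRAY macro from git-compat-util.h; not part of this patch):

struct fsmonitor_batch *fsmonitor_batch__new(void)
{
	struct fsmonitor_batch *batch;

	/* zero-initializes *batch and dies on allocation failure */
	CALLOC_ARRAY(batch, 1);

	return batch;
}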

> +
> +	return batch;
> +}
> +
> +struct fsmonitor_batch *fsmonitor_batch__free(struct fsmonitor_batch *batch)

Since this method frees the tip of the list and returns the next
item (instead of freeing the entire list), perhaps this would be
better named as _pop()?

> +{
> +	struct fsmonitor_batch *next;
> +
> +	if (!batch)
> +		return NULL;
> +
> +	next = batch->next;
> +
> +	/*
> +	 * The actual strings within the array are interned, so we don't
> +	 * own them.
> +	 */
> +	free(batch->interned_paths);
> +
> +	return next;
> +}
> +
> +void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
> +			       const char *path)
> +{
> +	const char *interned_path = strintern(path);

This use of interned paths is interesting, although I become
concerned for the amount of memory we are consuming over the
lifetime of the process. This could be considered as a target
for future improvements, perhaps with an LRU cache or something
similar.

> +
> +	trace_printf_key(&trace_fsmonitor, "event: %s", interned_path);
> +
> +	ALLOC_GROW(batch->interned_paths, batch->nr + 1, batch->alloc);
> +	batch->interned_paths[batch->nr++] = interned_path;
> +}
> +
> +static void fsmonitor_batch__combine(struct fsmonitor_batch *batch_dest,
> +				     const struct fsmonitor_batch *batch_src)
> +{
> +	/* assert state->main_lock */
> +

This comment seems stale.

> +	size_t k;
> +
> +	ALLOC_GROW(batch_dest->interned_paths,
> +		   batch_dest->nr + batch_src->nr + 1,
> +		   batch_dest->alloc);
> +
> +	for (k = 0; k < batch_src->nr; k++)
> +		batch_dest->interned_paths[batch_dest->nr++] =
> +			batch_src->interned_paths[k];
> +}
> +
> +static void fsmonitor_free_token_data(struct fsmonitor_token_data *token)

This one _does_ free the whole list.

> +{
> +	struct fsmonitor_batch *p;
> +
> +	if (!token)
> +		return;
> +
> +	assert(token->client_ref_count == 0);
> +
> +	strbuf_release(&token->token_id);
> +
> +	for (p = token->batch_head; p; p = fsmonitor_batch__free(p))
> +		;
> +
> +	free(token);
> +}
> +
> +/*
> + * Flush all of our cached data about the filesystem.  Call this if we
> + * lose sync with the filesystem and miss some notification events.
> + *
> + * [1] If we are missing events, then we no longer have a complete
> + *     history of the directory (relative to our current start token).
> + *     We should create a new token and start fresh (as if we just
> + *     booted up).
> + *
> + * If there are no readers of the current token data series, we
> + * can free it now.  Otherwise, let the last reader free it.  Either
> + * way, the old token data series is no longer associated with our
> + * state data.
> + */
> +void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
> +{
> +	struct fsmonitor_token_data *free_me = NULL;
> +	struct fsmonitor_token_data *new_one = NULL;
> +
> +	new_one = fsmonitor_new_token_data();
> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	trace_printf_key(&trace_fsmonitor,
> +			 "force resync [old '%s'][new '%s']",
> +			 state->current_token_data->token_id.buf,
> +			 new_one->token_id.buf);
> +
> +	if (state->current_token_data->client_ref_count == 0)
> +		free_me = state->current_token_data;
> +	state->current_token_data = new_one;
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +
> +	fsmonitor_free_token_data(free_me);
> +}
> +

Swap the pointer under a lock, free outside of it. Good.

> +/*
> + * We try to combine small batches at the front of the batch-list to avoid
> + * having a long list.  This hopefully makes it a little easier when we want
> + * to truncate and maintain the list.  However, we don't want the paths array
> + * to just keep growing and growing with realloc, so we insert an arbitrary
> + * limit.
> + */
> +#define MY_COMBINE_LIMIT (1024)
> +
> +void fsmonitor_publish(struct fsmonitor_daemon_state *state,
> +		       struct fsmonitor_batch *batch,
> +		       const struct string_list *cookie_names)
> +{
> +	if (!batch && !cookie_names->nr)
> +		return;
> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	if (batch) {
> +		struct fsmonitor_batch *head;
> +
> +		head = state->current_token_data->batch_head;
> +		if (!head) {
> +			batch->batch_seq_nr = 0;
> +			batch->next = NULL;
> +			state->current_token_data->batch_head = batch;
> +			state->current_token_data->batch_tail = batch;
> +		} else if (head->pinned_time) {
> +			/*
> +			 * We cannot alter the current batch list
> +			 * because:
> +			 *
> +			 * [a] it is being transmitted to at least one
> +			 * client and the handle_client() thread has a
> +			 * ref-count, but not a lock on the batch list
> +			 * starting with this item.
> +			 *
> +			 * [b] it has been transmitted in the past to
> +			 * at least one client such that future
> +			 * requests are relative to this head batch.
> +			 *
> +			 * So, we can only prepend a new batch onto
> +			 * the front of the list.
> +			 */
> +			batch->batch_seq_nr = head->batch_seq_nr + 1;
> +			batch->next = head;
> +			state->current_token_data->batch_head = batch;
> +		} else if (head->nr + batch->nr > MY_COMBINE_LIMIT) {
> +			/*
> +			 * The head batch in the list has never been
> +			 * transmitted to a client, but folding the
> +			 * contents of the new batch onto it would
> +			 * exceed our arbitrary limit, so just prepend
> +			 * the new batch onto the list.
> +			 */
> +			batch->batch_seq_nr = head->batch_seq_nr + 1;
> +			batch->next = head;
> +			state->current_token_data->batch_head = batch;
> +		} else {
> +			/*
> +			 * We are free to append the paths in the given
> +			 * batch onto the end of the current head batch.
> +			 */
> +			fsmonitor_batch__combine(head, batch);
> +			fsmonitor_batch__free(batch);
> +		}
> +	}
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +}

I appreciate the careful comments in this critical piece of the
data structure. Also, it is good that you already have a batch
of results to merge into the list instead of taking the lock for
every filesystem event.
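
For illustration, a minimal sketch (not part of this patch) of how a
platform backend might drive this API; the callback name and the
paths array are hypothetical, and the cookie list is left empty here:

static void on_fs_events(struct fsmonitor_daemon_state *state,
			 const char **paths, size_t nr)
{
	struct string_list cookies = STRING_LIST_INIT_DUP;
	struct fsmonitor_batch *batch = fsmonitor_batch__new();
	size_t k;

	/* accumulate the changed paths without taking any lock */
	for (k = 0; k < nr; k++)
		fsmonitor_batch__add_path(batch, paths[k]);

	/*
	 * One lock acquisition per batch: publishing associates the
	 * batch with the current token and transfers ownership.
	 */
	fsmonitor_publish(state, batch, &cookies);
	string_list_clear(&cookies, 0);
}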

Thanks,
-Stolee
Jeff Hostetler April 30, 2021, 5:36 p.m. UTC | #2
On 4/26/21 4:22 PM, Derrick Stolee wrote:
...
>> +
>> +void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
>> +			       const char *path)
>> +{
>> +	const char *interned_path = strintern(path);
> 
> This use of interned paths is interesting, although I become
> concerned for the amount of memory we are consuming over the
> lifetime of the process. This could be considered as a target
> for future improvements, perhaps with an LRU cache or something
> similar.

Interning gives us a fixed pointer for any given path.  This
gives us a way to de-dup paths using just pointers rather than
string compares.
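
For example (illustration only, assuming strintern() from strbuf.h):

	const char *a = strintern("dir/foo.c");
	const char *b = strintern("dir/foo.c");

	assert(a == b);	/* same interned pointer, no strcmp() needed */
	assert(a != strintern("dir/bar.c"));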

Yes, we will accumulate paths in that dictionary, but the set of
paths present in the typical working directory is usually pretty
fixed.

We only generate these for modified paths.  Users don't typically
create/modify/delete that many paths in their source trees during
normal development.

Compilers may generate lots of trash files in the worktree, but
those names are usually repeated (with each "make").  So while we
might accumulate a lot of paths for a repo, the set should become
stable.  However, if the build uses temp files in the tree, that
might invalidate this statement.

WRT LRUs, that gets us into threading, lock contention, and
ref-counting problems.  I have it designed such that parallel threads
read and send the current queue to the client without a lock.  They
only need a quick lock to get the current head pointer; the rest
is done lock free.  Also, purging from the end of the LRU would
put us in contention with the FS listener thread that is adding
new paths to the LRU.
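
To make that concrete, a simplified sketch (not part of this patch) of
how a client worker might pin the head under the lock and then walk the
batches lock free:

static void example_read_batches(struct fsmonitor_daemon_state *state)
{
	struct fsmonitor_token_data *token;
	const struct fsmonitor_batch *p;

	pthread_mutex_lock(&state->main_lock);
	token = state->current_token_data;
	token->client_ref_count++;	/* keep this series alive */
	if (token->batch_head)
		token->batch_head->pinned_time = time(NULL);
	pthread_mutex_unlock(&state->main_lock);

	for (p = token->batch_head; p; p = p->next)
		; /* format p->interned_paths into the reply, lock free */

	/*
	 * Afterwards, retake the lock, drop client_ref_count, and let
	 * the last reader free the token if it has been replaced.
	 */
}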

So, yeah, maybe this is something to keep an eye on -- especially
in the monorepo case, but I don't think we need to address it now.

Thanks,
Jeff

Patch

diff --git a/builtin/fsmonitor--daemon.c b/builtin/fsmonitor--daemon.c
index 2d25e36601fe..48071d445c49 100644
--- a/builtin/fsmonitor--daemon.c
+++ b/builtin/fsmonitor--daemon.c
@@ -255,6 +255,120 @@  static struct fsmonitor_token_data *fsmonitor_new_token_data(void)
 	return token;
 }
 
+struct fsmonitor_batch {
+	struct fsmonitor_batch *next;
+	uint64_t batch_seq_nr;
+	const char **interned_paths;
+	size_t nr, alloc;
+	time_t pinned_time;
+};
+
+struct fsmonitor_batch *fsmonitor_batch__new(void)
+{
+	struct fsmonitor_batch *batch = xcalloc(1, sizeof(*batch));
+
+	return batch;
+}
+
+struct fsmonitor_batch *fsmonitor_batch__free(struct fsmonitor_batch *batch)
+{
+	struct fsmonitor_batch *next;
+
+	if (!batch)
+		return NULL;
+
+	next = batch->next;
+
+	/*
+	 * The actual strings within the array are interned, so we don't
+	 * own them.
+	 */
+	free(batch->interned_paths);
+
+	return next;
+}
+
+void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
+			       const char *path)
+{
+	const char *interned_path = strintern(path);
+
+	trace_printf_key(&trace_fsmonitor, "event: %s", interned_path);
+
+	ALLOC_GROW(batch->interned_paths, batch->nr + 1, batch->alloc);
+	batch->interned_paths[batch->nr++] = interned_path;
+}
+
+static void fsmonitor_batch__combine(struct fsmonitor_batch *batch_dest,
+				     const struct fsmonitor_batch *batch_src)
+{
+	/* assert state->main_lock */
+
+	size_t k;
+
+	ALLOC_GROW(batch_dest->interned_paths,
+		   batch_dest->nr + batch_src->nr + 1,
+		   batch_dest->alloc);
+
+	for (k = 0; k < batch_src->nr; k++)
+		batch_dest->interned_paths[batch_dest->nr++] =
+			batch_src->interned_paths[k];
+}
+
+static void fsmonitor_free_token_data(struct fsmonitor_token_data *token)
+{
+	struct fsmonitor_batch *p;
+
+	if (!token)
+		return;
+
+	assert(token->client_ref_count == 0);
+
+	strbuf_release(&token->token_id);
+
+	for (p = token->batch_head; p; p = fsmonitor_batch__free(p))
+		;
+
+	free(token);
+}
+
+/*
+ * Flush all of our cached data about the filesystem.  Call this if we
+ * lose sync with the filesystem and miss some notification events.
+ *
+ * [1] If we are missing events, then we no longer have a complete
+ *     history of the directory (relative to our current start token).
+ *     We should create a new token and start fresh (as if we just
+ *     booted up).
+ *
+ * If there are no readers of the current token data series, we
+ * can free it now.  Otherwise, let the last reader free it.  Either
+ * way, the old token data series is no longer associated with our
+ * state data.
+ */
+void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
+{
+	struct fsmonitor_token_data *free_me = NULL;
+	struct fsmonitor_token_data *new_one = NULL;
+
+	new_one = fsmonitor_new_token_data();
+
+	pthread_mutex_lock(&state->main_lock);
+
+	trace_printf_key(&trace_fsmonitor,
+			 "force resync [old '%s'][new '%s']",
+			 state->current_token_data->token_id.buf,
+			 new_one->token_id.buf);
+
+	if (state->current_token_data->client_ref_count == 0)
+		free_me = state->current_token_data;
+	state->current_token_data = new_one;
+
+	pthread_mutex_unlock(&state->main_lock);
+
+	fsmonitor_free_token_data(free_me);
+}
+
 static ipc_server_application_cb handle_client;
 
 static int handle_client(void *data, const char *command,
@@ -355,6 +469,77 @@  enum fsmonitor_path_type fsmonitor_classify_path_absolute(
 	return fsmonitor_classify_path_gitdir_relative(rel);
 }
 
+/*
+ * We try to combine small batches at the front of the batch-list to avoid
+ * having a long list.  This hopefully makes it a little easier when we want
+ * to truncate and maintain the list.  However, we don't want the paths array
+ * to just keep growing and growing with realloc, so we insert an arbitrary
+ * limit.
+ */
+#define MY_COMBINE_LIMIT (1024)
+
+void fsmonitor_publish(struct fsmonitor_daemon_state *state,
+		       struct fsmonitor_batch *batch,
+		       const struct string_list *cookie_names)
+{
+	if (!batch && !cookie_names->nr)
+		return;
+
+	pthread_mutex_lock(&state->main_lock);
+
+	if (batch) {
+		struct fsmonitor_batch *head;
+
+		head = state->current_token_data->batch_head;
+		if (!head) {
+			batch->batch_seq_nr = 0;
+			batch->next = NULL;
+			state->current_token_data->batch_head = batch;
+			state->current_token_data->batch_tail = batch;
+		} else if (head->pinned_time) {
+			/*
+			 * We cannot alter the current batch list
+			 * because:
+			 *
+			 * [a] it is being transmitted to at least one
+			 * client and the handle_client() thread has a
+			 * ref-count, but not a lock on the batch list
+			 * starting with this item.
+			 *
+			 * [b] it has been transmitted in the past to
+			 * at least one client such that future
+			 * requests are relative to this head batch.
+			 *
+			 * So, we can only prepend a new batch onto
+			 * the front of the list.
+			 */
+			batch->batch_seq_nr = head->batch_seq_nr + 1;
+			batch->next = head;
+			state->current_token_data->batch_head = batch;
+		} else if (head->nr + batch->nr > MY_COMBINE_LIMIT) {
+			/*
+			 * The head batch in the list has never been
+			 * transmitted to a client, but folding the
+			 * contents of the new batch onto it would
+			 * exceed our arbitrary limit, so just prepend
+			 * the new batch onto the list.
+			 */
+			batch->batch_seq_nr = head->batch_seq_nr + 1;
+			batch->next = head;
+			state->current_token_data->batch_head = batch;
+		} else {
+			/*
+			 * We are free to append the paths in the given
+			 * batch onto the end of the current head batch.
+			 */
+			fsmonitor_batch__combine(head, batch);
+			fsmonitor_batch__free(batch);
+		}
+	}
+
+	pthread_mutex_unlock(&state->main_lock);
+}
+
 static void *fsmonitor_fs_listen__thread_proc(void *_state)
 {
 	struct fsmonitor_daemon_state *state = _state;
@@ -369,6 +554,13 @@  static void *fsmonitor_fs_listen__thread_proc(void *_state)
 
 	fsmonitor_fs_listen__loop(state);
 
+	pthread_mutex_lock(&state->main_lock);
+	if (state->current_token_data &&
+	    state->current_token_data->client_ref_count == 0)
+		fsmonitor_free_token_data(state->current_token_data);
+	state->current_token_data = NULL;
+	pthread_mutex_unlock(&state->main_lock);
+
 	trace2_thread_exit();
 	return NULL;
 }
diff --git a/fsmonitor--daemon.h b/fsmonitor--daemon.h
index 97ea3766e900..06563b6ed56c 100644
--- a/fsmonitor--daemon.h
+++ b/fsmonitor--daemon.h
@@ -12,6 +12,27 @@ 
 struct fsmonitor_batch;
 struct fsmonitor_token_data;
 
+/*
+ * Create a new batch of path(s).  The returned batch is considered
+ * private and not linked into the fsmonitor daemon state.  The caller
+ * should fill this batch with one or more paths and then publish it.
+ */
+struct fsmonitor_batch *fsmonitor_batch__new(void);
+
+/*
+ * Free this batch and return the value of the batch->next field.
+ */
+struct fsmonitor_batch *fsmonitor_batch__free(struct fsmonitor_batch *batch);
+
+/*
+ * Add this path to this batch of modified files.
+ *
+ * The batch should be private and NOT (yet) linked into the fsmonitor
+ * daemon state and therefore not yet visible to worker threads and so
+ * no locking is required.
+ */
+void fsmonitor_batch__add_path(struct fsmonitor_batch *batch, const char *path);
+
 struct fsmonitor_daemon_backend_data; /* opaque platform-specific data */
 
 struct fsmonitor_daemon_state {
@@ -93,5 +114,24 @@  enum fsmonitor_path_type fsmonitor_classify_path_absolute(
 	struct fsmonitor_daemon_state *state,
 	const char *path);
 
+/*
+ * Prepend this batch of path(s) onto the list of batches associated
+ * with the current token.  This makes the batch visible to worker threads.
+ *
+ * The caller no longer owns the batch and must not free it.
+ *
+ * Wake up the client threads waiting on these cookies.
+ */
+void fsmonitor_publish(struct fsmonitor_daemon_state *state,
+		       struct fsmonitor_batch *batch,
+		       const struct string_list *cookie_names);
+
+/*
+ * If the platform-specific layer loses sync with the filesystem,
+ * it should call this to invalidate cached data and abort waiting
+ * threads.
+ */
+void fsmonitor_force_resync(struct fsmonitor_daemon_state *state);
+
 #endif /* HAVE_FSMONITOR_DAEMON_BACKEND */
 #endif /* FSMONITOR_DAEMON_H */