| Message ID | f1fa803ebe9c9f78608c22e55ec590f8c6775c94.1617291666.git.gitgitgadget@gmail.com |
|---|---|
| State | New, archived |
| Series | Builtin FSMonitor Feature |
On 4/1/2021 11:40 AM, Jeff Hostetler via GitGitGadget wrote:
> From: Jeff Hostetler <jeffhost@microsoft.com>
>
> Teach fsmonitor--daemon to build lists of changed paths and associate
> them with a token-id. This will be used by the platform-specific
> backends to accumulate changed paths in response to filesystem events.
>
> The platform-specific event loops receive batches containing one or
> more changed paths. Their fs listener thread will accumulate them in

I think the lowercase "fs" here is strange. "Their listener thread" could
be interpreted as the IPC listener, so it's probably best to spell it out:
"Their filesystem listener thread".

> a `fsmonitor_batch` (and without locking) and then "publish" them to
> associate them with the current token and to make them visible to the
> client worker threads.

...

> +struct fsmonitor_batch {
> +	struct fsmonitor_batch *next;
> +	uint64_t batch_seq_nr;
> +	const char **interned_paths;
> +	size_t nr, alloc;
> +	time_t pinned_time;
> +};

A linked list to help with adding while consuming it, but also batching
for efficiency. I can see how this will work out nicely.

> +struct fsmonitor_batch *fsmonitor_batch__new(void)
> +{
> +	struct fsmonitor_batch *batch = xcalloc(1, sizeof(*batch));

I mentioned earlier that I think `CALLOC_ARRAY(batch, 1)` is the typical
pattern here.

> +
> +	return batch;
> +}
> +
> +struct fsmonitor_batch *fsmonitor_batch__free(struct fsmonitor_batch *batch)

Since this method frees the tip of the list and returns the next item
(instead of freeing the entire list), perhaps this would be better named
as _pop()?

> +{
> +	struct fsmonitor_batch *next;
> +
> +	if (!batch)
> +		return NULL;
> +
> +	next = batch->next;
> +
> +	/*
> +	 * The actual strings within the array are interned, so we don't
> +	 * own them.
> +	 */
> +	free(batch->interned_paths);
> +
> +	return next;
> +}
> +
> +void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
> +			       const char *path)
> +{
> +	const char *interned_path = strintern(path);

This use of interned paths is interesting, although I become concerned
for the amount of memory we are consuming over the lifetime of the
process. This could be considered as a target for future improvements,
perhaps with an LRU cache or something similar.

> +
> +	trace_printf_key(&trace_fsmonitor, "event: %s", interned_path);
> +
> +	ALLOC_GROW(batch->interned_paths, batch->nr + 1, batch->alloc);
> +	batch->interned_paths[batch->nr++] = interned_path;
> +}
> +
> +static void fsmonitor_batch__combine(struct fsmonitor_batch *batch_dest,
> +				     const struct fsmonitor_batch *batch_src)
> +{
> +	/* assert state->main_lock */
> +

This comment seems stale.

> +	size_t k;
> +
> +	ALLOC_GROW(batch_dest->interned_paths,
> +		   batch_dest->nr + batch_src->nr + 1,
> +		   batch_dest->alloc);
> +
> +	for (k = 0; k < batch_src->nr; k++)
> +		batch_dest->interned_paths[batch_dest->nr++] =
> +			batch_src->interned_paths[k];
> +}
> +
> +static void fsmonitor_free_token_data(struct fsmonitor_token_data *token)

This one _does_ free the whole list.

> +{
> +	struct fsmonitor_batch *p;
> +
> +	if (!token)
> +		return;
> +
> +	assert(token->client_ref_count == 0);
> +
> +	strbuf_release(&token->token_id);
> +
> +	for (p = token->batch_head; p; p = fsmonitor_batch__free(p))
> +		;
> +
> +	free(token);
> +}
> +
> +/*
> + * Flush all of our cached data about the filesystem. Call this if we
> + * lose sync with the filesystem and miss some notification events.
> + *
> + * [1] If we are missing events, then we no longer have a complete
> + *     history of the directory (relative to our current start token).
> + *     We should create a new token and start fresh (as if we just
> + *     booted up).
> + *
> + * If there are no readers of the current token data series, we
> + * can free it now. Otherwise, let the last reader free it. Either
> + * way, the old token data series is no longer associated with our
> + * state data.
> + */
> +void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
> +{
> +	struct fsmonitor_token_data *free_me = NULL;
> +	struct fsmonitor_token_data *new_one = NULL;
> +
> +	new_one = fsmonitor_new_token_data();
> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	trace_printf_key(&trace_fsmonitor,
> +			 "force resync [old '%s'][new '%s']",
> +			 state->current_token_data->token_id.buf,
> +			 new_one->token_id.buf);
> +
> +	if (state->current_token_data->client_ref_count == 0)
> +		free_me = state->current_token_data;
> +	state->current_token_data = new_one;
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +
> +	fsmonitor_free_token_data(free_me);
> +}
> +

Swap the pointer under a lock, free outside of it. Good.

> +/*
> + * We try to combine small batches at the front of the batch-list to avoid
> + * having a long list. This hopefully makes it a little easier when we want
> + * to truncate and maintain the list. However, we don't want the paths array
> + * to just keep growing and growing with realloc, so we insert an arbitrary
> + * limit.
> + */
> +#define MY_COMBINE_LIMIT (1024)
> +
> +void fsmonitor_publish(struct fsmonitor_daemon_state *state,
> +		       struct fsmonitor_batch *batch,
> +		       const struct string_list *cookie_names)
> +{
> +	if (!batch && !cookie_names->nr)
> +		return;
> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	if (batch) {
> +		struct fsmonitor_batch *head;
> +
> +		head = state->current_token_data->batch_head;
> +		if (!head) {
> +			batch->batch_seq_nr = 0;
> +			batch->next = NULL;
> +			state->current_token_data->batch_head = batch;
> +			state->current_token_data->batch_tail = batch;
> +		} else if (head->pinned_time) {
> +			/*
> +			 * We cannot alter the current batch list
> +			 * because:
> +			 *
> +			 * [a] it is being transmitted to at least one
> +			 * client and the handle_client() thread has a
> +			 * ref-count, but not a lock on the batch list
> +			 * starting with this item.
> +			 *
> +			 * [b] it has been transmitted in the past to
> +			 * at least one client such that future
> +			 * requests are relative to this head batch.
> +			 *
> +			 * So, we can only prepend a new batch onto
> +			 * the front of the list.
> +			 */
> +			batch->batch_seq_nr = head->batch_seq_nr + 1;
> +			batch->next = head;
> +			state->current_token_data->batch_head = batch;
> +		} else if (head->nr + batch->nr > MY_COMBINE_LIMIT) {
> +			/*
> +			 * The head batch in the list has never been
> +			 * transmitted to a client, but folding the
> +			 * contents of the new batch onto it would
> +			 * exceed our arbitrary limit, so just prepend
> +			 * the new batch onto the list.
> +			 */
> +			batch->batch_seq_nr = head->batch_seq_nr + 1;
> +			batch->next = head;
> +			state->current_token_data->batch_head = batch;
> +		} else {
> +			/*
> +			 * We are free to append the paths in the given
> +			 * batch onto the end of the current head batch.
> +			 */
> +			fsmonitor_batch__combine(head, batch);
> +			fsmonitor_batch__free(batch);
> +		}
> +	}
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +}

I appreciate the careful comments in this critical piece of the data
structure. Also, it is good that you already have a batch of results to
merge into the list instead of updating a lock for every filesystem
event.

Thanks,
-Stolee
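To make that calling pattern concrete, here is a minimal sketch (not part of the patch) of how a platform backend might feed this API: accumulate the paths from one filesystem notification into a private batch with no locking, then hand the whole batch to `fsmonitor_publish()`, which takes the main lock exactly once. Only the `fsmonitor_batch__*()` and `fsmonitor_publish()` calls come from the patch; the event struct, its fields, and the empty cookie list are invented for illustration.

```c
/* Illustrative sketch only -- not from the patch. */
#include "cache.h"
#include "string-list.h"
#include "fsmonitor--daemon.h"

/* Hypothetical shape of one platform notification carrying several paths. */
struct my_platform_event {
	const char **paths;
	size_t nr;
};

static void handle_platform_event(struct fsmonitor_daemon_state *state,
				  const struct my_platform_event *ev)
{
	/* The batch is private here, so filling it needs no lock. */
	struct fsmonitor_batch *batch = fsmonitor_batch__new();
	struct string_list cookie_names = STRING_LIST_INIT_DUP;
	size_t k;

	for (k = 0; k < ev->nr; k++)
		fsmonitor_batch__add_path(batch, ev->paths[k]);

	/*
	 * One lock acquisition for the whole event: publishing links the
	 * batch into the current token's list and the daemon owns it now.
	 */
	fsmonitor_publish(state, batch, &cookie_names);

	string_list_clear(&cookie_names, 0);
}
```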
On 4/26/21 4:22 PM, Derrick Stolee wrote:
...
>> +
>> +void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
>> +			       const char *path)
>> +{
>> +	const char *interned_path = strintern(path);
>
> This use of interned paths is interesting, although I become
> concerned for the amount of memory we are consuming over the
> lifetime of the process. This could be considered as a target
> for future improvements, perhaps with an LRU cache or something
> similar.

Interning gives us a fixed pointer for any given path. This gives us a
way to de-dup paths using just pointers rather than string compares.

Yes, we will accumulate paths in that dictionary, but the set of paths
present in the typical working directory is usually pretty fixed. We
only generate these for modified paths. Users don't typically
create/modify/delete that many paths in their source trees during
normal development. Compilers may generate lots of trash files in the
worktree, but those names are usually repeated (with each "make"). So
while we might accumulate a lot of paths for a repo, the set should
become stable. However, if they use temp files in the tree, it might
invalidate this statement.

WRT LRUs, that gets us into threading, lock contention, and
ref-counting problems. I have it designed such that parallel threads
read and send the current queue to the client without a lock. They
only need a quick lock to get the current head pointer; the rest is
done lock free. Also, purging from the end of the LRU would put us in
contention with the FS listener thread that is adding new paths to the
LRU.

So, yeah, maybe this is something to keep an eye on -- especially in
the monorepo case, but I don't think we need to address it now.

Thanks,
Jeff
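For readers who have not seen string interning before, here is a toy, self-contained illustration of the invariant Jeff relies on: each distinct string gets exactly one canonical pointer, so de-duplicating modified paths is a pointer compare rather than a `strcmp()`. This is not Git's `strintern()` implementation (and it skips error handling); `toy_intern()` and its linear-scan table are invented purely for demonstration.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Toy intern table: one canonical, never-freed copy per distinct string. */
static const char **table;
static size_t table_nr, table_alloc;

static const char *toy_intern(const char *s)
{
	size_t i;

	for (i = 0; i < table_nr; i++)
		if (!strcmp(table[i], s))
			return table[i];	/* already interned: reuse the pointer */

	if (table_nr == table_alloc) {
		table_alloc = table_alloc ? 2 * table_alloc : 16;
		table = realloc(table, table_alloc * sizeof(*table));
	}
	table[table_nr] = strdup(s);		/* owned by the table for the process lifetime */
	return table[table_nr++];
}

int main(void)
{
	char buf[] = "dir/file.c";
	const char *a = toy_intern("dir/file.c");
	const char *b = toy_intern(buf);	/* different buffer, same contents */

	/* Same contents => same pointer, so de-dup is just 'a == b'. */
	printf("a == b: %s\n", a == b ? "yes" : "no");
	return 0;
}
```

The memory-growth concern raised above is exactly the price of that "never freed" property: the table only ever grows, which stays cheap as long as the set of touched paths is roughly stable.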
diff --git a/builtin/fsmonitor--daemon.c b/builtin/fsmonitor--daemon.c
index 2d25e36601fe..48071d445c49 100644
--- a/builtin/fsmonitor--daemon.c
+++ b/builtin/fsmonitor--daemon.c
@@ -255,6 +255,120 @@ static struct fsmonitor_token_data *fsmonitor_new_token_data(void)
 	return token;
 }
 
+struct fsmonitor_batch {
+	struct fsmonitor_batch *next;
+	uint64_t batch_seq_nr;
+	const char **interned_paths;
+	size_t nr, alloc;
+	time_t pinned_time;
+};
+
+struct fsmonitor_batch *fsmonitor_batch__new(void)
+{
+	struct fsmonitor_batch *batch = xcalloc(1, sizeof(*batch));
+
+	return batch;
+}
+
+struct fsmonitor_batch *fsmonitor_batch__free(struct fsmonitor_batch *batch)
+{
+	struct fsmonitor_batch *next;
+
+	if (!batch)
+		return NULL;
+
+	next = batch->next;
+
+	/*
+	 * The actual strings within the array are interned, so we don't
+	 * own them.
+	 */
+	free(batch->interned_paths);
+
+	return next;
+}
+
+void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
+			       const char *path)
+{
+	const char *interned_path = strintern(path);
+
+	trace_printf_key(&trace_fsmonitor, "event: %s", interned_path);
+
+	ALLOC_GROW(batch->interned_paths, batch->nr + 1, batch->alloc);
+	batch->interned_paths[batch->nr++] = interned_path;
+}
+
+static void fsmonitor_batch__combine(struct fsmonitor_batch *batch_dest,
+				     const struct fsmonitor_batch *batch_src)
+{
+	/* assert state->main_lock */
+
+	size_t k;
+
+	ALLOC_GROW(batch_dest->interned_paths,
+		   batch_dest->nr + batch_src->nr + 1,
+		   batch_dest->alloc);
+
+	for (k = 0; k < batch_src->nr; k++)
+		batch_dest->interned_paths[batch_dest->nr++] =
+			batch_src->interned_paths[k];
+}
+
+static void fsmonitor_free_token_data(struct fsmonitor_token_data *token)
+{
+	struct fsmonitor_batch *p;
+
+	if (!token)
+		return;
+
+	assert(token->client_ref_count == 0);
+
+	strbuf_release(&token->token_id);
+
+	for (p = token->batch_head; p; p = fsmonitor_batch__free(p))
+		;
+
+	free(token);
+}
+
+/*
+ * Flush all of our cached data about the filesystem. Call this if we
+ * lose sync with the filesystem and miss some notification events.
+ *
+ * [1] If we are missing events, then we no longer have a complete
+ *     history of the directory (relative to our current start token).
+ *     We should create a new token and start fresh (as if we just
+ *     booted up).
+ *
+ * If there are no readers of the current token data series, we
+ * can free it now. Otherwise, let the last reader free it. Either
+ * way, the old token data series is no longer associated with our
+ * state data.
+ */
+void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
+{
+	struct fsmonitor_token_data *free_me = NULL;
+	struct fsmonitor_token_data *new_one = NULL;
+
+	new_one = fsmonitor_new_token_data();
+
+	pthread_mutex_lock(&state->main_lock);
+
+	trace_printf_key(&trace_fsmonitor,
+			 "force resync [old '%s'][new '%s']",
+			 state->current_token_data->token_id.buf,
+			 new_one->token_id.buf);
+
+	if (state->current_token_data->client_ref_count == 0)
+		free_me = state->current_token_data;
+	state->current_token_data = new_one;
+
+	pthread_mutex_unlock(&state->main_lock);
+
+	fsmonitor_free_token_data(free_me);
+}
+
 static ipc_server_application_cb handle_client;
 
 static int handle_client(void *data, const char *command,
@@ -355,6 +469,77 @@ enum fsmonitor_path_type fsmonitor_classify_path_absolute(
 	return fsmonitor_classify_path_gitdir_relative(rel);
 }
 
+/*
+ * We try to combine small batches at the front of the batch-list to avoid
+ * having a long list. This hopefully makes it a little easier when we want
+ * to truncate and maintain the list. However, we don't want the paths array
+ * to just keep growing and growing with realloc, so we insert an arbitrary
+ * limit.
+ */
+#define MY_COMBINE_LIMIT (1024)
+
+void fsmonitor_publish(struct fsmonitor_daemon_state *state,
+		       struct fsmonitor_batch *batch,
+		       const struct string_list *cookie_names)
+{
+	if (!batch && !cookie_names->nr)
+		return;
+
+	pthread_mutex_lock(&state->main_lock);
+
+	if (batch) {
+		struct fsmonitor_batch *head;
+
+		head = state->current_token_data->batch_head;
+		if (!head) {
+			batch->batch_seq_nr = 0;
+			batch->next = NULL;
+			state->current_token_data->batch_head = batch;
+			state->current_token_data->batch_tail = batch;
+		} else if (head->pinned_time) {
+			/*
+			 * We cannot alter the current batch list
+			 * because:
+			 *
+			 * [a] it is being transmitted to at least one
+			 * client and the handle_client() thread has a
+			 * ref-count, but not a lock on the batch list
+			 * starting with this item.
+			 *
+			 * [b] it has been transmitted in the past to
+			 * at least one client such that future
+			 * requests are relative to this head batch.
+			 *
+			 * So, we can only prepend a new batch onto
+			 * the front of the list.
+			 */
+			batch->batch_seq_nr = head->batch_seq_nr + 1;
+			batch->next = head;
+			state->current_token_data->batch_head = batch;
+		} else if (head->nr + batch->nr > MY_COMBINE_LIMIT) {
+			/*
+			 * The head batch in the list has never been
+			 * transmitted to a client, but folding the
+			 * contents of the new batch onto it would
+			 * exceed our arbitrary limit, so just prepend
+			 * the new batch onto the list.
+			 */
+			batch->batch_seq_nr = head->batch_seq_nr + 1;
+			batch->next = head;
+			state->current_token_data->batch_head = batch;
+		} else {
+			/*
+			 * We are free to append the paths in the given
+			 * batch onto the end of the current head batch.
+			 */
+			fsmonitor_batch__combine(head, batch);
+			fsmonitor_batch__free(batch);
+		}
+	}
+
+	pthread_mutex_unlock(&state->main_lock);
+}
+
 static void *fsmonitor_fs_listen__thread_proc(void *_state)
 {
 	struct fsmonitor_daemon_state *state = _state;
@@ -369,6 +554,13 @@ static void *fsmonitor_fs_listen__thread_proc(void *_state)
 
 	fsmonitor_fs_listen__loop(state);
 
+	pthread_mutex_lock(&state->main_lock);
+	if (state->current_token_data &&
+	    state->current_token_data->client_ref_count == 0)
+		fsmonitor_free_token_data(state->current_token_data);
+	state->current_token_data = NULL;
+	pthread_mutex_unlock(&state->main_lock);
+
 	trace2_thread_exit();
 	return NULL;
 }
diff --git a/fsmonitor--daemon.h b/fsmonitor--daemon.h
index 97ea3766e900..06563b6ed56c 100644
--- a/fsmonitor--daemon.h
+++ b/fsmonitor--daemon.h
@@ -12,6 +12,27 @@
 struct fsmonitor_batch;
 struct fsmonitor_token_data;
 
+/*
+ * Create a new batch of path(s). The returned batch is considered
+ * private and not linked into the fsmonitor daemon state. The caller
+ * should fill this batch with one or more paths and then publish it.
+ */
+struct fsmonitor_batch *fsmonitor_batch__new(void);
+
+/*
+ * Free this batch and return the value of the batch->next field.
+ */
+struct fsmonitor_batch *fsmonitor_batch__free(struct fsmonitor_batch *batch);
+
+/*
+ * Add this path to this batch of modified files.
+ *
+ * The batch should be private and NOT (yet) linked into the fsmonitor
+ * daemon state and therefore not yet visible to worker threads and so
+ * no locking is required.
+ */
+void fsmonitor_batch__add_path(struct fsmonitor_batch *batch, const char *path);
+
 struct fsmonitor_daemon_backend_data; /* opaque platform-specific data */
 
 struct fsmonitor_daemon_state {
@@ -93,5 +114,24 @@ enum fsmonitor_path_type fsmonitor_classify_path_absolute(
 	struct fsmonitor_daemon_state *state,
 	const char *path);
 
+/*
+ * Prepend this batch of path(s) onto the list of batches associated
+ * with the current token. This makes the batch visible to worker threads.
+ *
+ * The caller no longer owns the batch and must not free it.
+ *
+ * Wake up the client threads waiting on these cookies.
+ */
+void fsmonitor_publish(struct fsmonitor_daemon_state *state,
+		       struct fsmonitor_batch *batch,
+		       const struct string_list *cookie_names);
+
+/*
+ * If the platform-specific layer loses sync with the filesystem,
+ * it should call this to invalidate cached data and abort waiting
+ * threads.
+ */
+void fsmonitor_force_resync(struct fsmonitor_daemon_state *state);
+
 #endif /* HAVE_FSMONITOR_DAEMON_BACKEND */
 #endif /* FSMONITOR_DAEMON_H */
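To round out the locking story, here is a hypothetical sketch of the consumer side implied by the ref-count and `pinned_time` comments in `fsmonitor_publish()` and by the lock-free reader design described in the thread: a client-handling thread pins the head batch and bumps the token's ref-count under a brief lock, walks the prepend-only batch list without the lock, and the last reader frees a superseded token. This is not the series' actual `handle_client()` code; `send_changes_since()`, `since_seq_nr`, and `reply_with_path()` are invented names, and the sketch assumes it lives in builtin/fsmonitor--daemon.c next to the definitions above.

```c
/* Hypothetical consumer sketch -- not the series' handle_client(). */
static void send_changes_since(struct fsmonitor_daemon_state *state,
			       uint64_t since_seq_nr)
{
	struct fsmonitor_token_data *token;
	struct fsmonitor_token_data *free_me = NULL;
	struct fsmonitor_batch *head, *p;
	size_t k;

	pthread_mutex_lock(&state->main_lock);
	token = state->current_token_data;
	token->client_ref_count++;		/* keep this token alive for us */
	head = token->batch_head;
	if (head && !head->pinned_time)
		head->pinned_time = time(NULL);	/* freeze the head batch */
	pthread_mutex_unlock(&state->main_lock);

	/*
	 * Lock-free traversal: batches from 'head' onward are prepend-only
	 * and a pinned head is never combined into, so nothing we visit
	 * here can change underneath us.
	 */
	for (p = head; p && p->batch_seq_nr >= since_seq_nr; p = p->next)
		for (k = 0; k < p->nr; k++)
			reply_with_path(p->interned_paths[k]);	/* hypothetical */

	pthread_mutex_lock(&state->main_lock);
	token->client_ref_count--;
	if (!token->client_ref_count && token != state->current_token_data)
		free_me = token;	/* we were the last reader of a stale token */
	pthread_mutex_unlock(&state->main_lock);

	/* Same pattern as fsmonitor_force_resync(): free outside the lock. */
	fsmonitor_free_token_data(free_me);
}
```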