From patchwork Thu Oct 21 14:24:58 2021 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jeff Hostetler X-Patchwork-Id: 12575225 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from mail.kernel.org (mail.kernel.org [198.145.29.99]) by smtp.lore.kernel.org (Postfix) with ESMTP id 0CEB1C433F5 for ; Thu, 21 Oct 2021 14:25:53 +0000 (UTC) Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by mail.kernel.org (Postfix) with ESMTP id EBC166120F for ; Thu, 21 Oct 2021 14:25:52 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S231777AbhJUO2H (ORCPT ); Thu, 21 Oct 2021 10:28:07 -0400 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:38358 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S231401AbhJUO1m (ORCPT ); Thu, 21 Oct 2021 10:27:42 -0400 Received: from mail-wm1-x335.google.com (mail-wm1-x335.google.com [IPv6:2a00:1450:4864:20::335]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id D15E9C06122F for ; Thu, 21 Oct 2021 07:25:23 -0700 (PDT) Received: by mail-wm1-x335.google.com with SMTP id o24-20020a05600c511800b0030d9da600aeso871591wms.4 for ; Thu, 21 Oct 2021 07:25:23 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20210112; h=message-id:in-reply-to:references:from:date:subject:fcc :content-transfer-encoding:mime-version:to:cc; bh=8+AgGU8TVF/ZjHBuGCfxCzotLqwQQ/VqMskDBsu1QcY=; b=M1+Ta3kkC8U93Q6xH/1bHZsmDyc19EXQ+l4J3H/n00gdsQEoqYRmSYxAyU0Ff1xnWL Ie3XZuQut9O1suneNqsmHxZYJff94IjvK259LtJTDhltlDQFRVALYzSBgSp/FTPkNazL 8KCmy8PvPDnfA2eobsMoMYbcFzqGm723+lT4iTUBd7KXEGYmgZ0ETC93HkfxXcoYwJ2T ShjYyAAIEyFczwvkIBuC+ZI1KyP1//SqK3QKEsykdb8/hJNJq0BoF8ytWaLgjyg4BJH8 WYqCHSgZPbASwR5P2CZXuLuRKezUGxIS5U6m5amueNxh/LA8ECpvWDykqk3G3W8eVWor w0iA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20210112; h=x-gm-message-state:message-id:in-reply-to:references:from:date :subject:fcc:content-transfer-encoding:mime-version:to:cc; bh=8+AgGU8TVF/ZjHBuGCfxCzotLqwQQ/VqMskDBsu1QcY=; b=hRfZhqMRvkKDXIQgMv4ghPuarBvXh/P8IORirG2188IsM3M1Cby6XslM5YgngvxvB5 0HkGWoihRx1PUf0kycCQhuckOuRvqIcjVF5BN+Nz4uMSrucjoO1LqLv5oJtMnwef9ggd elLTq8O2eGDtHPhn/3JPFbGkg17xx+ODSgRXcA3TLGfLkua457JuEomJd2AaLbvKa0cH 6bVoaD62Hq+AROdV+75Fl28RdKxljhQylxyUawmlfKolHp89zZDckYm0T203Gx8vrAhg i462DY+6Fqj99/momxbH2hqj5lxX37P7Y7/lGTNcB0DTV9iJZGstynD7R+djADQ/TddX xRpw== X-Gm-Message-State: AOAM530HzaCn8RjoTBFTmkHYLiP6kiKsRYTcy1+0E6zxWzVwpMMoBk9a fy45+HBaAUV9bfDX6T1RsqEIq1QgfRM= X-Google-Smtp-Source: ABdhPJz2PvhBbqcKgbFSBVtT+pIRs1fmp7h6fxTrJM70bYBdlbOZp+hAHqXRQG2zrSLrFz5rEI5jHw== X-Received: by 2002:a05:600c:2109:: with SMTP id u9mr6874106wml.68.1634826322450; Thu, 21 Oct 2021 07:25:22 -0700 (PDT) Received: from [127.0.0.1] ([13.74.141.28]) by smtp.gmail.com with ESMTPSA id k17sm4937356wrc.93.2021.10.21.07.25.21 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Thu, 21 Oct 2021 07:25:22 -0700 (PDT) Message-Id: <241962894f1a25e9422621dc80baf42d70ea1433.1634826309.git.gitgitgadget@gmail.com> In-Reply-To: References: Date: Thu, 21 Oct 2021 14:24:58 +0000 Subject: [PATCH v4 18/29] fsmonitor--daemon: implement handle_client callback Fcc: Sent MIME-Version: 1.0 To: git@vger.kernel.org Cc: Jeff Hostetler , Jeff Hostetler , Jeff Hostetler Precedence: bulk List-ID: X-Mailing-List: git@vger.kernel.org From: Jeff Hostetler From: Jeff Hostetler Teach fsmonitor--daemon to respond to IPC requests from client Git processes and respond with a list of modified pathnames relative to the provided token. Signed-off-by: Jeff Hostetler --- builtin/fsmonitor--daemon.c | 312 +++++++++++++++++++++++++++++++++++- 1 file changed, 310 insertions(+), 2 deletions(-) diff --git a/builtin/fsmonitor--daemon.c b/builtin/fsmonitor--daemon.c index 34603f23053..4c88171e06c 100644 --- a/builtin/fsmonitor--daemon.c +++ b/builtin/fsmonitor--daemon.c @@ -7,6 +7,7 @@ #include "fsmonitor--daemon.h" #include "simple-ipc.h" #include "khash.h" +#include "pkt-line.h" static const char * const builtin_fsmonitor__daemon_usage[] = { N_("git fsmonitor--daemon start []"), @@ -351,6 +352,311 @@ void fsmonitor_force_resync(struct fsmonitor_daemon_state *state) pthread_mutex_unlock(&state->main_lock); } +/* + * Format an opaque token string to send to the client. + */ +static void with_lock__format_response_token( + struct strbuf *response_token, + const struct strbuf *response_token_id, + const struct fsmonitor_batch *batch) +{ + /* assert current thread holding state->main_lock */ + + strbuf_reset(response_token); + strbuf_addf(response_token, "builtin:%s:%"PRIu64, + response_token_id->buf, batch->batch_seq_nr); +} + +/* + * Parse an opaque token from the client. + * Returns -1 on error. + */ +static int fsmonitor_parse_client_token(const char *buf_token, + struct strbuf *requested_token_id, + uint64_t *seq_nr) +{ + const char *p; + char *p_end; + + strbuf_reset(requested_token_id); + *seq_nr = 0; + + if (!skip_prefix(buf_token, "builtin:", &p)) + return -1; + + while (*p && *p != ':') + strbuf_addch(requested_token_id, *p++); + if (!*p++) + return -1; + + *seq_nr = (uint64_t)strtoumax(p, &p_end, 10); + if (*p_end) + return -1; + + return 0; +} + +KHASH_INIT(str, const char *, int, 0, kh_str_hash_func, kh_str_hash_equal) + +static int do_handle_client(struct fsmonitor_daemon_state *state, + const char *command, + ipc_server_reply_cb *reply, + struct ipc_server_reply_data *reply_data) +{ + struct fsmonitor_token_data *token_data = NULL; + struct strbuf response_token = STRBUF_INIT; + struct strbuf requested_token_id = STRBUF_INIT; + struct strbuf payload = STRBUF_INIT; + uint64_t requested_oldest_seq_nr = 0; + uint64_t total_response_len = 0; + const char *p; + const struct fsmonitor_batch *batch_head; + const struct fsmonitor_batch *batch; + intmax_t count = 0, duplicates = 0; + kh_str_t *shown; + int hash_ret; + int do_trivial = 0; + int do_flush = 0; + + /* + * We expect `command` to be of the form: + * + * := quit NUL + * | flush NUL + * | NUL + * | NUL + */ + + if (!strcmp(command, "quit")) { + /* + * A client has requested over the socket/pipe that the + * daemon shutdown. + * + * Tell the IPC thread pool to shutdown (which completes + * the await in the main thread (which can stop the + * fsmonitor listener thread)). + * + * There is no reply to the client. + */ + return SIMPLE_IPC_QUIT; + + } else if (!strcmp(command, "flush")) { + /* + * Flush all of our cached data and generate a new token + * just like if we lost sync with the filesystem. + * + * Then send a trivial response using the new token. + */ + do_flush = 1; + do_trivial = 1; + + } else if (!skip_prefix(command, "builtin:", &p)) { + /* assume V1 timestamp or garbage */ + + char *p_end; + + strtoumax(command, &p_end, 10); + trace_printf_key(&trace_fsmonitor, + ((*p_end) ? + "fsmonitor: invalid command line '%s'" : + "fsmonitor: unsupported V1 protocol '%s'"), + command); + do_trivial = 1; + + } else { + /* We have "builtin:*" */ + if (fsmonitor_parse_client_token(command, &requested_token_id, + &requested_oldest_seq_nr)) { + trace_printf_key(&trace_fsmonitor, + "fsmonitor: invalid V2 protocol token '%s'", + command); + do_trivial = 1; + + } else { + /* + * We have a V2 valid token: + * "builtin::" + */ + } + } + + pthread_mutex_lock(&state->main_lock); + + if (!state->current_token_data) + BUG("fsmonitor state does not have a current token"); + + if (do_flush) + with_lock__do_force_resync(state); + + /* + * We mark the current head of the batch list as "pinned" so + * that the listener thread will treat this item as read-only + * (and prevent any more paths from being added to it) from + * now on. + */ + token_data = state->current_token_data; + batch_head = token_data->batch_head; + ((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL); + + /* + * FSMonitor Protocol V2 requires that we send a response header + * with a "new current token" and then all of the paths that changed + * since the "requested token". We send the seq_nr of the just-pinned + * head batch so that future requests from a client will be relative + * to it. + */ + with_lock__format_response_token(&response_token, + &token_data->token_id, batch_head); + + reply(reply_data, response_token.buf, response_token.len + 1); + total_response_len += response_token.len + 1; + + trace2_data_string("fsmonitor", the_repository, "response/token", + response_token.buf); + trace_printf_key(&trace_fsmonitor, "response token: %s", + response_token.buf); + + if (!do_trivial) { + if (strcmp(requested_token_id.buf, token_data->token_id.buf)) { + /* + * The client last spoke to a different daemon + * instance -OR- the daemon had to resync with + * the filesystem (and lost events), so reject. + */ + trace2_data_string("fsmonitor", the_repository, + "response/token", "different"); + do_trivial = 1; + + } else if (requested_oldest_seq_nr < + token_data->batch_tail->batch_seq_nr) { + /* + * The client wants older events than we have for + * this token_id. This means that the end of our + * batch list was truncated and we cannot give the + * client a complete snapshot relative to their + * request. + */ + trace_printf_key(&trace_fsmonitor, + "client requested truncated data"); + do_trivial = 1; + } + } + + if (do_trivial) { + pthread_mutex_unlock(&state->main_lock); + + reply(reply_data, "/", 2); + + trace2_data_intmax("fsmonitor", the_repository, + "response/trivial", 1); + + strbuf_release(&response_token); + strbuf_release(&requested_token_id); + return 0; + } + + /* + * We're going to hold onto a pointer to the current + * token-data while we walk the list of batches of files. + * During this time, we will NOT be under the lock. + * So we ref-count it. + * + * This allows the listener thread to continue prepending + * new batches of items to the token-data (which we'll ignore). + * + * AND it allows the listener thread to do a token-reset + * (and install a new `current_token_data`). + */ + token_data->client_ref_count++; + + pthread_mutex_unlock(&state->main_lock); + + /* + * The client request is relative to the token that they sent, + * so walk the batch list backwards from the current head back + * to the batch (sequence number) they named. + * + * We use khash to de-dup the list of pathnames. + * + * NEEDSWORK: each batch contains a list of interned strings, + * so we only need to do pointer comparisons here to build the + * hash table. Currently, we're still comparing the string + * values. + */ + shown = kh_init_str(); + for (batch = batch_head; + batch && batch->batch_seq_nr > requested_oldest_seq_nr; + batch = batch->next) { + size_t k; + + for (k = 0; k < batch->nr; k++) { + const char *s = batch->interned_paths[k]; + size_t s_len; + + if (kh_get_str(shown, s) != kh_end(shown)) + duplicates++; + else { + kh_put_str(shown, s, &hash_ret); + + trace_printf_key(&trace_fsmonitor, + "send[%"PRIuMAX"]: %s", + count, s); + + /* Each path gets written with a trailing NUL */ + s_len = strlen(s) + 1; + + if (payload.len + s_len >= + LARGE_PACKET_DATA_MAX) { + reply(reply_data, payload.buf, + payload.len); + total_response_len += payload.len; + strbuf_reset(&payload); + } + + strbuf_add(&payload, s, s_len); + count++; + } + } + } + + if (payload.len) { + reply(reply_data, payload.buf, payload.len); + total_response_len += payload.len; + } + + kh_release_str(shown); + + pthread_mutex_lock(&state->main_lock); + + if (token_data->client_ref_count > 0) + token_data->client_ref_count--; + + if (token_data->client_ref_count == 0) { + if (token_data != state->current_token_data) { + /* + * The listener thread did a token-reset while we were + * walking the batch list. Therefore, this token is + * stale and can be discarded completely. If we are + * the last reader thread using this token, we own + * that work. + */ + fsmonitor_free_token_data(token_data); + } + } + + pthread_mutex_unlock(&state->main_lock); + + trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len); + trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count); + trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates); + + strbuf_release(&response_token); + strbuf_release(&requested_token_id); + strbuf_release(&payload); + + return 0; +} + static ipc_server_application_cb handle_client; static int handle_client(void *data, @@ -358,7 +664,7 @@ static int handle_client(void *data, ipc_server_reply_cb *reply, struct ipc_server_reply_data *reply_data) { - /* struct fsmonitor_daemon_state *state = data; */ + struct fsmonitor_daemon_state *state = data; int result; /* @@ -369,10 +675,12 @@ static int handle_client(void *data, if (command_len != strlen(command)) BUG("FSMonitor assumes text messages"); + trace_printf_key(&trace_fsmonitor, "requested token: %s", command); + trace2_region_enter("fsmonitor", "handle_client", the_repository); trace2_data_string("fsmonitor", the_repository, "request", command); - result = 0; /* TODO Do something here. */ + result = do_handle_client(state, command, reply, reply_data); trace2_region_leave("fsmonitor", "handle_client", the_repository);