Message ID | 239bf39bfb21ef621a15839bade34446dcbc3103.1697560266.git.me@ttaylorr.com (mailing list archive)
State      | Superseded
Series     | merge-ort: implement support for packing objects together
Taylor Blau <me@ttaylorr.com> writes:

>  bulk-checkin.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++
>  bulk-checkin.h |   4 ++
>  2 files changed, 122 insertions(+)

Unlike the previous four patches, which were very clear refactorings to
create reusable helper functions, this step leaves a bad aftertaste
after reading it twice, and I think what is disturbing is that many new
lines are pretty much literally copied from stream_blob_to_pack().

I wonder if we can introduce an "input source" abstraction that
replaces the "fd" and "size" (and "path", for error reporting)
parameters to stream_blob_to_pack(), so that the bulk of the
implementation of stream_blob_to_pack() can call its .read() method to
read bytes up to "size" from such an abstracted interface?  That would
be a good-sized first half of this change.  Then in the second half,
you can add another input source that works with an in-core "buf" and
"size", whose .read() method will merely be a memcpy().

That way, we will have two functions, one for stream_obj_to_pack()
that reads from an open file descriptor, and the other for
stream_obj_to_pack_incore() that reads from an in-core buffer, sharing
the bulk of the implementation extracted from the current code, which
will hopefully be easier to audit.
On Tue, Oct 17, 2023 at 07:18:04PM -0700, Junio C Hamano wrote:
> Taylor Blau <me@ttaylorr.com> writes:
>
> >  bulk-checkin.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++
> >  bulk-checkin.h |   4 ++
> >  2 files changed, 122 insertions(+)
>
> Unlike the previous four, which were very clear refactoring to
> create reusable helper functions, this step leaves a bad aftertaste
> after reading twice, and I think what is disturbing is that many new
> lines are pretty much literally copied from stream_blob_to_pack().
>
> I wonder if we can introduce an "input" source abstraction, that
> replaces "fd" and "size" (and "path" for error reporting) parameters
> to the stream_blob_to_pack(), so that the bulk of the implementation
> of stream_blob_to_pack() can call its .read() method to read bytes
> up to "size" from such an abstracted interface?  That would be a
> good sized first half of this change.  Then in the second half, you
> can add another "input" source that works with in-core "buf" and
> "size", whose .read() method will merely be a memcpy().

Thanks, I like this idea. I had initially avoided it in the first
couple of rounds, because the abstraction felt clunky and involved an
unnecessary extra memcpy(). But having applied your suggestion here, I
think that the price is well worth the result, which is that
`stream_blob_to_pack()` does not have to be implemented twice with
very subtle differences.

Thanks again for the suggestion, I'm really pleased with how it came
out. Reroll coming shortly...

Thanks,
Taylor
diff --git a/bulk-checkin.c b/bulk-checkin.c
index f4914fb6d1..25cd1ffa25 100644
--- a/bulk-checkin.c
+++ b/bulk-checkin.c
@@ -140,6 +140,69 @@ static int already_written(struct bulk_checkin_packfile *state, struct object_id
 	return 0;
 }
 
+static int stream_obj_to_pack_incore(struct bulk_checkin_packfile *state,
+				     git_hash_ctx *ctx,
+				     off_t *already_hashed_to,
+				     const void *buf, size_t size,
+				     enum object_type type,
+				     const char *path, unsigned flags)
+{
+	git_zstream s;
+	unsigned char obuf[16384];
+	unsigned hdrlen;
+	int status = Z_OK;
+	int write_object = (flags & HASH_WRITE_OBJECT);
+
+	git_deflate_init(&s, pack_compression_level);
+
+	hdrlen = encode_in_pack_object_header(obuf, sizeof(obuf), type, size);
+	s.next_out = obuf + hdrlen;
+	s.avail_out = sizeof(obuf) - hdrlen;
+
+	if (*already_hashed_to < size) {
+		size_t hsize = size - *already_hashed_to;
+		if (hsize) {
+			the_hash_algo->update_fn(ctx, buf, hsize);
+		}
+		*already_hashed_to = size;
+	}
+	s.next_in = (void *)buf;
+	s.avail_in = size;
+
+	while (status != Z_STREAM_END) {
+		status = git_deflate(&s, Z_FINISH);
+		if (!s.avail_out || status == Z_STREAM_END) {
+			if (write_object) {
+				size_t written = s.next_out - obuf;
+
+				/* would we bust the size limit? */
+				if (state->nr_written &&
+				    pack_size_limit_cfg &&
+				    pack_size_limit_cfg < state->offset + written) {
+					git_deflate_abort(&s);
+					return -1;
+				}
+
+				hashwrite(state->f, obuf, written);
+				state->offset += written;
+			}
+			s.next_out = obuf;
+			s.avail_out = sizeof(obuf);
+		}
+
+		switch (status) {
+		case Z_OK:
+		case Z_BUF_ERROR:
+		case Z_STREAM_END:
+			continue;
+		default:
+			die("unexpected deflate failure: %d", status);
+		}
+	}
+	git_deflate_end(&s);
+	return 0;
+}
+
 /*
  * Read the contents from fd for size bytes, streaming it to the
  * packfile in state while updating the hash in ctx. Signal a failure
@@ -316,6 +379,50 @@ static void finalize_checkpoint(struct bulk_checkin_packfile *state,
 	}
 }
 
+static int deflate_obj_contents_to_pack_incore(struct bulk_checkin_packfile *state,
+					       git_hash_ctx *ctx,
+					       struct hashfile_checkpoint *checkpoint,
+					       struct object_id *result_oid,
+					       const void *buf, size_t size,
+					       enum object_type type,
+					       const char *path, unsigned flags)
+{
+	struct pack_idx_entry *idx = NULL;
+	off_t already_hashed_to = 0;
+
+	/* Note: idx is non-NULL when we are writing */
+	if (flags & HASH_WRITE_OBJECT)
+		CALLOC_ARRAY(idx, 1);
+
+	while (1) {
+		prepare_checkpoint(state, checkpoint, idx, flags);
+		if (!stream_obj_to_pack_incore(state, ctx, &already_hashed_to,
+					       buf, size, type, path, flags))
+			break;
+		truncate_checkpoint(state, checkpoint, idx);
+	}
+
+	finalize_checkpoint(state, ctx, checkpoint, idx, result_oid);
+
+	return 0;
+}
+
+static int deflate_blob_to_pack_incore(struct bulk_checkin_packfile *state,
+				       struct object_id *result_oid,
+				       const void *buf, size_t size,
+				       const char *path, unsigned flags)
+{
+	git_hash_ctx ctx;
+	struct hashfile_checkpoint checkpoint = {0};
+
+	format_object_header_hash(the_hash_algo, &ctx, &checkpoint, OBJ_BLOB,
+				  size);
+
+	return deflate_obj_contents_to_pack_incore(state, &ctx, &checkpoint,
+						   result_oid, buf, size,
+						   OBJ_BLOB, path, flags);
+}
+
 static int deflate_blob_to_pack(struct bulk_checkin_packfile *state,
 				struct object_id *result_oid,
 				int fd, size_t size,
@@ -396,6 +503,17 @@ int index_blob_bulk_checkin(struct object_id *oid,
 	return status;
 }
 
+int index_blob_bulk_checkin_incore(struct object_id *oid,
+				   const void *buf, size_t size,
+				   const char *path, unsigned flags)
+{
+	int status = deflate_blob_to_pack_incore(&bulk_checkin_packfile, oid,
+						 buf, size, path, flags);
+	if (!odb_transaction_nesting)
+		flush_bulk_checkin_packfile(&bulk_checkin_packfile);
+	return status;
+}
+
 void begin_odb_transaction(void)
 {
 	odb_transaction_nesting += 1;
diff --git a/bulk-checkin.h b/bulk-checkin.h
index aa7286a7b3..1b91daeaee 100644
--- a/bulk-checkin.h
+++ b/bulk-checkin.h
@@ -13,6 +13,10 @@ int index_blob_bulk_checkin(struct object_id *oid,
 			    int fd, size_t size,
 			    const char *path, unsigned flags);
 
+int index_blob_bulk_checkin_incore(struct object_id *oid,
+				   const void *buf, size_t size,
+				   const char *path, unsigned flags);
+
 /*
  * Tell the object database to optimize for adding
  * multiple objects. end_odb_transaction must be called
Now that we have factored out many of the common routines necessary to
index a new object into a pack created by the bulk-checkin machinery,
we can introduce a variant of `index_blob_bulk_checkin()` that acts on
blobs whose contents we can fit in memory.

This will be useful in a couple more commits in order to provide the
`merge-tree` builtin with a mechanism to create a new pack containing
any objects it created during the merge, instead of storing those
objects individually as loose.

Similar to the existing `index_blob_bulk_checkin()` function, the
entrypoint delegates to `deflate_blob_to_pack_incore()`, which is
responsible for formatting the pack header and then deflating the
contents into the pack. The latter is accomplished by calling
`deflate_obj_contents_to_pack_incore()`, which takes advantage of the
earlier refactoring and is responsible for writing the object to the
pack and handling any overage from pack.packSizeLimit.

The bulk of the new functionality is implemented in the function
`stream_obj_to_pack_incore()`, which is a generic implementation for
writing objects of arbitrary type (whose contents we can fit in-core)
into a bulk-checkin pack.

The new function shares an unfortunate degree of similarity with the
existing `stream_blob_to_pack()` function. But DRY-ing up these two
would likely be more trouble than it's worth, since the latter has to
deal with reading and writing the contents of the object.

Consistent with the rest of the bulk-checkin mechanism, there are no
direct tests here. In future commits when we expose this new
functionality via the `merge-tree` builtin, we will test it indirectly
there.

Signed-off-by: Taylor Blau <me@ttaylorr.com>
---
 bulk-checkin.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++
 bulk-checkin.h |   4 ++
 2 files changed, 122 insertions(+)