diff mbox series

[v7,4/5] object-file.c: add "write_stream_object_file()" to support read in stream

Message ID 20211221115201.12120-5-chiyutianyi@gmail.com (mailing list archive)
State Superseded
Headers show
Series unpack large blobs in stream | expand

Commit Message

Han Xin Dec. 21, 2021, 11:52 a.m. UTC
From: Han Xin <hanxin.hx@alibaba-inc.com>

We used to call "get_data()" in "unpack_non_delta_entry()" to read the
entire contents of a blob object, no matter how big it is. This
implementation may consume all the memory and cause OOM.

This can be improved by feeding data to "write_stream_object_file()"
in a stream. The input stream is implemented as an interface.

The difference with "write_loose_object()" is that we have no chance
to run "write_object_file_prepare()" to calculate the oid in advance.
In "write_loose_object()", we know the oid and we can write the
temporary file in the same directory as the final object, but for an
object with an undetermined oid, we don't know the exact directory for
the object, so we have to save the temporary file in ".git/objects/"
directory instead.

"freshen_packed_object()" or "freshen_loose_object()" will be called
inside "write_stream_object_file()" after obtaining the "oid".

Helped-by: René Scharfe <l.s.r@web.de>
Helped-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com>
Helped-by: Jiang Xin <zhiyou.jx@alibaba-inc.com>
Signed-off-by: Han Xin <hanxin.hx@alibaba-inc.com>
---
 object-file.c  | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++
 object-store.h |  9 ++++++
 2 files changed, 94 insertions(+)

Comments

Ævar Arnfjörð Bjarmason Dec. 21, 2021, 2:20 p.m. UTC | #1
On Tue, Dec 21 2021, Han Xin wrote:

> From: Han Xin <hanxin.hx@alibaba-inc.com>
> [...]
> +int write_stream_object_file(struct input_stream *in_stream, size_t len,
> +			     enum object_type type, time_t mtime,
> +			     unsigned flags, struct object_id *oid)
> +{
> +	int fd, ret, flush = 0;
> +	unsigned char compressed[4096];
> +	git_zstream stream;
> +	git_hash_ctx c;
> +	struct object_id parano_oid;
> +	static struct strbuf tmp_file = STRBUF_INIT;
> +	static struct strbuf filename = STRBUF_INIT;
> +	int dirlen;
> +	char hdr[MAX_HEADER_LEN];
> +	int hdrlen = sizeof(hdr);
> +
> +	/* Since "filename" is defined as static, it will be reused. So reset it
> +	 * first before using it. */
> +	strbuf_reset(&filename);
> +	/* When oid is not determined, save tmp file to odb path. */
> +	strbuf_addf(&filename, "%s/", get_object_directory());

I realize this is somewhat following the pattern of code you moved
around earlier, but FWIW I think these sorts of comments are really
over-doing it. I.e. we try not to comment on things that are obvious
from the code itself.

Also René's comment on v6 still applies here:

    Given that this function is only used for huge objects I think making
    the strbufs non-static and releasing them is the best choice here.

I think just making them non-static and doing a strbuf_release() as he
suggested is best here.

> +
> +	fd = create_tmpfile(&tmp_file, filename.buf, flags);
> +	if (fd < 0)
> +		return -1;
> +
> +	hdrlen = format_object_header(hdr, hdrlen, type, len);
> +
> +	/* Set it up and write header */
> +	setup_stream_and_header(&stream, compressed, sizeof(compressed),
> +				&c, hdr, hdrlen);
> +
> +	/* Then the data itself.. */
> +	do {
> +		unsigned char *in0 = stream.next_in;
> +		if (!stream.avail_in) {
> +			const void *in = in_stream->read(in_stream, &stream.avail_in);
> +			stream.next_in = (void *)in;
> +			in0 = (unsigned char *)in;
> +			/* All data has been read. */
> +			if (len + hdrlen == stream.total_in + stream.avail_in)
> +				flush = Z_FINISH;
> +		}
> +		ret = git_deflate(&stream, flush);
> +		the_hash_algo->update_fn(&c, in0, stream.next_in - in0);
> +		if (write_buffer(fd, compressed, stream.next_out - compressed) < 0)
> +			die(_("unable to write loose object file"));
> +		stream.next_out = compressed;
> +		stream.avail_out = sizeof(compressed);
> +	} while (ret == Z_OK || ret == Z_BUF_ERROR);
> +
> +	if (ret != Z_STREAM_END)
> +		die(_("unable to deflate new object streamingly (%d)"), ret);
> +	ret = git_deflate_end_gently(&stream);
> +	if (ret != Z_OK)
> +		die(_("deflateEnd on object streamingly failed (%d)"), ret);

nit: let's say "unable to stream deflate new object" or something, and
not use the confusing (invented?) word "streamingly".

> +	the_hash_algo->final_oid_fn(&parano_oid, &c);
> +
> +	close_loose_object(fd);
> +
> +	oidcpy(oid, &parano_oid);

I see there's still quite a bit of duplication between this and
write_loose_object(), but maybe it's not easy to factor out.

> +	if (freshen_packed_object(oid) || freshen_loose_object(oid)) {
> +		unlink_or_warn(tmp_file.buf);
> +		return 0;
> +	}
> +
> +	loose_object_path(the_repository, &filename, oid);
> +
> +	/* We finally know the object path, and create the missing dir. */
> +	dirlen = directory_size(filename.buf);
> +	if (dirlen) {
> +		struct strbuf dir = STRBUF_INIT;
> +		strbuf_add(&dir, filename.buf, dirlen - 1);

Just a minor nit, but I noticed we could have this on top, i.e. this
"remove the slash" is now what 1/3 users of it want:
	
	 object-file.c | 10 +++++-----
	 1 file changed, 5 insertions(+), 5 deletions(-)
	
	diff --git a/object-file.c b/object-file.c
	index 77a3217fd0e..b0dea96906e 100644
	--- a/object-file.c
	+++ b/object-file.c
	@@ -1878,13 +1878,13 @@ static void close_loose_object(int fd)
	 		die_errno(_("error when closing loose object file"));
	 }
	 
	-/* Size of directory component, including the ending '/' */
	+/* Size of directory component, excluding the ending '/' */
	 static inline int directory_size(const char *filename)
	 {
	 	const char *s = strrchr(filename, '/');
	 	if (!s)
	 		return 0;
	-	return s - filename + 1;
	+	return s - filename;
	 }
	 
	 /*
	@@ -1901,7 +1901,7 @@ static int create_tmpfile(struct strbuf *tmp, const char *filename,
	 
	 	strbuf_reset(tmp);
	 	strbuf_add(tmp, filename, dirlen);
	-	strbuf_addstr(tmp, "tmp_obj_XXXXXX");
	+	strbuf_addstr(tmp, "/tmp_obj_XXXXXX");
	 	fd = git_mkstemp_mode(tmp->buf, 0444);
	 	do {
	 		if (fd >= 0 || !dirlen || errno != ENOENT)
	@@ -1913,7 +1913,7 @@ static int create_tmpfile(struct strbuf *tmp, const char *filename,
	 		 * scratch.
	 		 */
	 		strbuf_reset(tmp);
	-		strbuf_add(tmp, filename, dirlen - 1);
	+		strbuf_add(tmp, filename, dirlen);
	 		if (mkdir(tmp->buf, 0777) && errno != EEXIST)
	 			break;
	 		if (adjust_shared_perm(tmp->buf))
	@@ -2100,7 +2100,7 @@ int write_stream_object_file(struct input_stream *in_stream, size_t len,
	 	dirlen = directory_size(filename.buf);
	 	if (dirlen) {
	 		struct strbuf dir = STRBUF_INIT;
	-		strbuf_add(&dir, filename.buf, dirlen - 1);
	+		strbuf_add(&dir, filename.buf, dirlen);
	 
	 		if (mkdir_in_gitdir(dir.buf) && errno != EEXIST) {
	 			ret = error_errno(_("unable to create directory %s"), dir.buf);

On my platform (Linux) it's not needed either way; a "mkdir foo" works
as well as "mkdir foo/", but maybe some OSes have trouble with it.
Ævar Arnfjörð Bjarmason Dec. 21, 2021, 3:05 p.m. UTC | #2
On Tue, Dec 21 2021, Ævar Arnfjörð Bjarmason wrote:

> On Tue, Dec 21 2021, Han Xin wrote:

>> +	/* Then the data itself.. */
>> +	do {
>> +		unsigned char *in0 = stream.next_in;
>> +		if (!stream.avail_in) {
>> +			const void *in = in_stream->read(in_stream, &stream.avail_in);
>> +			stream.next_in = (void *)in;
>> +			in0 = (unsigned char *)in;
>> +			/* All data has been read. */
>> +			if (len + hdrlen == stream.total_in + stream.avail_in)
>> +				flush = Z_FINISH;
>> +		}
>> +		ret = git_deflate(&stream, flush);
>> +		the_hash_algo->update_fn(&c, in0, stream.next_in - in0);
>> +		if (write_buffer(fd, compressed, stream.next_out - compressed) < 0)
>> +			die(_("unable to write loose object file"));
>> +		stream.next_out = compressed;
>> +		stream.avail_out = sizeof(compressed);
>> +	} while (ret == Z_OK || ret == Z_BUF_ERROR);
>> +
>> +	if (ret != Z_STREAM_END)
>> +		die(_("unable to deflate new object streamingly (%d)"), ret);
>> +	ret = git_deflate_end_gently(&stream);
>> +	if (ret != Z_OK)
>> +		die(_("deflateEnd on object streamingly failed (%d)"), ret);
>
> nit: let's say "unable to stream deflate new object" or something, and
> not use the confusing (invented?) word "streamingly".
>
>> +	the_hash_algo->final_oid_fn(&parano_oid, &c);
>> +
>> +	close_loose_object(fd);
>> +
>> +	oidcpy(oid, &parano_oid);
>
> I see there's still quite a bit of duplication between this and
> write_loose_object(), but maybe it's not easy to factor out.

For what it's worth I tried to do that and the result doesn't really
seem worth it. I.e. something like the below. The inner loop of the
do/while looks like it could get a similar treatment, but likewise
doesn't seem worth the effort.

diff --git a/object-file.c b/object-file.c
index b0dea96906e..7fc2363cfa1 100644
--- a/object-file.c
+++ b/object-file.c
@@ -1957,6 +1957,46 @@ static void setup_stream_and_header(git_zstream *stream,
 	the_hash_algo->update_fn(c, hdr, hdrlen);
 }
 
+static int start_loose_object_common(struct strbuf *tmp_file,
+				     const char *filename, unsigned flags,
+				     git_zstream *stream,
+				     unsigned char *buf, size_t buflen,
+				     git_hash_ctx *c,
+				     enum object_type type, size_t len,
+				     char *hdr, int *hdrlen)
+{
+	int fd;
+
+	fd = create_tmpfile(tmp_file, filename, flags);
+	if (fd < 0)
+		return -1;
+
+	if (type != OBJ_NONE)
+		*hdrlen = format_object_header(hdr, *hdrlen, type, len);
+
+	/* Set it up and write header */
+	setup_stream_and_header(stream, buf, buflen, c, hdr, *hdrlen);
+
+	return fd;
+
+}
+
+static void end_loose_object_common(int ret, git_hash_ctx *c,
+				    git_zstream *stream,
+				    struct object_id *parano_oid,
+				    const struct object_id *expected_oid,
+				    const char *zstream_end_fmt,
+				    const char *z_ok_fmt)
+{
+	if (ret != Z_STREAM_END)
+		die(_(zstream_end_fmt), ret, expected_oid);
+	ret = git_deflate_end_gently(stream);
+	if (ret != Z_OK)
+		die(_(z_ok_fmt), ret, expected_oid);
+	the_hash_algo->final_oid_fn(parano_oid, c);
+}
+
+
 static int write_loose_object(const struct object_id *oid, char *hdr,
 			      int hdrlen, const void *buf, unsigned long len,
 			      time_t mtime, unsigned flags)
@@ -1970,15 +2010,12 @@ static int write_loose_object(const struct object_id *oid, char *hdr,
 	static struct strbuf filename = STRBUF_INIT;
 
 	loose_object_path(the_repository, &filename, oid);
-
-	fd = create_tmpfile(&tmp_file, filename.buf, flags);
+	fd = start_loose_object_common(&tmp_file, filename.buf, flags,
+				       &stream, compressed, sizeof(compressed),
+				       &c, OBJ_NONE, 0, hdr, &hdrlen);
 	if (fd < 0)
 		return -1;
 
-	/* Set it up and write header */
-	setup_stream_and_header(&stream, compressed, sizeof(compressed),
-				&c, hdr, hdrlen);
-
 	/* Then the data itself.. */
 	stream.next_in = (void *)buf;
 	stream.avail_in = len;
@@ -1992,14 +2029,9 @@ static int write_loose_object(const struct object_id *oid, char *hdr,
 		stream.avail_out = sizeof(compressed);
 	} while (ret == Z_OK);
 
-	if (ret != Z_STREAM_END)
-		die(_("unable to deflate new object %s (%d)"), oid_to_hex(oid),
-		    ret);
-	ret = git_deflate_end_gently(&stream);
-	if (ret != Z_OK)
-		die(_("deflateEnd on object %s failed (%d)"), oid_to_hex(oid),
-		    ret);
-	the_hash_algo->final_oid_fn(&parano_oid, &c);
+	end_loose_object_common(ret, &c, &stream, &parano_oid, oid,
+				N_("unable to deflate new object %s (%d)"),
+				N_("deflateEnd on object %s failed (%d)"));
 	if (!oideq(oid, &parano_oid))
 		die(_("confused by unstable object source data for %s"),
 		    oid_to_hex(oid));
@@ -2049,16 +2081,12 @@ int write_stream_object_file(struct input_stream *in_stream, size_t len,
 	/* When oid is not determined, save tmp file to odb path. */
 	strbuf_addf(&filename, "%s/", get_object_directory());
 
-	fd = create_tmpfile(&tmp_file, filename.buf, flags);
+	fd = start_loose_object_common(&tmp_file, filename.buf, flags,
+				       &stream, compressed, sizeof(compressed),
+				       &c, type, len, hdr, &hdrlen);
 	if (fd < 0)
 		return -1;
 
-	hdrlen = format_object_header(hdr, hdrlen, type, len);
-
-	/* Set it up and write header */
-	setup_stream_and_header(&stream, compressed, sizeof(compressed),
-				&c, hdr, hdrlen);
-
 	/* Then the data itself.. */
 	do {
 		unsigned char *in0 = stream.next_in;
@@ -2078,12 +2106,9 @@ int write_stream_object_file(struct input_stream *in_stream, size_t len,
 		stream.avail_out = sizeof(compressed);
 	} while (ret == Z_OK || ret == Z_BUF_ERROR);
 
-	if (ret != Z_STREAM_END)
-		die(_("unable to deflate new object streamingly (%d)"), ret);
-	ret = git_deflate_end_gently(&stream);
-	if (ret != Z_OK)
-		die(_("deflateEnd on object streamingly failed (%d)"), ret);
-	the_hash_algo->final_oid_fn(&parano_oid, &c);
+	end_loose_object_common(ret, &c, &stream, &parano_oid, NULL,
+				N_("unable to deflate new object streamingly (%d)"),
+				N_("deflateEnd on object streamingly failed (%d)"));
 
 	close_loose_object(fd);
diff mbox series

Patch

diff --git a/object-file.c b/object-file.c
index e048f3d39e..d0573e2a61 100644
--- a/object-file.c
+++ b/object-file.c
@@ -1989,6 +1989,91 @@  static int freshen_packed_object(const struct object_id *oid)
 	return 1;
 }
 
+int write_stream_object_file(struct input_stream *in_stream, size_t len,
+			     enum object_type type, time_t mtime,
+			     unsigned flags, struct object_id *oid)
+{
+	int fd, ret, flush = 0;
+	unsigned char compressed[4096];
+	git_zstream stream;
+	git_hash_ctx c;
+	struct object_id parano_oid;
+	static struct strbuf tmp_file = STRBUF_INIT;
+	static struct strbuf filename = STRBUF_INIT;
+	int dirlen;
+	char hdr[MAX_HEADER_LEN];
+	int hdrlen = sizeof(hdr);
+
+	/* Since "filename" is defined as static, it will be reused. So reset it
+	 * first before using it. */
+	strbuf_reset(&filename);
+	/* When oid is not determined, save tmp file to odb path. */
+	strbuf_addf(&filename, "%s/", get_object_directory());
+
+	fd = create_tmpfile(&tmp_file, filename.buf, flags);
+	if (fd < 0)
+		return -1;
+
+	hdrlen = format_object_header(hdr, hdrlen, type, len);
+
+	/* Set it up and write header */
+	setup_stream_and_header(&stream, compressed, sizeof(compressed),
+				&c, hdr, hdrlen);
+
+	/* Then the data itself.. */
+	do {
+		unsigned char *in0 = stream.next_in;
+		if (!stream.avail_in) {
+			const void *in = in_stream->read(in_stream, &stream.avail_in);
+			stream.next_in = (void *)in;
+			in0 = (unsigned char *)in;
+			/* All data has been read. */
+			if (len + hdrlen == stream.total_in + stream.avail_in)
+				flush = Z_FINISH;
+		}
+		ret = git_deflate(&stream, flush);
+		the_hash_algo->update_fn(&c, in0, stream.next_in - in0);
+		if (write_buffer(fd, compressed, stream.next_out - compressed) < 0)
+			die(_("unable to write loose object file"));
+		stream.next_out = compressed;
+		stream.avail_out = sizeof(compressed);
+	} while (ret == Z_OK || ret == Z_BUF_ERROR);
+
+	if (ret != Z_STREAM_END)
+		die(_("unable to deflate new object streamingly (%d)"), ret);
+	ret = git_deflate_end_gently(&stream);
+	if (ret != Z_OK)
+		die(_("deflateEnd on object streamingly failed (%d)"), ret);
+	the_hash_algo->final_oid_fn(&parano_oid, &c);
+
+	close_loose_object(fd);
+
+	oidcpy(oid, &parano_oid);
+
+	if (freshen_packed_object(oid) || freshen_loose_object(oid)) {
+		unlink_or_warn(tmp_file.buf);
+		return 0;
+	}
+
+	loose_object_path(the_repository, &filename, oid);
+
+	/* We finally know the object path, and create the missing dir. */
+	dirlen = directory_size(filename.buf);
+	if (dirlen) {
+		struct strbuf dir = STRBUF_INIT;
+		strbuf_add(&dir, filename.buf, dirlen - 1);
+
+		if (mkdir_in_gitdir(dir.buf) && errno != EEXIST) {
+			ret = error_errno(_("unable to create directory %s"), dir.buf);
+			strbuf_release(&dir);
+			return ret;
+		}
+		strbuf_release(&dir);
+	}
+
+	return finalize_object_file_with_mtime(tmp_file.buf, filename.buf, mtime, flags);
+}
+
 int write_object_file_flags(const void *buf, unsigned long len,
 			    const char *type, struct object_id *oid,
 			    unsigned flags)
diff --git a/object-store.h b/object-store.h
index 952efb6a4b..061b0cb2ba 100644
--- a/object-store.h
+++ b/object-store.h
@@ -34,6 +34,11 @@  struct object_directory {
 	char *path;
 };
 
+struct input_stream {
+	const void *(*read)(struct input_stream *, unsigned long *len);
+	void *data;
+};
+
 KHASH_INIT(odb_path_map, const char * /* key: odb_path */,
 	struct object_directory *, 1, fspathhash, fspatheq)
 
@@ -232,6 +237,10 @@  static inline int write_object_file(const void *buf, unsigned long len,
 	return write_object_file_flags(buf, len, type, oid, 0);
 }
 
+int write_stream_object_file(struct input_stream *in_stream, size_t len,
+			     enum object_type type, time_t mtime,
+			     unsigned flags, struct object_id *oid);
+
 int hash_object_file_literally(const void *buf, unsigned long len,
 			       const char *type, struct object_id *oid,
 			       unsigned flags);