Message ID | 06689de6a56f046d5e41525fa12c7af92db478e5.1630515568.git.osandov@fb.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | None | expand |
On 1.09.21 г. 20:01, Omar Sandoval wrote: > From: Boris Burkov <boris@bur.io> > <snip> > @@ -79,6 +83,12 @@ struct btrfs_receive > struct subvol_uuid_search sus; > > int honor_end_cmd; > + > + int force_decompress; Make it bool > + > + /* Reuse stream objects for encoded_write decompression fallback */ > + ZSTD_DStream *zstd_dstream; > + z_stream *zlib_stream; > }; > > static int finish_subvol(struct btrfs_receive *rctx) > @@ -989,9 +999,222 @@ static int process_update_extent(const char *path, u64 offset, u64 len, > return 0; > } > <snip> > + > +static int decompress_lzo(const char *encoded_data, u64 encoded_len, > + char *unencoded_data, u64 unencoded_len, > + unsigned int page_size) > +{ > + uint32_t total_len; > + size_t in_pos, out_pos; > + > + if (encoded_len < 4) { > + error("lzo header is truncated"); > + return -EIO; > + } > + memcpy(&total_len, encoded_data, 4); > + total_len = le32toh(total_len); > + if (total_len > encoded_len) { > + error("lzo header is invalid"); > + return -EIO; > + } > + > + in_pos = 4; > + out_pos = 0; > + while (in_pos < total_len && out_pos < unencoded_len) { > + size_t page_remaining; > + uint32_t src_len; > + lzo_uint dst_len; > + int ret; > + > + page_remaining = -in_pos % page_size; Why the -in_pos? > + if (page_remaining < 4) { > + if (total_len - in_pos <= page_remaining) > + break; > + in_pos += page_remaining; > + } > + > + if (total_len - in_pos < 4) { > + error("lzo segment header is truncated"); > + return -EIO; > + } > + > + memcpy(&src_len, encoded_data + in_pos, 4); > + src_len = le32toh(src_len); > + in_pos += 4; > + if (src_len > total_len - in_pos) { > + error("lzo segment header is invalid"); > + return -EIO; > + } > + > + dst_len = page_size; > + ret = lzo1x_decompress_safe((void *)(encoded_data + in_pos), > + src_len, > + (void *)(unencoded_data + out_pos), > + &dst_len, NULL); > + if (ret != LZO_E_OK) { > + error("lzo1x_decompress_safe failed: %d", ret); > + return -EIO; > + } > + > + in_pos += src_len; > + out_pos += dst_len; > + } > + return 0; > +} > + > +static int decompress_and_write(struct btrfs_receive *rctx, > + const char *encoded_data, u64 offset, > + u64 encoded_len, u64 unencoded_file_len, > + u64 unencoded_len, u64 unencoded_offset, > + u32 compression) > +{ > + int ret = 0; > + size_t pos; > + ssize_t w; > + char *unencoded_data; > + int page_shift; > + > + unencoded_data = calloc(unencoded_len, 1); > + if (!unencoded_data) { > + error("allocating space for unencoded data failed: %m"); > + return -errno; > + } > + > + switch (compression) { > + case BTRFS_ENCODED_IO_COMPRESSION_ZLIB: > + ret = decompress_zlib(rctx, encoded_data, encoded_len, > + unencoded_data, unencoded_len); > + if (ret) > + goto out; > + break; > + case BTRFS_ENCODED_IO_COMPRESSION_ZSTD: > + ret = decompress_zstd(rctx, encoded_data, encoded_len, > + unencoded_data, unencoded_len); > + if (ret) > + goto out; > + break; > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_4K: > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_8K: > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_16K: > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_32K: > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_64K: > + page_shift = compression - BTRFS_ENCODED_IO_COMPRESSION_LZO_4K + 12; Doesn't this calculation assume page size is 4k, what about arches with larger page size (ppc/aarch64), shouldn't that '12' be adjusted? > + ret = decompress_lzo(encoded_data, encoded_len, unencoded_data, > + unencoded_len, 1U << page_shift); > + if (ret) > + goto out; > + break; > + default: > + error("unknown compression: %d", compression); > + ret = -EOPNOTSUPP; > + goto out; > + } > + > + pos = unencoded_offset; > + while (pos < unencoded_file_len) { > + w = pwrite(rctx->write_fd, unencoded_data + pos, > + unencoded_file_len - pos, offset); > + if (w < 0) { > + ret = -errno; > + error("writing unencoded data failed: %m"); > + goto out; > + } > + pos += w; > + offset += w; > + } > +out: > + free(unencoded_data); > + return ret; > +} > + > static int process_encoded_write(const char *path, const void *data, u64 offset, > - u64 len, u64 unencoded_file_len, u64 unencoded_len, > - u64 unencoded_offset, u32 compression, u32 encryption, void *user) > + u64 len, u64 unencoded_file_len, > + u64 unencoded_len, u64 unencoded_offset, > + u32 compression, u32 encryption, void *user) That's irrelevant change for this patch, it should be squashed in the previous patch which introduces process_encoded_write. > { > int ret; > struct btrfs_receive *rctx = user; > @@ -1007,6 +1230,7 @@ static int process_encoded_write(const char *path, const void *data, u64 offset, > .compression = compression, > .encryption = encryption, > }; > + bool encoded_write = !rctx->force_decompress; No point in the local variable simply use !rctx->force_decompress in the condition below. > > if (encryption) { > error("encoded_write: encryption not supported"); > @@ -1023,13 +1247,21 @@ static int process_encoded_write(const char *path, const void *data, u64 offset, > if (ret < 0) > return ret; > > - ret = ioctl(rctx->write_fd, BTRFS_IOC_ENCODED_WRITE, &encoded); > - if (ret < 0) { > - ret = -errno; > - error("encoded_write: writing to %s failed: %m", path); > - return ret; > + if (encoded_write) { > + ret = ioctl(rctx->write_fd, BTRFS_IOC_ENCODED_WRITE, &encoded); > + if (ret >= 0) > + return 0; > + /* Fall back for these errors, fail hard for anything else. */ > + if (errno != ENOSPC && errno != ENOTTY && errno != EINVAL) { > + ret = -errno; > + error("encoded_write: writing to %s failed: %m", path); > + return ret; > + } > } > - return 0; > + > + return decompress_and_write(rctx, data, offset, len, unencoded_file_len, > + unencoded_len, unencoded_offset, > + compression); > } > > static struct btrfs_send_ops send_ops = { <snip>
On Thu, Oct 21, 2021 at 04:55:19PM +0300, Nikolay Borisov wrote: > > > On 1.09.21 г. 20:01, Omar Sandoval wrote: > > From: Boris Burkov <boris@bur.io> > > > > +static int decompress_lzo(const char *encoded_data, u64 encoded_len, > > + char *unencoded_data, u64 unencoded_len, > > + unsigned int page_size) > > +{ > > + uint32_t total_len; > > + size_t in_pos, out_pos; > > + > > + if (encoded_len < 4) { > > + error("lzo header is truncated"); > > + return -EIO; > > + } > > + memcpy(&total_len, encoded_data, 4); > > + total_len = le32toh(total_len); > > + if (total_len > encoded_len) { > > + error("lzo header is invalid"); > > + return -EIO; > > + } > > + > > + in_pos = 4; > > + out_pos = 0; > > + while (in_pos < total_len && out_pos < unencoded_len) { > > + size_t page_remaining; > > + uint32_t src_len; > > + lzo_uint dst_len; > > + int ret; > > + > > + page_remaining = -in_pos % page_size; > > Why the -in_pos? in_pos is our position in the encoded data. This calculates how many bytes are remaining in the page that in_pos current points to. > > + if (page_remaining < 4) { > > + if (total_len - in_pos <= page_remaining) > > + break; > > + in_pos += page_remaining; > > + } > > + > > + if (total_len - in_pos < 4) { > > + error("lzo segment header is truncated"); > > + return -EIO; > > + } > > + > > + memcpy(&src_len, encoded_data + in_pos, 4); > > + src_len = le32toh(src_len); > > + in_pos += 4; > > + if (src_len > total_len - in_pos) { > > + error("lzo segment header is invalid"); > > + return -EIO; > > + } > > + > > + dst_len = page_size; > > + ret = lzo1x_decompress_safe((void *)(encoded_data + in_pos), > > + src_len, > > + (void *)(unencoded_data + out_pos), > > + &dst_len, NULL); > > + if (ret != LZO_E_OK) { > > + error("lzo1x_decompress_safe failed: %d", ret); > > + return -EIO; > > + } > > + > > + in_pos += src_len; > > + out_pos += dst_len; > > + } > > + return 0; > > +} > > + > > +static int decompress_and_write(struct btrfs_receive *rctx, > > + const char *encoded_data, u64 offset, > > + u64 encoded_len, u64 unencoded_file_len, > > + u64 unencoded_len, u64 unencoded_offset, > > + u32 compression) > > +{ > > + int ret = 0; > > + size_t pos; > > + ssize_t w; > > + char *unencoded_data; > > + int page_shift; > > + > > + unencoded_data = calloc(unencoded_len, 1); > > + if (!unencoded_data) { > > + error("allocating space for unencoded data failed: %m"); > > + return -errno; > > + } > > + > > + switch (compression) { > > + case BTRFS_ENCODED_IO_COMPRESSION_ZLIB: > > + ret = decompress_zlib(rctx, encoded_data, encoded_len, > > + unencoded_data, unencoded_len); > > + if (ret) > > + goto out; > > + break; > > + case BTRFS_ENCODED_IO_COMPRESSION_ZSTD: > > + ret = decompress_zstd(rctx, encoded_data, encoded_len, > > + unencoded_data, unencoded_len); > > + if (ret) > > + goto out; > > + break; > > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_4K: > > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_8K: > > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_16K: > > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_32K: > > + case BTRFS_ENCODED_IO_COMPRESSION_LZO_64K: > > + page_shift = compression - BTRFS_ENCODED_IO_COMPRESSION_LZO_4K + 12; > > Doesn't this calculation assume page size is 4k, what about arches with > larger page size (ppc/aarch64), shouldn't that '12' be adjusted? This is unrelated to the machine page size. It is translating the BTRFS_ENCODED_IO_COMPRESSION_LZO_* value to the page size used for compressing the data: compression | - LZO_4K | + 12 | 1 << ===================================== LZO_4K = 3 | 0 | 12 | 4096 LZO_8K = 4 | 1 | 13 | 8192 LZO_16K = 5 | 2 | 14 | 16384 LZO_32K = 6 | 3 | 15 | 32768 LZO_64K = 7 | 4 | 16 | 65536 I'll fix the other comments, thanks.
diff --git a/Documentation/btrfs-receive.asciidoc b/Documentation/btrfs-receive.asciidoc index e4c4d2c0..354a71dc 100644 --- a/Documentation/btrfs-receive.asciidoc +++ b/Documentation/btrfs-receive.asciidoc @@ -60,6 +60,10 @@ By default the mountpoint is searched in '/proc/self/mounts'. If '/proc' is not accessible, eg. in a chroot environment, use this option to tell us where this filesystem is mounted. +--force-decompress:: +if the stream contains compressed data (see '--compressed-data' in +`btrfs-send`(8)), always decompress it instead of writing it with encoded I/O. + --dump:: dump the stream metadata, one line per operation + diff --git a/cmds/receive.c b/cmds/receive.c index b43c298f..2eebcfd1 100644 --- a/cmds/receive.c +++ b/cmds/receive.c @@ -40,6 +40,10 @@ #include <sys/xattr.h> #include <uuid/uuid.h> +#include <lzo/lzo1x.h> +#include <zlib.h> +#include <zstd.h> + #include "kernel-shared/ctree.h" #include "ioctl.h" #include "cmds/commands.h" @@ -79,6 +83,12 @@ struct btrfs_receive struct subvol_uuid_search sus; int honor_end_cmd; + + int force_decompress; + + /* Reuse stream objects for encoded_write decompression fallback */ + ZSTD_DStream *zstd_dstream; + z_stream *zlib_stream; }; static int finish_subvol(struct btrfs_receive *rctx) @@ -989,9 +999,222 @@ static int process_update_extent(const char *path, u64 offset, u64 len, return 0; } +static int decompress_zlib(struct btrfs_receive *rctx, const char *encoded_data, + u64 encoded_len, char *unencoded_data, + u64 unencoded_len) +{ + bool init = false; + int ret; + + if (!rctx->zlib_stream) { + init = true; + rctx->zlib_stream = malloc(sizeof(z_stream)); + if (!rctx->zlib_stream) { + error("failed to allocate zlib stream %m"); + return -ENOMEM; + } + } + rctx->zlib_stream->next_in = (void *)encoded_data; + rctx->zlib_stream->avail_in = encoded_len; + rctx->zlib_stream->next_out = (void *)unencoded_data; + rctx->zlib_stream->avail_out = unencoded_len; + + if (init) { + rctx->zlib_stream->zalloc = Z_NULL; + rctx->zlib_stream->zfree = Z_NULL; + rctx->zlib_stream->opaque = Z_NULL; + ret = inflateInit(rctx->zlib_stream); + } else { + ret = inflateReset(rctx->zlib_stream); + } + if (ret != Z_OK) { + error("zlib inflate init failed: %d", ret); + return -EIO; + } + + while (rctx->zlib_stream->avail_in > 0 && + rctx->zlib_stream->avail_out > 0) { + ret = inflate(rctx->zlib_stream, Z_FINISH); + if (ret == Z_STREAM_END) { + break; + } else if (ret != Z_OK) { + error("zlib inflate failed: %d", ret); + return -EIO; + } + } + return 0; +} + +static int decompress_zstd(struct btrfs_receive *rctx, const char *encoded_buf, + u64 encoded_len, char *unencoded_buf, + u64 unencoded_len) +{ + ZSTD_inBuffer in_buf = { + .src = encoded_buf, + .size = encoded_len + }; + ZSTD_outBuffer out_buf = { + .dst = unencoded_buf, + .size = unencoded_len + }; + size_t ret; + + if (!rctx->zstd_dstream) { + rctx->zstd_dstream = ZSTD_createDStream(); + if (!rctx->zstd_dstream) { + error("failed to create zstd dstream"); + return -ENOMEM; + } + } + ret = ZSTD_initDStream(rctx->zstd_dstream); + if (ZSTD_isError(ret)) { + error("failed to init zstd stream: %s", ZSTD_getErrorName(ret)); + return -EIO; + } + while (in_buf.pos < in_buf.size && out_buf.pos < out_buf.size) { + ret = ZSTD_decompressStream(rctx->zstd_dstream, &out_buf, &in_buf); + if (ret == 0) { + break; + } else if (ZSTD_isError(ret)) { + error("failed to decompress zstd stream: %s", + ZSTD_getErrorName(ret)); + return -EIO; + } + } + return 0; +} + +static int decompress_lzo(const char *encoded_data, u64 encoded_len, + char *unencoded_data, u64 unencoded_len, + unsigned int page_size) +{ + uint32_t total_len; + size_t in_pos, out_pos; + + if (encoded_len < 4) { + error("lzo header is truncated"); + return -EIO; + } + memcpy(&total_len, encoded_data, 4); + total_len = le32toh(total_len); + if (total_len > encoded_len) { + error("lzo header is invalid"); + return -EIO; + } + + in_pos = 4; + out_pos = 0; + while (in_pos < total_len && out_pos < unencoded_len) { + size_t page_remaining; + uint32_t src_len; + lzo_uint dst_len; + int ret; + + page_remaining = -in_pos % page_size; + if (page_remaining < 4) { + if (total_len - in_pos <= page_remaining) + break; + in_pos += page_remaining; + } + + if (total_len - in_pos < 4) { + error("lzo segment header is truncated"); + return -EIO; + } + + memcpy(&src_len, encoded_data + in_pos, 4); + src_len = le32toh(src_len); + in_pos += 4; + if (src_len > total_len - in_pos) { + error("lzo segment header is invalid"); + return -EIO; + } + + dst_len = page_size; + ret = lzo1x_decompress_safe((void *)(encoded_data + in_pos), + src_len, + (void *)(unencoded_data + out_pos), + &dst_len, NULL); + if (ret != LZO_E_OK) { + error("lzo1x_decompress_safe failed: %d", ret); + return -EIO; + } + + in_pos += src_len; + out_pos += dst_len; + } + return 0; +} + +static int decompress_and_write(struct btrfs_receive *rctx, + const char *encoded_data, u64 offset, + u64 encoded_len, u64 unencoded_file_len, + u64 unencoded_len, u64 unencoded_offset, + u32 compression) +{ + int ret = 0; + size_t pos; + ssize_t w; + char *unencoded_data; + int page_shift; + + unencoded_data = calloc(unencoded_len, 1); + if (!unencoded_data) { + error("allocating space for unencoded data failed: %m"); + return -errno; + } + + switch (compression) { + case BTRFS_ENCODED_IO_COMPRESSION_ZLIB: + ret = decompress_zlib(rctx, encoded_data, encoded_len, + unencoded_data, unencoded_len); + if (ret) + goto out; + break; + case BTRFS_ENCODED_IO_COMPRESSION_ZSTD: + ret = decompress_zstd(rctx, encoded_data, encoded_len, + unencoded_data, unencoded_len); + if (ret) + goto out; + break; + case BTRFS_ENCODED_IO_COMPRESSION_LZO_4K: + case BTRFS_ENCODED_IO_COMPRESSION_LZO_8K: + case BTRFS_ENCODED_IO_COMPRESSION_LZO_16K: + case BTRFS_ENCODED_IO_COMPRESSION_LZO_32K: + case BTRFS_ENCODED_IO_COMPRESSION_LZO_64K: + page_shift = compression - BTRFS_ENCODED_IO_COMPRESSION_LZO_4K + 12; + ret = decompress_lzo(encoded_data, encoded_len, unencoded_data, + unencoded_len, 1U << page_shift); + if (ret) + goto out; + break; + default: + error("unknown compression: %d", compression); + ret = -EOPNOTSUPP; + goto out; + } + + pos = unencoded_offset; + while (pos < unencoded_file_len) { + w = pwrite(rctx->write_fd, unencoded_data + pos, + unencoded_file_len - pos, offset); + if (w < 0) { + ret = -errno; + error("writing unencoded data failed: %m"); + goto out; + } + pos += w; + offset += w; + } +out: + free(unencoded_data); + return ret; +} + static int process_encoded_write(const char *path, const void *data, u64 offset, - u64 len, u64 unencoded_file_len, u64 unencoded_len, - u64 unencoded_offset, u32 compression, u32 encryption, void *user) + u64 len, u64 unencoded_file_len, + u64 unencoded_len, u64 unencoded_offset, + u32 compression, u32 encryption, void *user) { int ret; struct btrfs_receive *rctx = user; @@ -1007,6 +1230,7 @@ static int process_encoded_write(const char *path, const void *data, u64 offset, .compression = compression, .encryption = encryption, }; + bool encoded_write = !rctx->force_decompress; if (encryption) { error("encoded_write: encryption not supported"); @@ -1023,13 +1247,21 @@ static int process_encoded_write(const char *path, const void *data, u64 offset, if (ret < 0) return ret; - ret = ioctl(rctx->write_fd, BTRFS_IOC_ENCODED_WRITE, &encoded); - if (ret < 0) { - ret = -errno; - error("encoded_write: writing to %s failed: %m", path); - return ret; + if (encoded_write) { + ret = ioctl(rctx->write_fd, BTRFS_IOC_ENCODED_WRITE, &encoded); + if (ret >= 0) + return 0; + /* Fall back for these errors, fail hard for anything else. */ + if (errno != ENOSPC && errno != ENOTTY && errno != EINVAL) { + ret = -errno; + error("encoded_write: writing to %s failed: %m", path); + return ret; + } } - return 0; + + return decompress_and_write(rctx, data, offset, len, unencoded_file_len, + unencoded_len, unencoded_offset, + compression); } static struct btrfs_send_ops send_ops = { @@ -1209,6 +1441,12 @@ out: close(rctx->dest_dir_fd); rctx->dest_dir_fd = -1; } + if (rctx->zstd_dstream) + ZSTD_freeDStream(rctx->zstd_dstream); + if (rctx->zlib_stream) { + inflateEnd(rctx->zlib_stream); + free(rctx->zlib_stream); + } return ret; } @@ -1239,6 +1477,9 @@ static const char * const cmd_receive_usage[] = { "-m ROOTMOUNT the root mount point of the destination filesystem.", " If /proc is not accessible, use this to tell us where", " this file system is mounted.", + "--force-decompress", + " if the stream contains compressed data, always", + " decompress it instead of writing it with encoded I/O", "--dump dump stream metadata, one line per operation,", " does not require the MOUNT parameter", "-v deprecated, alias for global -v option", @@ -1282,12 +1523,16 @@ static int cmd_receive(const struct cmd_struct *cmd, int argc, char **argv) optind = 0; while (1) { int c; - enum { GETOPT_VAL_DUMP = 257 }; + enum { + GETOPT_VAL_DUMP = 257, + GETOPT_VAL_FORCE_DECOMPRESS, + }; static const struct option long_opts[] = { { "max-errors", required_argument, NULL, 'E' }, { "chroot", no_argument, NULL, 'C' }, { "dump", no_argument, NULL, GETOPT_VAL_DUMP }, { "quiet", no_argument, NULL, 'q' }, + { "force-decompress", no_argument, NULL, GETOPT_VAL_FORCE_DECOMPRESS }, { NULL, 0, NULL, 0 } }; @@ -1330,6 +1575,9 @@ static int cmd_receive(const struct cmd_struct *cmd, int argc, char **argv) case GETOPT_VAL_DUMP: dump = 1; break; + case GETOPT_VAL_FORCE_DECOMPRESS: + rctx.force_decompress = 1; + break; default: usage_unknown_option(cmd, argv); }