diff mbox series

[v6,4/4] block: apply COR-filter to block-stream jobs

Message ID 1597785880-431103-5-git-send-email-andrey.shinkevich@virtuozzo.com (mailing list archive)
State New, archived
Headers show
Series Apply COR-filter to the block-stream permanently | expand

Commit Message

Andrey Shinkevich Aug. 18, 2020, 9:24 p.m. UTC
The patch completes the series with the COR-filter insertion to any
block-stream operation. It also makes changes to the iotests 030.
The test case 'test_stream_parallel' was deleted due to multiple
errors.

Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
---
 block/stream.c             | 76 ++++++++++++++++++++++++++++++++--------------
 tests/qemu-iotests/030     | 50 +++---------------------------
 tests/qemu-iotests/030.out |  4 +--
 3 files changed, 61 insertions(+), 69 deletions(-)

Comments

Vladimir Sementsov-Ogievskiy Aug. 19, 2020, 10:46 a.m. UTC | #1
19.08.2020 00:24, Andrey Shinkevich wrote:
> The patch completes the series with the COR-filter insertion to any
> block-stream operation. It also makes changes to the iotests 030.
> The test case 'test_stream_parallel' was deleted due to multiple
> errors.

"case deleted due to errors" is poor reasoning.

If you remove the case, you should give a detailed explanation of why it
is removed, what the problem with it is, what the consequences are, and
what is no longer supported.

Also, it is good to note here that adding a filter makes it possible to
implement discarding already-copied regions from backing files during
the job, to reduce disk over-usage.

> 
> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
> ---
>   block/stream.c             | 76 ++++++++++++++++++++++++++++++++--------------
>   tests/qemu-iotests/030     | 50 +++---------------------------
>   tests/qemu-iotests/030.out |  4 +--
>   3 files changed, 61 insertions(+), 69 deletions(-)
> 
> diff --git a/block/stream.c b/block/stream.c
> index 8bf6b6d..0b11979 100644
> --- a/block/stream.c
> +++ b/block/stream.c
> @@ -19,6 +19,7 @@
>   #include "qapi/qmp/qerror.h"
>   #include "qemu/ratelimit.h"
>   #include "sysemu/block-backend.h"
> +#include "block/copy-on-read.h"
>   
>   enum {
>       /*
> @@ -33,8 +34,11 @@ typedef struct StreamBlockJob {
>       BlockJob common;
>       BlockDriverState *base_overlay; /* COW overlay (stream from this) */
>       BlockDriverState *above_base;   /* Node directly above the base */
> +    BlockDriverState *cor_filter_bs;
> +    BlockDriverState *target_bs;
>       BlockdevOnError on_error;
>       char *backing_file_str;
> +    char *base_fmt;
>       bool bs_read_only;
>       bool chain_frozen;
>   } StreamBlockJob;
> @@ -53,34 +57,26 @@ static void stream_abort(Job *job)
>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>   
>       if (s->chain_frozen) {
> -        BlockJob *bjob = &s->common;
> -        bdrv_unfreeze_backing_chain(blk_bs(bjob->blk), s->above_base);
> +        bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>       }
>   }
>   
>   static int stream_prepare(Job *job)
>   {
>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
> -    BlockJob *bjob = &s->common;
> -    BlockDriverState *bs = blk_bs(bjob->blk);
> +    BlockDriverState *bs = s->target_bs;
>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>       BlockDriverState *base = bdrv_filter_or_cow_bs(s->above_base);
>       Error *local_err = NULL;
>       int ret = 0;
>   
> -    bdrv_unfreeze_backing_chain(bs, s->above_base);
> +    bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>       s->chain_frozen = false;
>   
>       if (bdrv_cow_child(unfiltered_bs)) {
> -        const char *base_id = NULL, *base_fmt = NULL;
> -        if (base) {
> -            base_id = s->backing_file_str;
> -            if (base->drv) {
> -                base_fmt = base->drv->format_name;
> -            }
> -        }
>           bdrv_set_backing_hd(unfiltered_bs, base, &local_err);
> -        ret = bdrv_change_backing_file(unfiltered_bs, base_id, base_fmt);
> +        ret = bdrv_change_backing_file(unfiltered_bs, s->backing_file_str,
> +                                       s->base_fmt);
>           if (local_err) {
>               error_report_err(local_err);
>               return -EPERM;
> @@ -94,7 +90,9 @@ static void stream_clean(Job *job)
>   {
>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>       BlockJob *bjob = &s->common;
> -    BlockDriverState *bs = blk_bs(bjob->blk);
> +    BlockDriverState *bs = s->target_bs;
> +
> +    bdrv_cor_filter_drop(s->cor_filter_bs);
>   
>       /* Reopen the image back in read-only mode if necessary */
>       if (s->bs_read_only) {
> @@ -104,13 +102,14 @@ static void stream_clean(Job *job)
>       }
>   
>       g_free(s->backing_file_str);
> +    g_free(s->base_fmt);
>   }
>   
>   static int coroutine_fn stream_run(Job *job, Error **errp)
>   {
>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>       BlockBackend *blk = s->common.blk;
> -    BlockDriverState *bs = blk_bs(blk);
> +    BlockDriverState *bs = s->target_bs;
>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>       bool enable_cor = !bdrv_cow_child(s->base_overlay);
>       int64_t len;
> @@ -231,6 +230,12 @@ void stream_start(const char *job_id, BlockDriverState *bs,
>       int basic_flags = BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED;
>       BlockDriverState *base_overlay = bdrv_find_overlay(bs, base);
>       BlockDriverState *above_base;
> +    BlockDriverState *cor_filter_bs = NULL;
> +    char *base_fmt = NULL;
> +
> +    if (base && base->drv) {
> +        base_fmt = g_strdup(base->drv->format_name);
> +    }
>   
>       if (!base_overlay) {
>           error_setg(errp, "'%s' is not in the backing chain of '%s'",
> @@ -264,17 +269,36 @@ void stream_start(const char *job_id, BlockDriverState *bs,
>           }
>       }
>   
> -    /* Prevent concurrent jobs trying to modify the graph structure here, we
> -     * already have our own plans. Also don't allow resize as the image size is
> -     * queried only at the job start and then cached. */
> -    s = block_job_create(job_id, &stream_job_driver, NULL, bs,
> -                         basic_flags | BLK_PERM_GRAPH_MOD,
> -                         basic_flags | BLK_PERM_WRITE,
> +    cor_filter_bs = bdrv_cor_filter_append(bs, filter_node_name, errp);
> +    if (cor_filter_bs == NULL) {
> +        goto fail;
> +    }
> +
> +    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
> +        bdrv_cor_filter_drop(cor_filter_bs);
> +        cor_filter_bs = NULL;
> +        goto fail;
> +    }
> +
> +    s = block_job_create(job_id, &stream_job_driver, NULL, cor_filter_bs,
> +                         BLK_PERM_CONSISTENT_READ,
> +                         basic_flags | BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD,
>                            speed, creation_flags, NULL, NULL, errp);
>       if (!s) {
>           goto fail;
>       }
>   
> +    /*
> +     * Prevent concurrent jobs trying to modify the graph structure here, we
> +     * already have our own plans. Also don't allow resize as the image size is
> +     * queried only at the job start and then cached.
> +     */
> +    if (block_job_add_bdrv(&s->common, "active node", bs,
> +                           basic_flags | BLK_PERM_GRAPH_MOD,
> +                           basic_flags | BLK_PERM_WRITE, &error_abort)) {
> +        goto fail;
> +    }
> +
>       /* Block all intermediate nodes between bs and base, because they will
>        * disappear from the chain after this operation. The streaming job reads
>        * every block only once, assuming that it doesn't change, so forbid writes
> @@ -294,6 +318,9 @@ void stream_start(const char *job_id, BlockDriverState *bs,
>   
>       s->base_overlay = base_overlay;
>       s->above_base = above_base;
> +    s->cor_filter_bs = cor_filter_bs;
> +    s->target_bs = bs;
> +    s->base_fmt = base_fmt;

It's wrong to cache base_fmt for the duration of the job, as the base may change while the job is running.

>       s->backing_file_str = g_strdup(backing_file_str);
>       s->bs_read_only = bs_read_only;
>       s->chain_frozen = true;
> @@ -307,5 +334,10 @@ fail:
>       if (bs_read_only) {
>           bdrv_reopen_set_read_only(bs, true, NULL);
>       }
> -    bdrv_unfreeze_backing_chain(bs, above_base);
> +    if (cor_filter_bs) {
> +        bdrv_unfreeze_backing_chain(cor_filter_bs, above_base);
> +        bdrv_cor_filter_drop(cor_filter_bs);
> +    } else {
> +        bdrv_unfreeze_backing_chain(bs, above_base);
> +    }
>   }
> diff --git a/tests/qemu-iotests/030 b/tests/qemu-iotests/030
> index 1cdd7e2..fec9d89 100755
> --- a/tests/qemu-iotests/030
> +++ b/tests/qemu-iotests/030
> @@ -221,60 +221,20 @@ class TestParallelOps(iotests.QMPTestCase):
>           for img in self.imgs:
>               os.remove(img)
>   
> -    # Test that it's possible to run several block-stream operations
> -    # in parallel in the same snapshot chain
> -    def test_stream_parallel(self):
> -        self.assert_no_active_block_jobs()
> -
> -        # Check that the maps don't match before the streaming operations
> -        for i in range(2, self.num_imgs, 2):
> -            self.assertNotEqual(qemu_io('-f', iotests.imgfmt, '-rU', '-c', 'map', self.imgs[i]),
> -                                qemu_io('-f', iotests.imgfmt, '-rU', '-c', 'map', self.imgs[i-1]),
> -                                'image file map matches backing file before streaming')
> -
> -        # Create all streaming jobs
> -        pending_jobs = []
> -        for i in range(2, self.num_imgs, 2):
> -            node_name = 'node%d' % i
> -            job_id = 'stream-%s' % node_name
> -            pending_jobs.append(job_id)
> -            result = self.vm.qmp('block-stream', device=node_name, job_id=job_id, base=self.imgs[i-2], speed=512*1024)
> -            self.assert_qmp(result, 'return', {})
> -
> -        for job in pending_jobs:
> -            result = self.vm.qmp('block-job-set-speed', device=job, speed=0)
> -            self.assert_qmp(result, 'return', {})
> -
> -        # Wait for all jobs to be finished.
> -        while len(pending_jobs) > 0:
> -            for event in self.vm.get_qmp_events(wait=True):
> -                if event['event'] == 'BLOCK_JOB_COMPLETED':
> -                    job_id = self.dictpath(event, 'data/device')
> -                    self.assertTrue(job_id in pending_jobs)
> -                    self.assert_qmp_absent(event, 'data/error')
> -                    pending_jobs.remove(job_id)
> -
> -        self.assert_no_active_block_jobs()
> -        self.vm.shutdown()
> -
> -        # Check that all maps match now
> -        for i in range(2, self.num_imgs, 2):
> -            self.assertEqual(qemu_io('-f', iotests.imgfmt, '-c', 'map', self.imgs[i]),
> -                             qemu_io('-f', iotests.imgfmt, '-c', 'map', self.imgs[i-1]),
> -                             'image file map does not match backing file after streaming')
> -
>       # Test that it's not possible to perform two block-stream
>       # operations if there are nodes involved in both.
>       def test_overlapping_1(self):
>           self.assert_no_active_block_jobs()
>   
>           # Set a speed limit to make sure that this job blocks the rest
> -        result = self.vm.qmp('block-stream', device='node4', job_id='stream-node4', base=self.imgs[1], speed=1024*1024)
> +        result = self.vm.qmp('block-stream', device='node4',
> +                             job_id='stream-node4', base=self.imgs[1],
> +                             filter_node_name='stream-filter', speed=1024*1024)
>           self.assert_qmp(result, 'return', {})
>   
>           result = self.vm.qmp('block-stream', device='node5', job_id='stream-node5', base=self.imgs[2])
>           self.assert_qmp(result, 'error/desc',
> -            "Node 'node4' is busy: block device is in use by block job: stream")
> +            "Node 'stream-filter' is busy: block device is in use by block job: stream")
>   
>           result = self.vm.qmp('block-stream', device='node3', job_id='stream-node3', base=self.imgs[2])
>           self.assert_qmp(result, 'error/desc',
> @@ -287,7 +247,7 @@ class TestParallelOps(iotests.QMPTestCase):
>           # block-commit should also fail if it touches nodes used by the stream job
>           result = self.vm.qmp('block-commit', device='drive0', base=self.imgs[4], job_id='commit-node4')
>           self.assert_qmp(result, 'error/desc',
> -            "Node 'node4' is busy: block device is in use by block job: stream")
> +            "Node 'stream-filter' is busy: block device is in use by block job: stream")
>   
>           result = self.vm.qmp('block-commit', device='drive0', base=self.imgs[1], top=self.imgs[3], job_id='commit-node1')
>           self.assert_qmp(result, 'error/desc',
> diff --git a/tests/qemu-iotests/030.out b/tests/qemu-iotests/030.out
> index 6d9bee1..5eb508d 100644
> --- a/tests/qemu-iotests/030.out
> +++ b/tests/qemu-iotests/030.out
> @@ -1,5 +1,5 @@
> -...........................
> +..........................
>   ----------------------------------------------------------------------
> -Ran 27 tests
> +Ran 26 tests
>   
>   OK
> 

If we are going to drop this case, maybe we should revert c624b015bf14f "block/stream: introduce a bottom node"? I see now that it was a bad idea.
Andrey Shinkevich Aug. 23, 2020, 7:28 p.m. UTC | #2
On 19.08.2020 13:46, Vladimir Sementsov-Ogievskiy wrote:
> 19.08.2020 00:24, Andrey Shinkevich wrote:
>> The patch completes the series with the COR-filter insertion to any
>> block-stream operation. It also makes changes to the iotests 030.
>> The test case 'test_stream_parallel' was deleted due to multiple
>> errors.
>
...
>
>>
>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>> ---
>>   block/stream.c             | 76 
>> ++++++++++++++++++++++++++++++++--------------
>>   tests/qemu-iotests/030     | 50 +++---------------------------
>>   tests/qemu-iotests/030.out |  4 +--
>>   3 files changed, 61 insertions(+), 69 deletions(-)
>>
>> diff --git a/block/stream.c b/block/stream.c
>> index 8bf6b6d..0b11979 100644
>> --- a/block/stream.c
>> +++ b/block/stream.c
>> @@ -19,6 +19,7 @@
>>   #include "qapi/qmp/qerror.h"
>>   #include "qemu/ratelimit.h"
>>   #include "sysemu/block-backend.h"
>> +#include "block/copy-on-read.h"
>>     enum {
>>       /*
>> @@ -33,8 +34,11 @@ typedef struct StreamBlockJob {
>>       BlockJob common;
>>       BlockDriverState *base_overlay; /* COW overlay (stream from 
>> this) */
>>       BlockDriverState *above_base;   /* Node directly above the base */
>> +    BlockDriverState *cor_filter_bs;
>> +    BlockDriverState *target_bs;
>>       BlockdevOnError on_error;
>>       char *backing_file_str;
>> +    char *base_fmt;
>>       bool bs_read_only;
>>       bool chain_frozen;
>>   } StreamBlockJob;
>> @@ -53,34 +57,26 @@ static void stream_abort(Job *job)
>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>>         if (s->chain_frozen) {
>> -        BlockJob *bjob = &s->common;
>> -        bdrv_unfreeze_backing_chain(blk_bs(bjob->blk), s->above_base);
>> +        bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>>       }
>>   }
>>     static int stream_prepare(Job *job)
>>   {
>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>> -    BlockJob *bjob = &s->common;
>> -    BlockDriverState *bs = blk_bs(bjob->blk);
>> +    BlockDriverState *bs = s->target_bs;
>>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>>       BlockDriverState *base = bdrv_filter_or_cow_bs(s->above_base);
>>       Error *local_err = NULL;
>>       int ret = 0;
>>   -    bdrv_unfreeze_backing_chain(bs, s->above_base);
>> +    bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>>       s->chain_frozen = false;
>>         if (bdrv_cow_child(unfiltered_bs)) {
>> -        const char *base_id = NULL, *base_fmt = NULL;
>> -        if (base) {
>> -            base_id = s->backing_file_str;
>> -            if (base->drv) {
>> -                base_fmt = base->drv->format_name;
>> -            }
>> -        }
>>           bdrv_set_backing_hd(unfiltered_bs, base, &local_err);
>> -        ret = bdrv_change_backing_file(unfiltered_bs, base_id, 
>> base_fmt);
>> +        ret = bdrv_change_backing_file(unfiltered_bs, 
>> s->backing_file_str,
>> +                                       s->base_fmt);
>>           if (local_err) {
>>               error_report_err(local_err);
>>               return -EPERM;
>> @@ -94,7 +90,9 @@ static void stream_clean(Job *job)
>>   {
>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>>       BlockJob *bjob = &s->common;
>> -    BlockDriverState *bs = blk_bs(bjob->blk);
>> +    BlockDriverState *bs = s->target_bs;
>> +
>> +    bdrv_cor_filter_drop(s->cor_filter_bs);
>>         /* Reopen the image back in read-only mode if necessary */
>>       if (s->bs_read_only) {
>> @@ -104,13 +102,14 @@ static void stream_clean(Job *job)
>>       }
>>         g_free(s->backing_file_str);
>> +    g_free(s->base_fmt);
>>   }
>>     static int coroutine_fn stream_run(Job *job, Error **errp)
>>   {
>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>>       BlockBackend *blk = s->common.blk;
>> -    BlockDriverState *bs = blk_bs(blk);
>> +    BlockDriverState *bs = s->target_bs;
>>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>>       bool enable_cor = !bdrv_cow_child(s->base_overlay);
>>       int64_t len;
>> @@ -231,6 +230,12 @@ void stream_start(const char *job_id, 
>> BlockDriverState *bs,
>>       int basic_flags = BLK_PERM_CONSISTENT_READ | 
>> BLK_PERM_WRITE_UNCHANGED;
>>       BlockDriverState *base_overlay = bdrv_find_overlay(bs, base);
>>       BlockDriverState *above_base;
>> +    BlockDriverState *cor_filter_bs = NULL;
>> +    char *base_fmt = NULL;
>> +
>> +    if (base && base->drv) {
>> +        base_fmt = g_strdup(base->drv->format_name);
>> +    }
>>         if (!base_overlay) {
>>           error_setg(errp, "'%s' is not in the backing chain of '%s'",
>> @@ -264,17 +269,36 @@ void stream_start(const char *job_id, 
>> BlockDriverState *bs,
>>           }
>>       }
>>   -    /* Prevent concurrent jobs trying to modify the graph 
>> structure here, we
>> -     * already have our own plans. Also don't allow resize as the 
>> image size is
>> -     * queried only at the job start and then cached. */
>> -    s = block_job_create(job_id, &stream_job_driver, NULL, bs,
>> -                         basic_flags | BLK_PERM_GRAPH_MOD,
>> -                         basic_flags | BLK_PERM_WRITE,
>> +    cor_filter_bs = bdrv_cor_filter_append(bs, filter_node_name, errp);
>> +    if (cor_filter_bs == NULL) {
>> +        goto fail;
>> +    }
>> +
>> +    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
>> +        bdrv_cor_filter_drop(cor_filter_bs);
>> +        cor_filter_bs = NULL;
>> +        goto fail;
>> +    }
>> +
>> +    s = block_job_create(job_id, &stream_job_driver, NULL, 
>> cor_filter_bs,
>> +                         BLK_PERM_CONSISTENT_READ,
>> +                         basic_flags | BLK_PERM_WRITE | 
>> BLK_PERM_GRAPH_MOD,
>>                            speed, creation_flags, NULL, NULL, errp);
>>       if (!s) {
>>           goto fail;
>>       }
>>   +    /*
>> +     * Prevent concurrent jobs trying to modify the graph structure 
>> here, we
>> +     * already have our own plans. Also don't allow resize as the 
>> image size is
>> +     * queried only at the job start and then cached.
>> +     */
>> +    if (block_job_add_bdrv(&s->common, "active node", bs,
>> +                           basic_flags | BLK_PERM_GRAPH_MOD,
>> +                           basic_flags | BLK_PERM_WRITE, 
>> &error_abort)) {
>> +        goto fail;
>> +    }
>> +
>>       /* Block all intermediate nodes between bs and base, because 
>> they will
>>        * disappear from the chain after this operation. The streaming 
>> job reads
>>        * every block only once, assuming that it doesn't change, so 
>> forbid writes
>> @@ -294,6 +318,9 @@ void stream_start(const char *job_id, 
>> BlockDriverState *bs,
>>         s->base_overlay = base_overlay;
>>       s->above_base = above_base;
>> +    s->cor_filter_bs = cor_filter_bs;
>> +    s->target_bs = bs;
>> +    s->base_fmt = base_fmt;
>
> it's wrong to keep base_fmt during the job, as base may change


So, the format can differ from that of the backing_file_str given as the 
input parameter of stream_start()...

...while the backing_file_str path is written to the QCOW2 header on disk.

Andrey


>
>>       s->backing_file_str = g_strdup(backing_file_str);
>>       s->bs_read_only = bs_read_only;
>>       s->chain_frozen = true;
>> @@ -307,5 +334,10 @@ fail:
>>       if (bs_read_only) {
>>           bdrv_reopen_set_read_only(bs, true, NULL);
>>       }
>> -    bdrv_unfreeze_backing_chain(bs, above_base);
>> +    if (cor_filter_bs) {
>> +        bdrv_unfreeze_backing_chain(cor_filter_bs, above_base);
>> +        bdrv_cor_filter_drop(cor_filter_bs);
>> +    } else {
>> +        bdrv_unfreeze_backing_chain(bs, above_base);
>> +    }
>>   }
>> diff --git a/tests/qemu-iotests/030 b/tests/qemu-iotests/030
>> index 1cdd7e2..fec9d89 100755
>> --- a/tests/qemu-iotests/030
>> +++ b/tests/qemu-iotests/030
>> @@ -221,60 +221,20 @@ class TestParallelOps(iotests.QMPTestCase):
>>           for img in self.imgs:
>>               os.remove(img)
...
Vladimir Sementsov-Ogievskiy Aug. 24, 2020, 8:20 a.m. UTC | #3
23.08.2020 22:28, Andrey Shinkevich wrote:
> On 19.08.2020 13:46, Vladimir Sementsov-Ogievskiy wrote:
>> 19.08.2020 00:24, Andrey Shinkevich wrote:
>>> The patch completes the series with the COR-filter insertion to any
>>> block-stream operation. It also makes changes to the iotests 030.
>>> The test case 'test_stream_parallel' was deleted due to multiple
>>> errors.
>>
> ...
>>
>>>
>>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>>> ---
>>>   block/stream.c             | 76 ++++++++++++++++++++++++++++++++--------------
>>>   tests/qemu-iotests/030     | 50 +++---------------------------
>>>   tests/qemu-iotests/030.out |  4 +--
>>>   3 files changed, 61 insertions(+), 69 deletions(-)
>>>
>>> diff --git a/block/stream.c b/block/stream.c
>>> index 8bf6b6d..0b11979 100644
>>> --- a/block/stream.c
>>> +++ b/block/stream.c
>>> @@ -19,6 +19,7 @@
>>>   #include "qapi/qmp/qerror.h"
>>>   #include "qemu/ratelimit.h"
>>>   #include "sysemu/block-backend.h"
>>> +#include "block/copy-on-read.h"
>>>     enum {
>>>       /*
>>> @@ -33,8 +34,11 @@ typedef struct StreamBlockJob {
>>>       BlockJob common;
>>>       BlockDriverState *base_overlay; /* COW overlay (stream from this) */
>>>       BlockDriverState *above_base;   /* Node directly above the base */
>>> +    BlockDriverState *cor_filter_bs;
>>> +    BlockDriverState *target_bs;
>>>       BlockdevOnError on_error;
>>>       char *backing_file_str;
>>> +    char *base_fmt;
>>>       bool bs_read_only;
>>>       bool chain_frozen;
>>>   } StreamBlockJob;
>>> @@ -53,34 +57,26 @@ static void stream_abort(Job *job)
>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>>>         if (s->chain_frozen) {
>>> -        BlockJob *bjob = &s->common;
>>> -        bdrv_unfreeze_backing_chain(blk_bs(bjob->blk), s->above_base);
>>> +        bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>>>       }
>>>   }
>>>     static int stream_prepare(Job *job)
>>>   {
>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>>> -    BlockJob *bjob = &s->common;
>>> -    BlockDriverState *bs = blk_bs(bjob->blk);
>>> +    BlockDriverState *bs = s->target_bs;
>>>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>>>       BlockDriverState *base = bdrv_filter_or_cow_bs(s->above_base);
>>>       Error *local_err = NULL;
>>>       int ret = 0;
>>>   -    bdrv_unfreeze_backing_chain(bs, s->above_base);
>>> +    bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>>>       s->chain_frozen = false;
>>>         if (bdrv_cow_child(unfiltered_bs)) {
>>> -        const char *base_id = NULL, *base_fmt = NULL;
>>> -        if (base) {
>>> -            base_id = s->backing_file_str;
>>> -            if (base->drv) {
>>> -                base_fmt = base->drv->format_name;
>>> -            }
>>> -        }
>>>           bdrv_set_backing_hd(unfiltered_bs, base, &local_err);
>>> -        ret = bdrv_change_backing_file(unfiltered_bs, base_id, base_fmt);
>>> +        ret = bdrv_change_backing_file(unfiltered_bs, s->backing_file_str,
>>> +                                       s->base_fmt);
>>>           if (local_err) {
>>>               error_report_err(local_err);
>>>               return -EPERM;
>>> @@ -94,7 +90,9 @@ static void stream_clean(Job *job)
>>>   {
>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>>>       BlockJob *bjob = &s->common;
>>> -    BlockDriverState *bs = blk_bs(bjob->blk);
>>> +    BlockDriverState *bs = s->target_bs;
>>> +
>>> +    bdrv_cor_filter_drop(s->cor_filter_bs);
>>>         /* Reopen the image back in read-only mode if necessary */
>>>       if (s->bs_read_only) {
>>> @@ -104,13 +102,14 @@ static void stream_clean(Job *job)
>>>       }
>>>         g_free(s->backing_file_str);
>>> +    g_free(s->base_fmt);
>>>   }
>>>     static int coroutine_fn stream_run(Job *job, Error **errp)
>>>   {
>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>>>       BlockBackend *blk = s->common.blk;
>>> -    BlockDriverState *bs = blk_bs(blk);
>>> +    BlockDriverState *bs = s->target_bs;
>>>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>>>       bool enable_cor = !bdrv_cow_child(s->base_overlay);
>>>       int64_t len;
>>> @@ -231,6 +230,12 @@ void stream_start(const char *job_id, BlockDriverState *bs,
>>>       int basic_flags = BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED;
>>>       BlockDriverState *base_overlay = bdrv_find_overlay(bs, base);
>>>       BlockDriverState *above_base;
>>> +    BlockDriverState *cor_filter_bs = NULL;
>>> +    char *base_fmt = NULL;
>>> +
>>> +    if (base && base->drv) {
>>> +        base_fmt = g_strdup(base->drv->format_name);
>>> +    }
>>>         if (!base_overlay) {
>>>           error_setg(errp, "'%s' is not in the backing chain of '%s'",
>>> @@ -264,17 +269,36 @@ void stream_start(const char *job_id, BlockDriverState *bs,
>>>           }
>>>       }
>>>   -    /* Prevent concurrent jobs trying to modify the graph structure here, we
>>> -     * already have our own plans. Also don't allow resize as the image size is
>>> -     * queried only at the job start and then cached. */
>>> -    s = block_job_create(job_id, &stream_job_driver, NULL, bs,
>>> -                         basic_flags | BLK_PERM_GRAPH_MOD,
>>> -                         basic_flags | BLK_PERM_WRITE,
>>> +    cor_filter_bs = bdrv_cor_filter_append(bs, filter_node_name, errp);
>>> +    if (cor_filter_bs == NULL) {
>>> +        goto fail;
>>> +    }
>>> +
>>> +    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
>>> +        bdrv_cor_filter_drop(cor_filter_bs);
>>> +        cor_filter_bs = NULL;
>>> +        goto fail;
>>> +    }
>>> +
>>> +    s = block_job_create(job_id, &stream_job_driver, NULL, cor_filter_bs,
>>> +                         BLK_PERM_CONSISTENT_READ,
>>> +                         basic_flags | BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD,
>>>                            speed, creation_flags, NULL, NULL, errp);
>>>       if (!s) {
>>>           goto fail;
>>>       }
>>>   +    /*
>>> +     * Prevent concurrent jobs trying to modify the graph structure here, we
>>> +     * already have our own plans. Also don't allow resize as the image size is
>>> +     * queried only at the job start and then cached.
>>> +     */
>>> +    if (block_job_add_bdrv(&s->common, "active node", bs,
>>> +                           basic_flags | BLK_PERM_GRAPH_MOD,
>>> +                           basic_flags | BLK_PERM_WRITE, &error_abort)) {
>>> +        goto fail;
>>> +    }
>>> +
>>>       /* Block all intermediate nodes between bs and base, because they will
>>>        * disappear from the chain after this operation. The streaming job reads
>>>        * every block only once, assuming that it doesn't change, so forbid writes
>>> @@ -294,6 +318,9 @@ void stream_start(const char *job_id, BlockDriverState *bs,
>>>         s->base_overlay = base_overlay;
>>>       s->above_base = above_base;
>>> +    s->cor_filter_bs = cor_filter_bs;
>>> +    s->target_bs = bs;
>>> +    s->base_fmt = base_fmt;
>>
>> it's wrong to keep base_fmt during the job, as base may change
> 
> 
> So, the format can differ from that of the backing_file_str given as the input parameter of the stream_start()...
> 
> ...while the backing_file_str path is written to the QCOW2 header on a disk.
> 

Or better, let's try to revert c624b015bf14f and freeze base again.

> 
> 
>>
>>>       s->backing_file_str = g_strdup(backing_file_str);
>>>       s->bs_read_only = bs_read_only;
>>>       s->chain_frozen = true;
>>> @@ -307,5 +334,10 @@ fail:
>>>       if (bs_read_only) {
>>>           bdrv_reopen_set_read_only(bs, true, NULL);
>>>       }
>>> -    bdrv_unfreeze_backing_chain(bs, above_base);
>>> +    if (cor_filter_bs) {
>>> +        bdrv_unfreeze_backing_chain(cor_filter_bs, above_base);
>>> +        bdrv_cor_filter_drop(cor_filter_bs);
>>> +    } else {
>>> +        bdrv_unfreeze_backing_chain(bs, above_base);
>>> +    }
>>>   }
>>> diff --git a/tests/qemu-iotests/030 b/tests/qemu-iotests/030
>>> index 1cdd7e2..fec9d89 100755
>>> --- a/tests/qemu-iotests/030
>>> +++ b/tests/qemu-iotests/030
>>> @@ -221,60 +221,20 @@ class TestParallelOps(iotests.QMPTestCase):
>>>           for img in self.imgs:
>>>               os.remove(img)
> ...
Andrey Shinkevich Aug. 24, 2020, 8:49 a.m. UTC | #4
On 24.08.2020 11:20, Vladimir Sementsov-Ogievskiy wrote:
> 23.08.2020 22:28, Andrey Shinkevich wrote:
>> On 19.08.2020 13:46, Vladimir Sementsov-Ogievskiy wrote:
>>> 19.08.2020 00:24, Andrey Shinkevich wrote:
>>>> The patch completes the series with the COR-filter insertion to any
>>>> block-stream operation. It also makes changes to the iotests 030.
>>>> The test case 'test_stream_parallel' was deleted due to multiple
>>>> errors.
>>>
>> ...
>>>
>>>>
>>>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>>>> ---
>>>>   block/stream.c             | 76 
>>>> ++++++++++++++++++++++++++++++++--------------
>>>>   tests/qemu-iotests/030     | 50 +++---------------------------
>>>>   tests/qemu-iotests/030.out |  4 +--
>>>>   3 files changed, 61 insertions(+), 69 deletions(-)
>>>>
>>>> diff --git a/block/stream.c b/block/stream.c
>>>> index 8bf6b6d..0b11979 100644
>>>> --- a/block/stream.c
>>>> +++ b/block/stream.c
>>>> @@ -19,6 +19,7 @@
>>>>   #include "qapi/qmp/qerror.h"
>>>>   #include "qemu/ratelimit.h"
>>>>   #include "sysemu/block-backend.h"
>>>> +#include "block/copy-on-read.h"
>>>>     enum {
>>>>       /*
>>>> @@ -33,8 +34,11 @@ typedef struct StreamBlockJob {
>>>>       BlockJob common;
>>>>       BlockDriverState *base_overlay; /* COW overlay (stream from 
>>>> this) */
>>>>       BlockDriverState *above_base;   /* Node directly above the 
>>>> base */
>>>> +    BlockDriverState *cor_filter_bs;
>>>> +    BlockDriverState *target_bs;
>>>>       BlockdevOnError on_error;
>>>>       char *backing_file_str;
>>>> +    char *base_fmt;
>>>>       bool bs_read_only;
>>>>       bool chain_frozen;
>>>>   } StreamBlockJob;
>>>> @@ -53,34 +57,26 @@ static void stream_abort(Job *job)
>>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, 
>>>> common.job);
>>>>         if (s->chain_frozen) {
>>>> -        BlockJob *bjob = &s->common;
>>>> -        bdrv_unfreeze_backing_chain(blk_bs(bjob->blk), 
>>>> s->above_base);
>>>> +        bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>>>>       }
>>>>   }
>>>>     static int stream_prepare(Job *job)
>>>>   {
>>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, 
>>>> common.job);
>>>> -    BlockJob *bjob = &s->common;
>>>> -    BlockDriverState *bs = blk_bs(bjob->blk);
>>>> +    BlockDriverState *bs = s->target_bs;
>>>>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>>>>       BlockDriverState *base = bdrv_filter_or_cow_bs(s->above_base);
>>>>       Error *local_err = NULL;
>>>>       int ret = 0;
>>>>   -    bdrv_unfreeze_backing_chain(bs, s->above_base);
>>>> +    bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
>>>>       s->chain_frozen = false;
>>>>         if (bdrv_cow_child(unfiltered_bs)) {
>>>> -        const char *base_id = NULL, *base_fmt = NULL;
>>>> -        if (base) {
>>>> -            base_id = s->backing_file_str;
>>>> -            if (base->drv) {
>>>> -                base_fmt = base->drv->format_name;
>>>> -            }
>>>> -        }
>>>>           bdrv_set_backing_hd(unfiltered_bs, base, &local_err);
>>>> -        ret = bdrv_change_backing_file(unfiltered_bs, base_id, 
>>>> base_fmt);
>>>> +        ret = bdrv_change_backing_file(unfiltered_bs, 
>>>> s->backing_file_str,
>>>> +                                       s->base_fmt);
>>>>           if (local_err) {
>>>>               error_report_err(local_err);
>>>>               return -EPERM;
>>>> @@ -94,7 +90,9 @@ static void stream_clean(Job *job)
>>>>   {
>>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, 
>>>> common.job);
>>>>       BlockJob *bjob = &s->common;
>>>> -    BlockDriverState *bs = blk_bs(bjob->blk);
>>>> +    BlockDriverState *bs = s->target_bs;
>>>> +
>>>> +    bdrv_cor_filter_drop(s->cor_filter_bs);
>>>>         /* Reopen the image back in read-only mode if necessary */
>>>>       if (s->bs_read_only) {
>>>> @@ -104,13 +102,14 @@ static void stream_clean(Job *job)
>>>>       }
>>>>         g_free(s->backing_file_str);
>>>> +    g_free(s->base_fmt);
>>>>   }
>>>>     static int coroutine_fn stream_run(Job *job, Error **errp)
>>>>   {
>>>>       StreamBlockJob *s = container_of(job, StreamBlockJob, 
>>>> common.job);
>>>>       BlockBackend *blk = s->common.blk;
>>>> -    BlockDriverState *bs = blk_bs(blk);
>>>> +    BlockDriverState *bs = s->target_bs;
>>>>       BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
>>>>       bool enable_cor = !bdrv_cow_child(s->base_overlay);
>>>>       int64_t len;
>>>> @@ -231,6 +230,12 @@ void stream_start(const char *job_id, 
>>>> BlockDriverState *bs,
>>>>       int basic_flags = BLK_PERM_CONSISTENT_READ | 
>>>> BLK_PERM_WRITE_UNCHANGED;
>>>>       BlockDriverState *base_overlay = bdrv_find_overlay(bs, base);
>>>>       BlockDriverState *above_base;
>>>> +    BlockDriverState *cor_filter_bs = NULL;
>>>> +    char *base_fmt = NULL;
>>>> +
>>>> +    if (base && base->drv) {
>>>> +        base_fmt = g_strdup(base->drv->format_name);
>>>> +    }
>>>>         if (!base_overlay) {
>>>>           error_setg(errp, "'%s' is not in the backing chain of '%s'",
>>>> @@ -264,17 +269,36 @@ void stream_start(const char *job_id, 
>>>> BlockDriverState *bs,
>>>>           }
>>>>       }
>>>>   -    /* Prevent concurrent jobs trying to modify the graph 
>>>> structure here, we
>>>> -     * already have our own plans. Also don't allow resize as the 
>>>> image size is
>>>> -     * queried only at the job start and then cached. */
>>>> -    s = block_job_create(job_id, &stream_job_driver, NULL, bs,
>>>> -                         basic_flags | BLK_PERM_GRAPH_MOD,
>>>> -                         basic_flags | BLK_PERM_WRITE,
>>>> +    cor_filter_bs = bdrv_cor_filter_append(bs, filter_node_name, 
>>>> errp);
>>>> +    if (cor_filter_bs == NULL) {
>>>> +        goto fail;
>>>> +    }
>>>> +
>>>> +    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
>>>> +        bdrv_cor_filter_drop(cor_filter_bs);
>>>> +        cor_filter_bs = NULL;
>>>> +        goto fail;
>>>> +    }
>>>> +
>>>> +    s = block_job_create(job_id, &stream_job_driver, NULL, 
>>>> cor_filter_bs,
>>>> +                         BLK_PERM_CONSISTENT_READ,
>>>> +                         basic_flags | BLK_PERM_WRITE | 
>>>> BLK_PERM_GRAPH_MOD,
>>>>                            speed, creation_flags, NULL, NULL, errp);
>>>>       if (!s) {
>>>>           goto fail;
>>>>       }
>>>>   +    /*
>>>> +     * Prevent concurrent jobs trying to modify the graph 
>>>> structure here, we
>>>> +     * already have our own plans. Also don't allow resize as the 
>>>> image size is
>>>> +     * queried only at the job start and then cached.
>>>> +     */
>>>> +    if (block_job_add_bdrv(&s->common, "active node", bs,
>>>> +                           basic_flags | BLK_PERM_GRAPH_MOD,
>>>> +                           basic_flags | BLK_PERM_WRITE, 
>>>> &error_abort)) {
>>>> +        goto fail;
>>>> +    }
>>>> +
>>>>       /* Block all intermediate nodes between bs and base, because 
>>>> they will
>>>>        * disappear from the chain after this operation. The 
>>>> streaming job reads
>>>>        * every block only once, assuming that it doesn't change, so 
>>>> forbid writes
>>>> @@ -294,6 +318,9 @@ void stream_start(const char *job_id, 
>>>> BlockDriverState *bs,
>>>>         s->base_overlay = base_overlay;
>>>>       s->above_base = above_base;
>>>> +    s->cor_filter_bs = cor_filter_bs;
>>>> +    s->target_bs = bs;
>>>> +    s->base_fmt = base_fmt;
>>>
>>> it's wrong to keep base_fmt during the job, as base may change
>>
>>
>> So, the format can differ from that of the backing_file_str given as 
>> the input parameter of the stream_start()...
>>
>> ...while the backing_file_str path is written to the QCOW2 header on 
>> a disk.
>>
>
> Or better, let's try to revert c624b015bf14f and freeze base again.


The reversion won't help as the patch "stream: Deal with filters" did 
that work already. We have to freeze the base again. I guess it will 
discard the concept of the 'base_overlay' and the 'above_base' 
introduced by Max in the series "block: Deal with filters".

Andrey


>
>>
>>
>>>
>>>>       s->backing_file_str = g_strdup(backing_file_str);
>>>>       s->bs_read_only = bs_read_only;
>>>>       s->chain_frozen = true;
>>>> @@ -307,5 +334,10 @@ fail:
>>>>       if (bs_read_only) {
>>>>           bdrv_reopen_set_read_only(bs, true, NULL);
>>>>       }
>>>> -    bdrv_unfreeze_backing_chain(bs, above_base);
>>>> +    if (cor_filter_bs) {
>>>> +        bdrv_unfreeze_backing_chain(cor_filter_bs, above_base);
>>>> +        bdrv_cor_filter_drop(cor_filter_bs);
>>>> +    } else {
>>>> +        bdrv_unfreeze_backing_chain(bs, above_base);
>>>> +    }
>>>>   }
>>>> diff --git a/tests/qemu-iotests/030 b/tests/qemu-iotests/030
>>>> index 1cdd7e2..fec9d89 100755
>>>> --- a/tests/qemu-iotests/030
>>>> +++ b/tests/qemu-iotests/030
>>>> @@ -221,60 +221,20 @@ class TestParallelOps(iotests.QMPTestCase):
>>>>           for img in self.imgs:
>>>>               os.remove(img)
>> ...
>
>
diff mbox series

Patch

diff --git a/block/stream.c b/block/stream.c
index 8bf6b6d..0b11979 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -19,6 +19,7 @@ 
 #include "qapi/qmp/qerror.h"
 #include "qemu/ratelimit.h"
 #include "sysemu/block-backend.h"
+#include "block/copy-on-read.h"
 
 enum {
     /*
@@ -33,8 +34,11 @@  typedef struct StreamBlockJob {
     BlockJob common;
     BlockDriverState *base_overlay; /* COW overlay (stream from this) */
     BlockDriverState *above_base;   /* Node directly above the base */
+    BlockDriverState *cor_filter_bs;
+    BlockDriverState *target_bs;
     BlockdevOnError on_error;
     char *backing_file_str;
+    char *base_fmt;
     bool bs_read_only;
     bool chain_frozen;
 } StreamBlockJob;
@@ -53,34 +57,26 @@  static void stream_abort(Job *job)
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
 
     if (s->chain_frozen) {
-        BlockJob *bjob = &s->common;
-        bdrv_unfreeze_backing_chain(blk_bs(bjob->blk), s->above_base);
+        bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
     }
 }
 
 static int stream_prepare(Job *job)
 {
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
-    BlockJob *bjob = &s->common;
-    BlockDriverState *bs = blk_bs(bjob->blk);
+    BlockDriverState *bs = s->target_bs;
     BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
     BlockDriverState *base = bdrv_filter_or_cow_bs(s->above_base);
     Error *local_err = NULL;
     int ret = 0;
 
-    bdrv_unfreeze_backing_chain(bs, s->above_base);
+    bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
     s->chain_frozen = false;
 
     if (bdrv_cow_child(unfiltered_bs)) {
-        const char *base_id = NULL, *base_fmt = NULL;
-        if (base) {
-            base_id = s->backing_file_str;
-            if (base->drv) {
-                base_fmt = base->drv->format_name;
-            }
-        }
         bdrv_set_backing_hd(unfiltered_bs, base, &local_err);
-        ret = bdrv_change_backing_file(unfiltered_bs, base_id, base_fmt);
+        ret = bdrv_change_backing_file(unfiltered_bs, s->backing_file_str,
+                                       s->base_fmt);
         if (local_err) {
             error_report_err(local_err);
             return -EPERM;
@@ -94,7 +90,9 @@  static void stream_clean(Job *job)
 {
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
     BlockJob *bjob = &s->common;
-    BlockDriverState *bs = blk_bs(bjob->blk);
+    BlockDriverState *bs = s->target_bs;
+
+    bdrv_cor_filter_drop(s->cor_filter_bs);
 
     /* Reopen the image back in read-only mode if necessary */
     if (s->bs_read_only) {
@@ -104,13 +102,14 @@  static void stream_clean(Job *job)
     }
 
     g_free(s->backing_file_str);
+    g_free(s->base_fmt);
 }
 
 static int coroutine_fn stream_run(Job *job, Error **errp)
 {
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
     BlockBackend *blk = s->common.blk;
-    BlockDriverState *bs = blk_bs(blk);
+    BlockDriverState *bs = s->target_bs;
     BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
     bool enable_cor = !bdrv_cow_child(s->base_overlay);
     int64_t len;
@@ -231,6 +230,12 @@  void stream_start(const char *job_id, BlockDriverState *bs,
     int basic_flags = BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED;
     BlockDriverState *base_overlay = bdrv_find_overlay(bs, base);
     BlockDriverState *above_base;
+    BlockDriverState *cor_filter_bs = NULL;
+    char *base_fmt = NULL;
+
+    if (base && base->drv) {
+        base_fmt = g_strdup(base->drv->format_name);
+    }
 
     if (!base_overlay) {
         error_setg(errp, "'%s' is not in the backing chain of '%s'",
@@ -264,17 +269,36 @@  void stream_start(const char *job_id, BlockDriverState *bs,
         }
     }
 
-    /* Prevent concurrent jobs trying to modify the graph structure here, we
-     * already have our own plans. Also don't allow resize as the image size is
-     * queried only at the job start and then cached. */
-    s = block_job_create(job_id, &stream_job_driver, NULL, bs,
-                         basic_flags | BLK_PERM_GRAPH_MOD,
-                         basic_flags | BLK_PERM_WRITE,
+    cor_filter_bs = bdrv_cor_filter_append(bs, filter_node_name, errp);
+    if (cor_filter_bs == NULL) {
+        goto fail;
+    }
+
+    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
+        bdrv_cor_filter_drop(cor_filter_bs);
+        cor_filter_bs = NULL;
+        goto fail;
+    }
+
+    s = block_job_create(job_id, &stream_job_driver, NULL, cor_filter_bs,
+                         BLK_PERM_CONSISTENT_READ,
+                         basic_flags | BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD,
                          speed, creation_flags, NULL, NULL, errp);
     if (!s) {
         goto fail;
     }
 
+    /*
+     * Prevent concurrent jobs trying to modify the graph structure here, we
+     * already have our own plans. Also don't allow resize as the image size is
+     * queried only at the job start and then cached.
+     */
+    if (block_job_add_bdrv(&s->common, "active node", bs,
+                           basic_flags | BLK_PERM_GRAPH_MOD,
+                           basic_flags | BLK_PERM_WRITE, &error_abort)) {
+        goto fail;
+    }
+
     /* Block all intermediate nodes between bs and base, because they will
      * disappear from the chain after this operation. The streaming job reads
      * every block only once, assuming that it doesn't change, so forbid writes
@@ -294,6 +318,9 @@  void stream_start(const char *job_id, BlockDriverState *bs,
 
     s->base_overlay = base_overlay;
     s->above_base = above_base;
+    s->cor_filter_bs = cor_filter_bs;
+    s->target_bs = bs;
+    s->base_fmt = base_fmt;
     s->backing_file_str = g_strdup(backing_file_str);
     s->bs_read_only = bs_read_only;
     s->chain_frozen = true;
@@ -307,5 +334,10 @@  fail:
     if (bs_read_only) {
         bdrv_reopen_set_read_only(bs, true, NULL);
     }
-    bdrv_unfreeze_backing_chain(bs, above_base);
+    if (cor_filter_bs) {
+        bdrv_unfreeze_backing_chain(cor_filter_bs, above_base);
+        bdrv_cor_filter_drop(cor_filter_bs);
+    } else {
+        bdrv_unfreeze_backing_chain(bs, above_base);
+    }
 }
diff --git a/tests/qemu-iotests/030 b/tests/qemu-iotests/030
index 1cdd7e2..fec9d89 100755
--- a/tests/qemu-iotests/030
+++ b/tests/qemu-iotests/030
@@ -221,60 +221,20 @@  class TestParallelOps(iotests.QMPTestCase):
         for img in self.imgs:
             os.remove(img)
 
-    # Test that it's possible to run several block-stream operations
-    # in parallel in the same snapshot chain
-    def test_stream_parallel(self):
-        self.assert_no_active_block_jobs()
-
-        # Check that the maps don't match before the streaming operations
-        for i in range(2, self.num_imgs, 2):
-            self.assertNotEqual(qemu_io('-f', iotests.imgfmt, '-rU', '-c', 'map', self.imgs[i]),
-                                qemu_io('-f', iotests.imgfmt, '-rU', '-c', 'map', self.imgs[i-1]),
-                                'image file map matches backing file before streaming')
-
-        # Create all streaming jobs
-        pending_jobs = []
-        for i in range(2, self.num_imgs, 2):
-            node_name = 'node%d' % i
-            job_id = 'stream-%s' % node_name
-            pending_jobs.append(job_id)
-            result = self.vm.qmp('block-stream', device=node_name, job_id=job_id, base=self.imgs[i-2], speed=512*1024)
-            self.assert_qmp(result, 'return', {})
-
-        for job in pending_jobs:
-            result = self.vm.qmp('block-job-set-speed', device=job, speed=0)
-            self.assert_qmp(result, 'return', {})
-
-        # Wait for all jobs to be finished.
-        while len(pending_jobs) > 0:
-            for event in self.vm.get_qmp_events(wait=True):
-                if event['event'] == 'BLOCK_JOB_COMPLETED':
-                    job_id = self.dictpath(event, 'data/device')
-                    self.assertTrue(job_id in pending_jobs)
-                    self.assert_qmp_absent(event, 'data/error')
-                    pending_jobs.remove(job_id)
-
-        self.assert_no_active_block_jobs()
-        self.vm.shutdown()
-
-        # Check that all maps match now
-        for i in range(2, self.num_imgs, 2):
-            self.assertEqual(qemu_io('-f', iotests.imgfmt, '-c', 'map', self.imgs[i]),
-                             qemu_io('-f', iotests.imgfmt, '-c', 'map', self.imgs[i-1]),
-                             'image file map does not match backing file after streaming')
-
     # Test that it's not possible to perform two block-stream
     # operations if there are nodes involved in both.
     def test_overlapping_1(self):
         self.assert_no_active_block_jobs()
 
         # Set a speed limit to make sure that this job blocks the rest
-        result = self.vm.qmp('block-stream', device='node4', job_id='stream-node4', base=self.imgs[1], speed=1024*1024)
+        result = self.vm.qmp('block-stream', device='node4',
+                             job_id='stream-node4', base=self.imgs[1],
+                             filter_node_name='stream-filter', speed=1024*1024)
         self.assert_qmp(result, 'return', {})
 
         result = self.vm.qmp('block-stream', device='node5', job_id='stream-node5', base=self.imgs[2])
         self.assert_qmp(result, 'error/desc',
-            "Node 'node4' is busy: block device is in use by block job: stream")
+            "Node 'stream-filter' is busy: block device is in use by block job: stream")
 
         result = self.vm.qmp('block-stream', device='node3', job_id='stream-node3', base=self.imgs[2])
         self.assert_qmp(result, 'error/desc',
@@ -287,7 +247,7 @@  class TestParallelOps(iotests.QMPTestCase):
         # block-commit should also fail if it touches nodes used by the stream job
         result = self.vm.qmp('block-commit', device='drive0', base=self.imgs[4], job_id='commit-node4')
         self.assert_qmp(result, 'error/desc',
-            "Node 'node4' is busy: block device is in use by block job: stream")
+            "Node 'stream-filter' is busy: block device is in use by block job: stream")
 
         result = self.vm.qmp('block-commit', device='drive0', base=self.imgs[1], top=self.imgs[3], job_id='commit-node1')
         self.assert_qmp(result, 'error/desc',
diff --git a/tests/qemu-iotests/030.out b/tests/qemu-iotests/030.out
index 6d9bee1..5eb508d 100644
--- a/tests/qemu-iotests/030.out
+++ b/tests/qemu-iotests/030.out
@@ -1,5 +1,5 @@ 
-...........................
+..........................
 ----------------------------------------------------------------------
-Ran 27 tests
+Ran 26 tests
 
 OK