[RFC] block: add throttle block filter driver

Message ID 20170607130619.1545-1-el13635@mail.ntua.gr (mailing list archive)
State New, archived
Commit Message

Manos Pitsidianakis June 7, 2017, 1:06 p.m. UTC
This is part of my work for my GSoC project this summer.

I am not sure whether the count parameter in bdrv_co_pwrite_zeros and
bdrv_co_pdiscard refers to sectors or bytes. Should this be handled differently?

For now the driver can only be instantiated on the command line, as in the
following examples:
-drive driver=throttle,file.filename=foo.qcow2,iops-total=100,
iops-total-max=2000,iops-total-max-length=60,throttle-group=foobar
-drive driver=throttle,file.filename=foo.qcow2,iops-total=100,
throttle-group=foobar

This commit copies throttle group infrastructure from
block/throttle-groups.c to block/throttle.c to be used as a block filter
driver.

Signed-off-by: Manos Pitsidianakis <el13635@mail.ntua.gr>
---
 block/Makefile.objs      |   1 +
 block/throttle.c         | 747 +++++++++++++++++++++++++++++++++++++++++++++++
 include/block/throttle.h |  59 ++++
 3 files changed, 807 insertions(+)
 create mode 100644 block/throttle.c
 create mode 100644 include/block/throttle.h

Comments

Eric Blake June 7, 2017, 3:40 p.m. UTC | #1
On 06/07/2017 08:06 AM, Manos Pitsidianakis wrote:
> This is part of my work for my GSOC project this summer. 
> 
> I am not sure if the count parameter in  bdrv_co_pwrite_zeros and 
> bdrv_co_pdiscard refers to sectors or bytes.

Both refer to byte counts.  (We're trying to get rid of as many
sector-based interfaces as possible, but it's slow going to make sure
the conversions are still accurate).
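
To make the byte/sector distinction concrete, here is a minimal sketch of the conversion involved, assuming QEMU's 512-byte sector size (BDRV_SECTOR_BITS is 9). The sketch_* names are hypothetical and are not part of the block layer API; the constants are redefined locally so the fragment compiles on its own.

```c
#include <assert.h>
#include <stdint.h>

/* Mirrors QEMU's 512-byte sector definition; redefined here so the
 * sketch does not depend on QEMU headers. */
#define SKETCH_SECTOR_BITS 9
#define SKETCH_SECTOR_SIZE ((int64_t)1 << SKETCH_SECTOR_BITS)

/* Hypothetical helper: convert a sector count to a byte count. */
static inline int64_t sketch_sectors_to_bytes(int64_t nb_sectors)
{
    /* Reject values that would overflow when shifted. */
    assert(nb_sectors >= 0 &&
           nb_sectors <= (INT64_MAX >> SKETCH_SECTOR_BITS));
    return nb_sectors << SKETCH_SECTOR_BITS;
}

/* Hypothetical helper: convert a byte count to sectors, rounding up so
 * that a partial sector still counts as one sector. */
static inline int64_t sketch_bytes_to_sectors_round_up(int64_t bytes)
{
    assert(bytes >= 0 && bytes <= INT64_MAX - (SKETCH_SECTOR_SIZE - 1));
    return (bytes + SKETCH_SECTOR_SIZE - 1) >> SKETCH_SECTOR_BITS;
}
```

A driver callback that receives a byte count can pass it on unchanged; helpers like these are only needed at the boundary with code that still works in sectors.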
Manos Pitsidianakis June 7, 2017, 4:06 p.m. UTC | #2
On Wed, Jun 07, 2017 at 10:40:08AM -0500, Eric Blake wrote:
>On 06/07/2017 08:06 AM, Manos Pitsidianakis wrote:
>> This is part of my work for my GSOC project this summer.
>>
>> I am not sure if the count parameter in  bdrv_co_pwrite_zeros and
>> bdrv_co_pdiscard refers to sectors or bytes.
>
>Both refer to byte counts.  (We're trying to get rid of as many
>sector-based interfaces as possible, but it's slow going to make sure
>the conversions are still accurate).

I see. Would changing the names in the BlockDriverState function 
signatures to reflect that be okay?
Eric Blake June 7, 2017, 9:38 p.m. UTC | #3
On 06/07/2017 11:06 AM, Manos Pitsidianakis wrote:
> On Wed, Jun 07, 2017 at 10:40:08AM -0500, Eric Blake wrote:
>> On 06/07/2017 08:06 AM, Manos Pitsidianakis wrote:
>>> This is part of my work for my GSOC project this summer.
>>>
>>> I am not sure if the count parameter in  bdrv_co_pwrite_zeros and
>>> bdrv_co_pdiscard refers to sectors or bytes.
>>
>> Both refer to byte counts.  (We're trying to get rid of as many
>> sector-based interfaces as possible, but it's slow going to make sure
>> the conversions are still accurate).
> 
> I see. Would changing the names in the BlockDriverState function
> signatures to reflect that be okay?

What names need changing?
Manos Pitsidianakis June 7, 2017, 10:08 p.m. UTC | #4
On Wed, Jun 07, 2017 at 04:38:43PM -0500, Eric Blake wrote:
>On 06/07/2017 11:06 AM, Manos Pitsidianakis wrote:
>> On Wed, Jun 07, 2017 at 10:40:08AM -0500, Eric Blake wrote:
>>> On 06/07/2017 08:06 AM, Manos Pitsidianakis wrote:
>>>> This is part of my work for my GSOC project this summer.
>>>>
>>>> I am not sure if the count parameter in  bdrv_co_pwrite_zeros and
>>>> bdrv_co_pdiscard refers to sectors or bytes.
>>>
>>> Both refer to byte counts.  (We're trying to get rid of as many
>>> sector-based interfaces as possible, but it's slow going to make sure
>>> the conversions are still accurate).
>>
>> I see. Would changing the names in the BlockDriverState function
>> signatures to reflect that be okay?
>
>What names need changing?

I meant, change "int count" to "int bytes" in the bdrv_co_pwrite_zeros 
and bdrv_co_pdiscard signatures inside struct BlockDriverState. 
Eric Blake June 7, 2017, 10:19 p.m. UTC | #5
On 06/07/2017 05:08 PM, Manos Pitsidianakis wrote:
> On Wed, Jun 07, 2017 at 04:38:43PM -0500, Eric Blake wrote:
>> On 06/07/2017 11:06 AM, Manos Pitsidianakis wrote:
>>> On Wed, Jun 07, 2017 at 10:40:08AM -0500, Eric Blake wrote:
>>>> On 06/07/2017 08:06 AM, Manos Pitsidianakis wrote:
>>>>> This is part of my work for my GSOC project this summer.
>>>>>
>>>>> I am not sure if the count parameter in  bdrv_co_pwrite_zeros and
>>>>> bdrv_co_pdiscard refers to sectors or bytes.
>>>>
>>>> Both refer to byte counts.  (We're trying to get rid of as many
>>>> sector-based interfaces as possible, but it's slow going to make sure
>>>> the conversions are still accurate).
>>>
>>> I see. Would changing the names in the BlockDriverState function
>>> signatures to reflect that be okay?
>>
>> What names need changing?
> 
> I meant, change "int count" to "int bytes" in the bdrv_co_pwrite_zeros
> and bdrv_co_pdiscard signatures inside struct BlockDriverState.

Sure, a patch to change parameter names to something that aids
legibility would be welcome.
Stefan Hajnoczi June 8, 2017, 7:22 a.m. UTC | #6
On Wed, Jun 07, 2017 at 04:06:19PM +0300, Manos Pitsidianakis wrote:
> +#define QEMU_OPT_IOPS_TOTAL "iops-total"
> +#define QEMU_OPT_IOPS_TOTAL_MAX "iops-total-max"
> +#define QEMU_OPT_IOPS_TOTAL_MAX_LENGTH "iops-total-max-length"
> +#define QEMU_OPT_IOPS_READ "iops-read"
> +#define QEMU_OPT_IOPS_READ_MAX "iops-read-max"
> +#define QEMU_OPT_IOPS_READ_MAX_LENGTH "iops-read-max-length"
> +#define QEMU_OPT_IOPS_WRITE "iops-write"
> +#define QEMU_OPT_IOPS_WRITE_MAX "iops-write-max"
> +#define QEMU_OPT_IOPS_WRITE_MAX_LENGTH "iops-write-max-length"
> +#define QEMU_OPT_BPS_TOTAL "bps-total"
> +#define QEMU_OPT_BPS_TOTAL_MAX "bps-total-max"
> +#define QEMU_OPT_BPS_TOTAL_MAX_LENGTH "bps-total-max-length"
> +#define QEMU_OPT_BPS_READ "bps-read"
> +#define QEMU_OPT_BPS_READ_MAX "bps-read-max"
> +#define QEMU_OPT_BPS_READ_MAX_LENGTH "bps-read-max-length"
> +#define QEMU_OPT_BPS_WRITE "bps-write"
> +#define QEMU_OPT_BPS_WRITE_MAX "bps-write-max"
> +#define QEMU_OPT_BPS_WRITE_MAX_LENGTH "bps-write-max-length"
> +#define QEMU_OPT_IOPS_SIZE "iops-size"
> +#define QEMU_OPT_THROTTLE_GROUP_NAME "throttle-group"

Please reuse include/qemu/throttle-options.h.

> +static QemuMutex throttle_groups_lock;
> +static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
> +    QTAILQ_HEAD_INITIALIZER(throttle_groups);

Please reuse block/throttle-groups.c.  I think throttle-groups.c doesn't
actually need BlockBackend so you can make the code more general.

Define the necessary state:

  typedef struct ThrottleGroupMember {
    /* Protected by AioContext lock.  */
    CoQueue      throttled_reqs[2];

    /* Nonzero if the I/O limits are currently being ignored; generally
     * it is zero.  */
    unsigned int io_limits_disabled;

    /* The following fields are protected by the ThrottleGroup lock.
     * See the ThrottleGroup documentation for details.
     * throttle_state tells us if I/O limits are configured. */
    ThrottleState *throttle_state;
    ThrottleTimers throttle_timers;
    unsigned       pending_reqs[2];
    QLIST_ENTRY(ThrottleGroupMember) round_robin;
  } ThrottleGroupMember;

Then refactor throttle-groups.c to use ThrottleGroupMember instead of
BlockBackend(Public).
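
One way to picture the refactoring is the following sketch: the group code works only on the embedded member, and each owner recovers its own state from the member pointer. This is an illustration under stated assumptions, not the actual throttle-groups.c code. SketchMember is a cut-down stand-in for the ThrottleGroupMember above, SketchNodeState is a hypothetical owner, and sketch_container_of is a local copy of the usual container_of() idiom.

```c
#include <assert.h>
#include <stddef.h>

/* Local stand-in for the container_of() idiom. */
#define sketch_container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Reduced stand-in for the proposed ThrottleGroupMember; only the
 * request counters are kept. */
typedef struct SketchMember {
    unsigned pending_reqs[2];   /* [0] = reads, [1] = writes */
} SketchMember;

/* Hypothetical filter-driver state that embeds the member. */
typedef struct SketchNodeState {
    const char *node_name;
    SketchMember tgm;
} SketchNodeState;

/* Group code sees only the member, never the owner's type... */
static int sketch_member_has_pending(const SketchMember *m, int is_write)
{
    assert(is_write == 0 || is_write == 1);
    return m->pending_reqs[is_write] != 0;
}

/* ...and the owner maps a member pointer back to its own state. */
static SketchNodeState *sketch_node_from_member(SketchMember *m)
{
    return sketch_container_of(m, SketchNodeState, tgm);
}
```

With this layout both BlockBackend and a filter node could embed the same member type, so the group code would not need to know which of the two it is scheduling.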

> diff --git a/include/block/throttle.h b/include/block/throttle.h
> new file mode 100644
> index 0000000000..fed24f02a6
> --- /dev/null
> +++ b/include/block/throttle.h

What is the purpose of this header?

Header files in include/* are "public" APIs that other parts of QEMU can
use.  There is no user other than block/throttle.c so please keep it
private in the .c file.

> @@ -0,0 +1,59 @@
> +/* The ThrottleGroup structure (with its ThrottleState) is shared
> + * among different BlockDriverStates and it's independent from
> + * AioContext, so in order to use it from different threads it needs
> + * its own locking.
> + *
> + * This locking is however handled internally in block/throttle.c so it's
> + * transparent to outside users.
> + *
> + * The whole ThrottleGroup structure is private and invisible to
> + * outside users, that only use it through its ThrottleState.
> + *
> + * In addition to the ThrottleGroup structure, BlockDriverState has
> + * fields that need to be accessed by other members of the group and
> + * therefore also need to be protected by this lock. Once a
> + * BlockDriverState is registered in a group those fields can be accessed
> + * by other threads any time.
> + *
> + * Again, all this is handled internally in block/throttle.c and is mostly
> + * transparent to the outside. The 'throttle_timers' field however has an
> + * additional constraint because it may be temporarily invalid (see for example
> + * bdrv_set_aio_context()). Therefore block/throttle.c will access some
> + * other BlockDriverState's timers only after verifying that that BDS has
> + * throttled requests in the queue.
> + */
> +
> +typedef struct ThrottleGroup {
> +    char *name; /* This is constant during the lifetime of the group */
> +
> +    QemuMutex lock; /* This lock protects the following four fields */
> +    ThrottleState ts;
> +    QLIST_HEAD(, BDRVThrottleNodeState) head;
> +    struct BDRVThrottleNodeState *tokens[2];
> +    bool any_timer_armed[2];
> +
> +    /* These two are protected by the global throttle_groups_lock */
> +    unsigned refcount;
> +    QTAILQ_ENTRY(ThrottleGroup) list;
> +} ThrottleGroup;
> +
> +typedef struct BDRVThrottleNodeState {
> +    ThrottleGroup *throttle_group;
> +
> +    /* I/O throttling has its own locking, but also some fields are
> +     * protected by the AioContext lock.
> +     */
> +
> +    /* Protected by AioContext lock.  */
> +    CoQueue      throttled_reqs[2];
> +
> +    /* Nonzero if the I/O limits are currently being ignored; generally
> +     * it is zero.  */
> +    unsigned int io_limits_disabled;
> +
> +    ThrottleState *throttle_state;
> +    ThrottleTimers throttle_timers;
> +    unsigned       pending_reqs[2];
> +    QLIST_ENTRY(BDRVThrottleNodeState) round_robin;
> +
> +} BDRVThrottleNodeState;
> -- 
> 2.11.0
>
Manos Pitsidianakis June 8, 2017, 7:41 a.m. UTC | #7
On Thu, Jun 08, 2017 at 08:22:28AM +0100, Stefan Hajnoczi wrote:
>On Wed, Jun 07, 2017 at 04:06:19PM +0300, Manos Pitsidianakis wrote:
>> +#define QEMU_OPT_IOPS_TOTAL "iops-total"
>> +#define QEMU_OPT_IOPS_TOTAL_MAX "iops-total-max"
>> +#define QEMU_OPT_IOPS_TOTAL_MAX_LENGTH "iops-total-max-length"
>> +#define QEMU_OPT_IOPS_READ "iops-read"
>> +#define QEMU_OPT_IOPS_READ_MAX "iops-read-max"
>> +#define QEMU_OPT_IOPS_READ_MAX_LENGTH "iops-read-max-length"
>> +#define QEMU_OPT_IOPS_WRITE "iops-write"
>> +#define QEMU_OPT_IOPS_WRITE_MAX "iops-write-max"
>> +#define QEMU_OPT_IOPS_WRITE_MAX_LENGTH "iops-write-max-length"
>> +#define QEMU_OPT_BPS_TOTAL "bps-total"
>> +#define QEMU_OPT_BPS_TOTAL_MAX "bps-total-max"
>> +#define QEMU_OPT_BPS_TOTAL_MAX_LENGTH "bps-total-max-length"
>> +#define QEMU_OPT_BPS_READ "bps-read"
>> +#define QEMU_OPT_BPS_READ_MAX "bps-read-max"
>> +#define QEMU_OPT_BPS_READ_MAX_LENGTH "bps-read-max-length"
>> +#define QEMU_OPT_BPS_WRITE "bps-write"
>> +#define QEMU_OPT_BPS_WRITE_MAX "bps-write-max"
>> +#define QEMU_OPT_BPS_WRITE_MAX_LENGTH "bps-write-max-length"
>> +#define QEMU_OPT_IOPS_SIZE "iops-size"
>> +#define QEMU_OPT_THROTTLE_GROUP_NAME "throttle-group"
>
>Please reuse include/qemu/throttle-options.h.

This would make the options too verbose, i.e. -drive
driver=throttle,file=..,throttling.iops-total=..,throttling. Eventually
of course throttle-options.h will be replaced.

However, this can't be done at this point anyway, since the 
throttling.[...] options are consumed by the already existing throttling 
code, and are unavailable for the driver.
>
>> +static QemuMutex throttle_groups_lock;
>> +static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
>> +    QTAILQ_HEAD_INITIALIZER(throttle_groups);
>
>Please reuse block/throttle-groups.c.  I think throttle-groups.c doesn't
>actually need BlockBackend so you can make the code more general.
>
>Define the necessary state:
>
>  typedef struct ThrottleGroupMember {
>    /* Protected by AioContext lock.  */
>    CoQueue      throttled_reqs[2];
>
>    /* Nonzero if the I/O limits are currently being ignored; generally
>     * it is zero.  */
>    unsigned int io_limits_disabled;
>
>    /* The following fields are protected by the ThrottleGroup lock.
>     * See the ThrottleGroup documentation for details.
>     * throttle_state tells us if I/O limits are configured. */
>    ThrottleState *throttle_state;
>    ThrottleTimers throttle_timers;
>    unsigned       pending_reqs[2];
>    QLIST_ENTRY(ThrottleGroupMember) round_robin;
>  } ThrottleGroupMember;
>
>Then refactor throttle-groups.c to use ThrottleGroupMember instead of
>BlockBackend(Public).

I've already done this, it was just in a different branch.

>
>> diff --git a/include/block/throttle.h b/include/block/throttle.h
>> new file mode 100644
>> index 0000000000..fed24f02a6
>> --- /dev/null
>> +++ b/include/block/throttle.h
>
>What is the purpose of this header?
>
>Header files in include/* are "public" APIs that other parts of QEMU can
>use.  There is no user other than block/throttle.c so please keep it
>private in the .c file.

Eventually the qmp/hmp/qom related stuff will be in a header file, which
is why I kept these separate from the beginning. But yes, in this
case it is not needed. Also, these probably should go to
include/qemu/throttle.h instead of a separate header file.

>
>> @@ -0,0 +1,59 @@
>> +/* The ThrottleGroup structure (with its ThrottleState) is shared
>> + * among different BlockDriverStates and it's independent from
>> + * AioContext, so in order to use it from different threads it needs
>> + * its own locking.
>> + *
>> + * This locking is however handled internally in block/throttle.c so it's
>> + * transparent to outside users.
>> + *
>> + * The whole ThrottleGroup structure is private and invisible to
>> + * outside users, that only use it through its ThrottleState.
>> + *
>> + * In addition to the ThrottleGroup structure, BlockDriverState has
>> + * fields that need to be accessed by other members of the group and
>> + * therefore also need to be protected by this lock. Once a
>> + * BlockDriverState is registered in a group those fields can be accessed
>> + * by other threads any time.
>> + *
>> + * Again, all this is handled internally in block/throttle.c and is mostly
>> + * transparent to the outside. The 'throttle_timers' field however has an
>> + * additional constraint because it may be temporarily invalid (see for example
>> + * bdrv_set_aio_context()). Therefore block/throttle.c will access some
>> + * other BlockDriverState's timers only after verifying that that BDS has
>> + * throttled requests in the queue.
>> + */
>> +
>> +typedef struct ThrottleGroup {
>> +    char *name; /* This is constant during the lifetime of the group */
>> +
>> +    QemuMutex lock; /* This lock protects the following four fields */
>> +    ThrottleState ts;
>> +    QLIST_HEAD(, BDRVThrottleNodeState) head;
>> +    struct BDRVThrottleNodeState *tokens[2];
>> +    bool any_timer_armed[2];
>> +
>> +    /* These two are protected by the global throttle_groups_lock */
>> +    unsigned refcount;
>> +    QTAILQ_ENTRY(ThrottleGroup) list;
>> +} ThrottleGroup;
>> +
>> +typedef struct BDRVThrottleNodeState {
>> +    ThrottleGroup *throttle_group;
>> +
>> +    /* I/O throttling has its own locking, but also some fields are
>> +     * protected by the AioContext lock.
>> +     */
>> +
>> +    /* Protected by AioContext lock.  */
>> +    CoQueue      throttled_reqs[2];
>> +
>> +    /* Nonzero if the I/O limits are currently being ignored; generally
>> +     * it is zero.  */
>> +    unsigned int io_limits_disabled;
>> +
>> +    ThrottleState *throttle_state;
>> +    ThrottleTimers throttle_timers;
>> +    unsigned       pending_reqs[2];
>> +    QLIST_ENTRY(BDRVThrottleNodeState) round_robin;
>> +
>> +} BDRVThrottleNodeState;
>> --
>> 2.11.0
>>
Stefan Hajnoczi June 8, 2017, 10:32 a.m. UTC | #8
On Thu, Jun 08, 2017 at 10:41:49AM +0300, Manos Pitsidianakis wrote:
> On Thu, Jun 08, 2017 at 08:22:28AM +0100, Stefan Hajnoczi wrote:
> > On Wed, Jun 07, 2017 at 04:06:19PM +0300, Manos Pitsidianakis wrote:
> > > +#define QEMU_OPT_IOPS_TOTAL "iops-total"
> > > +#define QEMU_OPT_IOPS_TOTAL_MAX "iops-total-max"
> > > +#define QEMU_OPT_IOPS_TOTAL_MAX_LENGTH "iops-total-max-length"
> > > +#define QEMU_OPT_IOPS_READ "iops-read"
> > > +#define QEMU_OPT_IOPS_READ_MAX "iops-read-max"
> > > +#define QEMU_OPT_IOPS_READ_MAX_LENGTH "iops-read-max-length"
> > > +#define QEMU_OPT_IOPS_WRITE "iops-write"
> > > +#define QEMU_OPT_IOPS_WRITE_MAX "iops-write-max"
> > > +#define QEMU_OPT_IOPS_WRITE_MAX_LENGTH "iops-write-max-length"
> > > +#define QEMU_OPT_BPS_TOTAL "bps-total"
> > > +#define QEMU_OPT_BPS_TOTAL_MAX "bps-total-max"
> > > +#define QEMU_OPT_BPS_TOTAL_MAX_LENGTH "bps-total-max-length"
> > > +#define QEMU_OPT_BPS_READ "bps-read"
> > > +#define QEMU_OPT_BPS_READ_MAX "bps-read-max"
> > > +#define QEMU_OPT_BPS_READ_MAX_LENGTH "bps-read-max-length"
> > > +#define QEMU_OPT_BPS_WRITE "bps-write"
> > > +#define QEMU_OPT_BPS_WRITE_MAX "bps-write-max"
> > > +#define QEMU_OPT_BPS_WRITE_MAX_LENGTH "bps-write-max-length"
> > > +#define QEMU_OPT_IOPS_SIZE "iops-size"
> > > +#define QEMU_OPT_THROTTLE_GROUP_NAME "throttle-group"
> > 
> > Please reuse include/qemu/throttle-options.h.
> 
> This would make the options too verbose, ie -drive
> driver=throttle,file=..,throttling.iops-total=..,throttling. Eventually of
> course throttle-options.h will be replaced.
> 
> However, this can't be done at this point anyway, since the throttling.[...]
> options are consumed by the already existing throttling code, and are
> unavailable for the driver.

There are ways of reusing include/qemu/throttle-options.h, I'm just not
sure if they are worth it.

In block/block-backend.c:

  #define THROTTLE_OPTION_PREFIX "throttling."
  #include "qemu/throttle-options.h"

In block/throttle.c:

  #include "qemu/throttle-options.h"

In include/qemu/throttle-options.h:

  #define THROTTLE_OPTION_IOPS_TOTAL (THROTTLE_OPTION_PREFIX "iops-total")

  #define THROTTLE_OPTS \
        { \
            .name = THROTTLE_OPTION_IOPS_TOTAL,\
            .type = QEMU_OPT_NUMBER,\
            .help = "limit total I/O operations per second",\
        },{ \

to generate either "throttling.iops-total" or "iops-total", depending on
the environment in which the file was included.

This reuses the same .h file but it's also a little ugly because of the
C macro (ab)use.
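
The mechanism can be tried out in a single file. The sketch below relies on two standard C preprocessor properties: adjacent string literals are concatenated, and a macro body is expanded at the point of use, so it picks up whatever prefix is defined there. The SKETCH_* and sketch_* names are hypothetical, and redefining the prefix within one file only simulates what two different including files would see.

```c
#include <assert.h>
#include <string.h>

/* Stand-in for the line in the shared header: the option name is the
 * current prefix followed by the bare name. */
#define SKETCH_IOPS_TOTAL (SKETCH_PREFIX "iops-total")

/* Environment 1: the legacy code defines a prefix before "including". */
#define SKETCH_PREFIX "throttling."
static const char *sketch_backend_name(void)
{
    return SKETCH_IOPS_TOTAL;   /* expands with the "throttling." prefix */
}
#undef SKETCH_PREFIX

/* Environment 2: the filter driver uses an empty prefix. */
#define SKETCH_PREFIX ""
static const char *sketch_filter_name(void)
{
    return SKETCH_IOPS_TOTAL;   /* expands with the empty prefix */
}
#undef SKETCH_PREFIX

/* Both spellings name the same limit once the prefix is skipped. */
static int sketch_same_option(void)
{
    const char *legacy = sketch_backend_name();
    size_t skip = strlen("throttling.");

    assert(strlen(legacy) > skip);
    return strcmp(legacy + skip, sketch_filter_name()) == 0;
}
```

Note that a file which includes the header without defining any prefix would fail to compile unless the header supplies an empty default, for example under #ifndef; that default is an assumption and is not shown in the snippet above.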

> > > diff --git a/include/block/throttle.h b/include/block/throttle.h
> > > new file mode 100644
> > > index 0000000000..fed24f02a6
> > > --- /dev/null
> > > +++ b/include/block/throttle.h
> > 
> > What is the purpose of this header?
> > 
> > Header files in include/* are "public" APIs that other parts of QEMU can
> > use.  There is no user other than block/throttle.c so please keep it
> > private in the .c file.
> 
> Eventually the qmp/hmp/qom related stuff will be in a header file, and this
> is why I put them from the beginning separately. But yes, in this case it is
> not needed. Also, these probably should go to include/qemu/throttle.h
> instead of a separate header file.

Patches must be self-contained because reviewers don't necessarily know
future plans and if those future patches are never merged then the
source tree is in an incomplete state.

There are rare cases where it's really necessary for a patch to add
something that is unused.  In that case the intention should be clear
from the patch or commit description (e.g. "let's change this now so
the next patch can take advantage of it").
Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index ea955302c8..bb811a4d01 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -25,6 +25,7 @@  block-obj-y += accounting.o dirty-bitmap.o
 block-obj-y += write-threshold.o
 block-obj-y += backup.o
 block-obj-$(CONFIG_REPLICATION) += replication.o
+block-obj-y += throttle.o
 
 block-obj-y += crypto.o
 
diff --git a/block/throttle.c b/block/throttle.c
new file mode 100644
index 0000000000..e4536a09ed
--- /dev/null
+++ b/block/throttle.c
@@ -0,0 +1,747 @@ 
+/*
+ * QEMU block throttling filter driver infrastructure
+ *
+ * Copyright (C) Nodalink, EURL. 2014
+ * Copyright (C) Igalia, S.L. 2015
+ *
+ * Authors:
+ *   Benoît Canet <benoit.canet@nodalink.com>
+ *   Alberto Garcia <berto@igalia.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 or
+ * (at your option) version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "qemu/osdep.h"
+#include "sysemu/block-backend.h"
+#include "block/throttle-groups.h"
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+#include "sysemu/qtest.h"
+#include "monitor/monitor.h"
+#include "qapi/error.h"
+#include "block/throttle.h"
+#include "qapi/qmp/qdict.h"
+
+#define QEMU_OPT_IOPS_TOTAL "iops-total"
+#define QEMU_OPT_IOPS_TOTAL_MAX "iops-total-max"
+#define QEMU_OPT_IOPS_TOTAL_MAX_LENGTH "iops-total-max-length"
+#define QEMU_OPT_IOPS_READ "iops-read"
+#define QEMU_OPT_IOPS_READ_MAX "iops-read-max"
+#define QEMU_OPT_IOPS_READ_MAX_LENGTH "iops-read-max-length"
+#define QEMU_OPT_IOPS_WRITE "iops-write"
+#define QEMU_OPT_IOPS_WRITE_MAX "iops-write-max"
+#define QEMU_OPT_IOPS_WRITE_MAX_LENGTH "iops-write-max-length"
+#define QEMU_OPT_BPS_TOTAL "bps-total"
+#define QEMU_OPT_BPS_TOTAL_MAX "bps-total-max"
+#define QEMU_OPT_BPS_TOTAL_MAX_LENGTH "bps-total-max-length"
+#define QEMU_OPT_BPS_READ "bps-read"
+#define QEMU_OPT_BPS_READ_MAX "bps-read-max"
+#define QEMU_OPT_BPS_READ_MAX_LENGTH "bps-read-max-length"
+#define QEMU_OPT_BPS_WRITE "bps-write"
+#define QEMU_OPT_BPS_WRITE_MAX "bps-write-max"
+#define QEMU_OPT_BPS_WRITE_MAX_LENGTH "bps-write-max-length"
+#define QEMU_OPT_IOPS_SIZE "iops-size"
+#define QEMU_OPT_THROTTLE_GROUP_NAME "throttle-group"
+
+
+static QemuMutex throttle_groups_lock;
+static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
+    QTAILQ_HEAD_INITIALIZER(throttle_groups);
+
+static QemuOptsList throttle_opts = {
+    .name = "throttle",
+    .head = QTAILQ_HEAD_INITIALIZER(throttle_opts.head),
+    .desc = {
+        {
+            .name = QEMU_OPT_IOPS_TOTAL,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-total",
+        },
+        {
+            .name = QEMU_OPT_IOPS_TOTAL_MAX,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-total-max",
+        },
+        {
+            .name = QEMU_OPT_IOPS_TOTAL_MAX_LENGTH,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-total-max-length",
+        },
+        {
+            .name = QEMU_OPT_IOPS_READ,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-read",
+        },
+        {
+            .name = QEMU_OPT_IOPS_READ_MAX,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-read-max",
+        },
+        {
+            .name = QEMU_OPT_IOPS_READ_MAX_LENGTH,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-read-max-length",
+        },
+        {
+            .name = QEMU_OPT_IOPS_WRITE,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-write",
+        },
+        {
+            .name = QEMU_OPT_IOPS_WRITE_MAX,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-write-max",
+        },
+        {
+            .name = QEMU_OPT_IOPS_WRITE_MAX_LENGTH,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-write-max-length",
+        },
+        {
+            .name = QEMU_OPT_BPS_TOTAL,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-total",
+        },
+        {
+            .name = QEMU_OPT_BPS_TOTAL_MAX,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-total-max",
+        },
+        {
+            .name = QEMU_OPT_BPS_TOTAL_MAX_LENGTH,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-total-max-length",
+        },
+        {
+            .name = QEMU_OPT_BPS_READ,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-read",
+        },
+        {
+            .name = QEMU_OPT_BPS_READ_MAX,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-read-max",
+        },
+        {
+            .name = QEMU_OPT_BPS_READ_MAX_LENGTH,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-read-max-length",
+        },
+        {
+            .name = QEMU_OPT_BPS_WRITE,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-write",
+        },
+        {
+            .name = QEMU_OPT_BPS_WRITE_MAX,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-write-max",
+        },
+        {
+            .name = QEMU_OPT_BPS_WRITE_MAX_LENGTH,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.bps-write-max-length",
+        },
+        {
+            .name = QEMU_OPT_IOPS_SIZE,
+            .type = QEMU_OPT_NUMBER,
+            .help = "throttling.iops-size",
+        },
+        {
+            .name = QEMU_OPT_THROTTLE_GROUP_NAME,
+            .type = QEMU_OPT_STRING,
+            .help = "Throttle group name",
+        },
+            { /* end of list */ }
+        },
+};
+
+/* Return the next BDRVThrottleNodeState in the round-robin sequence,
+ * simulating a circular list.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @s: the current BlockDriverState's BDRVThrottleNodeState
+ * @ret: the next BDRVThrottleNodeState in the sequence
+ */
+static BDRVThrottleNodeState *throttle_group_next_bds(BDRVThrottleNodeState *s)
+{
+    ThrottleGroup *tg = s->throttle_group;
+    BDRVThrottleNodeState *next = QLIST_NEXT(s, round_robin);
+
+    if (!next) {
+        next = QLIST_FIRST(&tg->head);
+    }
+
+    return next;
+}
+
+/*
+ * Return whether a BlockDriverState has pending requests.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @s: the BlockDriverState's BDRVThrottleNodeState
+ * @is_write:  the type of operation (read/write)
+ * @ret:       whether the BDRVThrottleNodeState has pending requests.
+ */
+static inline bool bds_has_pending_reqs(BDRVThrottleNodeState *s,
+                                        bool is_write)
+{
+    return s->pending_reqs[is_write];
+}
+
+/* Return the next BlockDriverState in the round-robin sequence with pending
+ * I/O requests.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @s:         the current BlockDriverState's BDRVThrottleNodeState
+ * @is_write:  the type of operation (read/write)
+ * @ret:       the next BDRVThrottleNodeState with pending requests, or s if
+ *             there is none.
+ */
+static BDRVThrottleNodeState *next_throttle_token(BDRVThrottleNodeState *s,
+                                                    bool is_write)
+{
+    ThrottleGroup *tg = s->throttle_group;
+    BDRVThrottleNodeState *token, *start;
+
+    start = token = tg->tokens[is_write];
+
+    /* Get the next node in round-robin order */
+    token = throttle_group_next_bds(token);
+    while (token != start && !bds_has_pending_reqs(token, is_write)) {
+        token = throttle_group_next_bds(token);
+    }
+
+    /* If no I/O is queued for scheduling on the next round-robin token,
+     * make the current node the token, because chances are that the
+     * current request will be queued on it.
+     */
+    if (token == start && !bds_has_pending_reqs(token, is_write)) {
+        token = s;
+    }
+
+    /* Either we return the original node, or one with pending requests */
+    assert(token == s || bds_has_pending_reqs(token, is_write));
+
+    return token;
+}
+
+/* Increments the reference count of a ThrottleGroup given its name.
+ *
+ * If no ThrottleGroup is found with the given name a new one is
+ * created.
+ *
+ * @name: the name of the ThrottleGroup
+ * @ret:  the ThrottleState member of the ThrottleGroup
+ */
+static ThrottleState *bdrv_throttle_group_incref(const char *name)
+{
+    ThrottleGroup *tg = NULL;
+    ThrottleGroup *iter;
+
+    qemu_mutex_lock(&throttle_groups_lock);
+
+    /* Look for an existing group with that name */
+    QTAILQ_FOREACH(iter, &throttle_groups, list) {
+        if (!strcmp(name, iter->name)) {
+            tg = iter;
+            break;
+        }
+    }
+
+    /* Create a new one if not found */
+    if (!tg) {
+        tg = g_new0(ThrottleGroup, 1);
+        tg->name = g_strdup(name);
+        qemu_mutex_init(&tg->lock);
+        throttle_init(&tg->ts);
+        QLIST_INIT(&tg->head);
+
+        QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
+    }
+
+    tg->refcount++;
+
+    qemu_mutex_unlock(&throttle_groups_lock);
+
+    return &tg->ts;
+}
+
+/* Decrease the reference count of a ThrottleGroup.
+ *
+ * When the reference count reaches zero the ThrottleGroup is
+ * destroyed.
+ *
+ * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
+ */
+static void bdrv_throttle_group_unref(ThrottleState *ts)
+{
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+
+    qemu_mutex_lock(&throttle_groups_lock);
+    if (--tg->refcount == 0) {
+        QTAILQ_REMOVE(&throttle_groups, tg, list);
+        qemu_mutex_destroy(&tg->lock);
+        g_free(tg->name);
+        g_free(tg);
+    }
+    qemu_mutex_unlock(&throttle_groups_lock);
+}
+
+
+/* Check if the next I/O request for a BDRVThrottleNodeState needs to be
+ * throttled or not. If there's no timer set in this group, set one and
+ * update the token accordingly.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @s:        the current BDRVThrottleNodeState
+ * @is_write:   the type of operation (read/write)
+ * @ret:        whether the I/O request needs to be throttled or not
+ */
+static bool throttle_group_schedule_timer(BDRVThrottleNodeState *s,
+                                                    bool is_write)
+{
+    ThrottleGroup *tg = s->throttle_group;
+    ThrottleTimers *tt = &s->throttle_timers;
+    ThrottleState *ts = &tg->ts;
+    bool must_wait;
+
+    if (s->io_limits_disabled) {
+        return false;
+    }
+
+    /* Check if any of the timers in this group is already armed */
+    if (tg->any_timer_armed[is_write]) {
+        return true;
+    }
+
+    must_wait = throttle_schedule_timer(ts, tt, is_write);
+
+    /* If a timer just got armed, set s as the current token */
+    if (must_wait) {
+        tg->tokens[is_write] = s;
+        tg->any_timer_armed[is_write] = true;
+    }
+
+    return must_wait;
+}
+
+/* Look for the next pending I/O request and schedule it.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @bs:       the current BlockDriverState
+ * @is_write:  the type of operation (read/write)
+ */
+static void schedule_next_request(BlockDriverState *bs, bool is_write)
+{
+    BDRVThrottleNodeState *s = bs->opaque, *token = NULL;
+    ThrottleGroup *tg = s->throttle_group;
+    bool must_wait;
+
+    /* Check if there's any pending request to schedule next */
+    token = next_throttle_token(s, is_write);
+    if (!bds_has_pending_reqs(token, is_write)) {
+        return;
+    }
+
+    /* Set a timer for the request if it needs to be throttled */
+    must_wait = throttle_group_schedule_timer(token, is_write);
+
+    /* If it doesn't have to wait, queue it for immediate execution */
+    if (!must_wait) {
+        /* Give preference to requests from the current BDS */
+        if (qemu_in_coroutine() &&
+            qemu_co_queue_next(&s->throttled_reqs[is_write])) {
+            token = s;
+        } else {
+            ThrottleTimers *tt = &token->throttle_timers;
+            int64_t now = qemu_clock_get_ns(tt->clock_type);
+            timer_mod(tt->timers[is_write], now + 1);
+            tg->any_timer_armed[is_write] = true;
+        }
+        tg->tokens[is_write] = token;
+    }
+}
+
+/* ThrottleTimers callback. This wakes up a request that was waiting
+ * because it had been throttled.
+ *
+ * @bs:        the BlockDriverState whose request had been throttled
+ * @is_write:  the type of operation (read/write)
+ */
+static void timer_cb(BlockDriverState *bs, bool is_write)
+{
+    BDRVThrottleNodeState *s = bs->opaque;
+    ThrottleGroup *tg = s->throttle_group;
+    bool empty_queue;
+
+    /* The timer has just been fired, so we can update the flag */
+    qemu_mutex_lock(&tg->lock);
+    tg->any_timer_armed[is_write] = false;
+    qemu_mutex_unlock(&tg->lock);
+
+    /* Run the request that was waiting for this timer */
+    aio_context_acquire(bdrv_get_aio_context(bs));
+    empty_queue = !qemu_co_enter_next(&s->throttled_reqs[is_write]);
+    aio_context_release(bdrv_get_aio_context(bs));
+
+    /* If the request queue was empty then we have to take care of
+     * scheduling the next one */
+    if (empty_queue) {
+        qemu_mutex_lock(&tg->lock);
+        schedule_next_request(bs, is_write);
+        qemu_mutex_unlock(&tg->lock);
+    }
+}
+
+static void read_timer_cb(void *opaque)
+{
+    timer_cb(opaque, false);
+}
+
+static void write_timer_cb(void *opaque)
+{
+    timer_cb(opaque, true);
+}
+
+/* Unregister a BlockDriverState from its group, removing it from the list,
+ * destroying the timers and setting the throttle_state pointer to NULL.
+ *
+ * The BlockDriverState must not have pending throttled requests, so the caller
+ * has to drain them first.
+ *
+ * The group will be destroyed if it's empty after this operation.
+ *
+ * @bs: the BlockDriverState to remove
+ */
+static void throttle_node_unregister_bs(BlockDriverState *bs)
+{
+    BDRVThrottleNodeState *s = bs->opaque;
+    ThrottleGroup *tg = s->throttle_group;
+    int i;
+
+    assert(s->pending_reqs[0] == 0 && s->pending_reqs[1] == 0);
+    assert(qemu_co_queue_empty(&s->throttled_reqs[0]));
+    assert(qemu_co_queue_empty(&s->throttled_reqs[1]));
+
+    qemu_mutex_lock(&tg->lock);
+    for (i = 0; i < 2; i++) {
+        if (tg->tokens[i] == s) {
+            BDRVThrottleNodeState *token = throttle_group_next_bds(s);
+            /* Take care of the case where this is the last BlockDriverState in
+             * the group */
+            if (token == s) {
+                token = NULL;
+            }
+            tg->tokens[i] = token;
+        }
+    }
+
+    /* remove the current BDS from the list */
+    QLIST_REMOVE(s, round_robin);
+    throttle_timers_destroy(&s->throttle_timers);
+    qemu_mutex_unlock(&tg->lock);
+
+    bdrv_throttle_group_unref(&tg->ts);
+    s->throttle_state = NULL;
+}
+
+
+/* Register a BlockDriverState in the throttling group, also initializing its
+ * timers and updating its throttle_state pointer to point to it. If a
+ * throttling group with that name does not exist yet, it will be created.
+ *
+ * @bs:        the BlockDriverState to insert
+ * @groupname: the name of the group
+ */
+static void throttle_node_register_bs(BlockDriverState *bs,
+                                      const char *groupname)
+{
+    int i;
+    BDRVThrottleNodeState *s = bs->opaque;
+    ThrottleState *ts = bdrv_throttle_group_incref(groupname);
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+    int clock_type = QEMU_CLOCK_REALTIME;
+
+    if (qtest_enabled()) {
+        /* For testing block IO throttling only */
+        clock_type = QEMU_CLOCK_VIRTUAL;
+    }
+
+    s->throttle_state = ts;
+    s->throttle_group = tg;
+
+    qemu_mutex_lock(&tg->lock);
+    /* If the ThrottleGroup is new, set this BlockDriverState as the token */
+    for (i = 0; i < 2; i++) {
+        if (!tg->tokens[i]) {
+            tg->tokens[i] = s;
+        }
+    }
+
+    QLIST_INSERT_HEAD(&tg->head, s, round_robin);
+    throttle_timers_init(&s->throttle_timers,
+                         bdrv_get_aio_context(bs),
+                         clock_type,
+                         read_timer_cb,
+                         write_timer_cb,
+                         bs);
+    qemu_mutex_unlock(&tg->lock);
+}
+
+static int throttle_open(BlockDriverState *bs, QDict *options,
+                         int flags, Error **errp)
+{
+    int ret = 0;
+    BDRVThrottleNodeState *s = bs->opaque;
+    Error *local_err = NULL;
+    QemuOpts *opts = NULL;
+    const char *groupname = NULL;
+
+    bs->file = bdrv_open_child(NULL, options, "file",
+                               bs, &child_file, false, &local_err);
+
+    if (local_err) {
+        ret = -EINVAL;
+        error_propagate(errp, local_err);
+        return ret;
+    }
+
+    qdict_flatten(options);
+    opts = qemu_opts_create(&throttle_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        ret = -EINVAL;
+        goto open_ret;
+    }
+
+    groupname = qemu_opt_get(opts, QEMU_OPT_THROTTLE_GROUP_NAME);
+    if (!groupname) {
+        groupname = bdrv_get_device_or_node_name(bs);
+        if (!groupname) {
+            error_setg(&local_err,
+                       "A group name must be specified for this device.");
+            error_propagate(errp, local_err);
+            ret = -EINVAL;
+            goto open_ret;
+        }
+    }
+
+    throttle_node_register_bs(bs, groupname);
+    ThrottleConfig *throttle_cfg = &s->throttle_state->cfg;
+
+    throttle_config_init(throttle_cfg);
+    throttle_cfg->buckets[THROTTLE_BPS_TOTAL].avg =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_TOTAL, 0);
+    throttle_cfg->buckets[THROTTLE_BPS_READ].avg  =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_READ, 0);
+    throttle_cfg->buckets[THROTTLE_BPS_WRITE].avg =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_WRITE, 0);
+    throttle_cfg->buckets[THROTTLE_OPS_TOTAL].avg =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_TOTAL, 0);
+    throttle_cfg->buckets[THROTTLE_OPS_READ].avg =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_READ, 0);
+    throttle_cfg->buckets[THROTTLE_OPS_WRITE].avg =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_WRITE, 0);
+
+    throttle_cfg->buckets[THROTTLE_BPS_TOTAL].max =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_TOTAL_MAX, 0);
+    throttle_cfg->buckets[THROTTLE_BPS_READ].max  =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_READ_MAX, 0);
+    throttle_cfg->buckets[THROTTLE_BPS_WRITE].max =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_WRITE_MAX, 0);
+    throttle_cfg->buckets[THROTTLE_OPS_TOTAL].max =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_TOTAL_MAX, 0);
+    throttle_cfg->buckets[THROTTLE_OPS_READ].max =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_READ_MAX, 0);
+    throttle_cfg->buckets[THROTTLE_OPS_WRITE].max =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_WRITE_MAX, 0);
+
+    throttle_cfg->buckets[THROTTLE_BPS_TOTAL].burst_length =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_TOTAL_MAX_LENGTH, 1);
+    throttle_cfg->buckets[THROTTLE_BPS_READ].burst_length  =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_READ_MAX_LENGTH, 1);
+    throttle_cfg->buckets[THROTTLE_BPS_WRITE].burst_length =
+        qemu_opt_get_number(opts, QEMU_OPT_BPS_WRITE_MAX_LENGTH, 1);
+    throttle_cfg->buckets[THROTTLE_OPS_TOTAL].burst_length =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_TOTAL_MAX_LENGTH, 1);
+    throttle_cfg->buckets[THROTTLE_OPS_READ].burst_length =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_READ_MAX_LENGTH, 1);
+    throttle_cfg->buckets[THROTTLE_OPS_WRITE].burst_length =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_WRITE_MAX_LENGTH, 1);
+
+    throttle_cfg->op_size =
+        qemu_opt_get_number(opts, QEMU_OPT_IOPS_SIZE, 0);
+
+    if (!throttle_is_valid(throttle_cfg, &local_err)) {
+        error_propagate(errp, local_err);
+        throttle_node_unregister_bs(bs);
+        ret = -EINVAL;
+        goto open_ret;
+    }
+
+    qemu_co_queue_init(&s->throttled_reqs[0]);
+    qemu_co_queue_init(&s->throttled_reqs[1]);
+
+    throttle_timers_attach_aio_context(&s->throttle_timers,
+                                       bdrv_get_aio_context(bs));
+
+open_ret:
+    qemu_opts_del(opts);
+    return ret;
+}
+
+static void throttle_close(BlockDriverState *bs)
+{
+    throttle_node_unregister_bs(bs);
+    return;
+}
+
+
+static int64_t throttle_getlength(BlockDriverState *bs)
+{
+    return bdrv_getlength(bs->file->bs);
+}
+
+/* Check if an I/O request needs to be throttled, wait and set a timer
+ * if necessary, and schedule the next request using a round robin
+ * algorithm.
+ *
+ * @bs:        the current BlockDriverState
+ * @bytes:     the number of bytes for this I/O
+ * @is_write:  the type of operation (read/write)
+ */
+static void coroutine_fn throttle_co_io_limits_intercept(BlockDriverState *bs,
+                                                    unsigned int bytes,
+                                                    bool is_write)
+{
+    bool must_wait;
+    BDRVThrottleNodeState *s = bs->opaque, *token;
+    ThrottleGroup *tg = s->throttle_group;
+    qemu_mutex_lock(&tg->lock);
+
+    /* First we check if this I/O has to be throttled. */
+    token = next_throttle_token(s, is_write);
+    must_wait = throttle_group_schedule_timer(token, is_write);
+
+    /* Wait if there's a timer set or queued requests of this type */
+    if (must_wait || s->pending_reqs[is_write]) {
+        s->pending_reqs[is_write]++;
+        qemu_mutex_unlock(&tg->lock);
+        qemu_co_queue_wait(&s->throttled_reqs[is_write], NULL);
+        qemu_mutex_lock(&tg->lock);
+        s->pending_reqs[is_write]--;
+    }
+
+    /* The I/O will be executed, so do the accounting */
+    throttle_account(s->throttle_state, is_write, bytes);
+
+    /* Schedule the next request */
+    schedule_next_request(bs, is_write);
+
+    qemu_mutex_unlock(&tg->lock);
+}
+
+static int coroutine_fn throttle_co_preadv(BlockDriverState *bs,
+                                           uint64_t offset, uint64_t bytes,
+                                           QEMUIOVector *qiov, int flags)
+{
+
+    throttle_co_io_limits_intercept(bs, bytes, false);
+
+    return bdrv_co_preadv(bs->file, offset, bytes, qiov, flags);
+}
+
+static int coroutine_fn throttle_co_pwritev(BlockDriverState *bs,
+                                            uint64_t offset, uint64_t bytes,
+                                            QEMUIOVector *qiov, int flags)
+{
+    throttle_co_io_limits_intercept(bs, bytes, true);
+
+    return bdrv_co_pwritev(bs->file, offset, bytes, qiov, flags);
+}
+
+static int coroutine_fn throttle_co_pwrite_zeroes(BlockDriverState *bs,
+        int64_t offset, int count, BdrvRequestFlags flags)
+{
+    throttle_co_io_limits_intercept(bs, count, true);
+
+    return bdrv_co_pwrite_zeroes(bs->file, offset, count, flags);
+}
+
+static int coroutine_fn throttle_co_pdiscard(BlockDriverState *bs,
+        int64_t offset, int count)
+{
+    throttle_co_io_limits_intercept(bs, count, true);
+
+    return bdrv_co_pdiscard(bs->file->bs, offset, count);
+}
+
+static int coroutine_fn throttle_co_flush(BlockDriverState *bs)
+{
+    return bdrv_co_flush(bs->file->bs);
+}
+
+static void throttle_detach_aio_context(BlockDriverState *bs)
+{
+    BDRVThrottleNodeState *s = bs->opaque;
+    throttle_timers_detach_aio_context(&s->throttle_timers);
+}
+
+static void throttle_attach_aio_context(BlockDriverState *bs,
+                                        AioContext *new_context)
+{
+    BDRVThrottleNodeState *s = bs->opaque;
+    throttle_timers_attach_aio_context(&s->throttle_timers, new_context);
+}
+static BlockDriver bdrv_throttle = {
+    .format_name                        =   "throttle",
+    .protocol_name                      =   "throttle",
+    .instance_size                      =   sizeof(BDRVThrottleNodeState),
+
+    .bdrv_file_open                     =   throttle_open,
+    .bdrv_close                         =   throttle_close,
+    .bdrv_co_flush                      =   throttle_co_flush,
+
+    .bdrv_child_perm                    =   bdrv_filter_default_perms,
+
+    .bdrv_getlength                     =   throttle_getlength,
+
+    .bdrv_co_preadv                     =   throttle_co_preadv,
+    .bdrv_co_pwritev                    =   throttle_co_pwritev,
+
+    .bdrv_co_pwrite_zeroes              =   throttle_co_pwrite_zeroes,
+    .bdrv_co_pdiscard                   =   throttle_co_pdiscard,
+
+    .bdrv_recurse_is_first_non_filter   =   bdrv_recurse_is_first_non_filter,
+
+    .bdrv_attach_aio_context            =   throttle_attach_aio_context,
+    .bdrv_detach_aio_context            =   throttle_detach_aio_context,
+
+    .is_filter                          =   true,
+};
+
+static void bdrv_throttle_init(void)
+{
+    qemu_mutex_init(&throttle_groups_lock);
+    bdrv_register(&bdrv_throttle);
+}
+
+block_init(bdrv_throttle_init);
diff --git a/include/block/throttle.h b/include/block/throttle.h
new file mode 100644
index 0000000000..fed24f02a6
--- /dev/null
+++ b/include/block/throttle.h
@@ -0,0 +1,59 @@ 
+/* The ThrottleGroup structure (with its ThrottleState) is shared
+ * among different BlockDriverStates and it's independent from
+ * AioContext, so in order to use it from different threads it needs
+ * its own locking.
+ *
+ * This locking is however handled internally in block/throttle.c so it's
+ * transparent to outside users.
+ *
+ * The whole ThrottleGroup structure is private and invisible to
+ * outside users, which only use it through its ThrottleState.
+ *
+ * In addition to the ThrottleGroup structure, BlockDriverState has
+ * fields that need to be accessed by other members of the group and
+ * therefore also need to be protected by this lock. Once a
+ * BlockDriverState is registered in a group those fields can be accessed
+ * by other threads at any time.
+ *
+ * Again, all this is handled internally in block/throttle.c and is mostly
+ * transparent to the outside. The 'throttle_timers' field however has an
+ * additional constraint because it may be temporarily invalid (see for example
+ * bdrv_set_aio_context()). Therefore block/throttle.c will access some
+ * other BlockDriverState's timers only after verifying that that BDS has
+ * throttled requests in the queue.
+ */
+
+typedef struct ThrottleGroup {
+    char *name; /* This is constant during the lifetime of the group */
+
+    QemuMutex lock; /* This lock protects the following four fields */
+    ThrottleState ts;
+    QLIST_HEAD(, BDRVThrottleNodeState) head;
+    struct BDRVThrottleNodeState *tokens[2];
+    bool any_timer_armed[2];
+
+    /* These two are protected by the global throttle_groups_lock */
+    unsigned refcount;
+    QTAILQ_ENTRY(ThrottleGroup) list;
+} ThrottleGroup;
+
+typedef struct BDRVThrottleNodeState {
+    ThrottleGroup *throttle_group;
+
+    /* I/O throttling has its own locking, but also some fields are
+     * protected by the AioContext lock.
+     */
+
+    /* Protected by AioContext lock.  */
+    CoQueue      throttled_reqs[2];
+
+    /* Nonzero if the I/O limits are currently being ignored; generally
+     * it is zero.  */
+    unsigned int io_limits_disabled;
+
+    ThrottleState *throttle_state;
+    ThrottleTimers throttle_timers;
+    unsigned       pending_reqs[2];
+    QLIST_ENTRY(BDRVThrottleNodeState) round_robin;
+
+} BDRVThrottleNodeState;
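
The round-robin hand-off that schedule_next_request() and
throttle_co_io_limits_intercept() rely on can be illustrated with a small
standalone model. This is only a sketch and not part of the patch:
next_throttle_token() is defined outside the portion of the diff shown here,
so the selection rule below (walk the ring from the member after the current
token, stop at the first member with pending requests, otherwise fall back to
the caller) is an assumption about its behaviour. ToyGroup, TOY_MAX_MEMBERS
and toy_next_token() are hypothetical names that do not exist in QEMU, and the
model uses array indices in place of the QLIST of BDRVThrottleNodeState.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_MEMBERS 8   /* hypothetical limit, for the sketch only */

/* Hypothetical stand-in for a throttle group, tracking one direction
 * (read or write): members form a ring, each with a pending-request count. */
typedef struct ToyGroup {
    size_t nmembers;                    /* number of members in the ring */
    unsigned pending[TOY_MAX_MEMBERS];  /* queued requests per member */
    size_t token;                       /* index of the current token holder */
} ToyGroup;

/* Choose the next token holder.
 *
 * Start at the member after the current token and advance around the ring
 * until a member with pending requests is found or the walk returns to the
 * token holder. If no member has pending requests, return 'current' (the
 * member issuing the request). */
static size_t toy_next_token(const ToyGroup *g, size_t current)
{
    size_t start = g->token;
    size_t i;

    assert(g->nmembers > 0 && g->nmembers <= TOY_MAX_MEMBERS);
    assert(current < g->nmembers && start < g->nmembers);

    i = (start + 1) % g->nmembers;
    while (i != start && g->pending[i] == 0) {
        i = (i + 1) % g->nmembers;
    }

    if (i == start && g->pending[i] == 0) {
        return current;
    }
    return i;
}
```

In this model a member with queued requests is always preferred over the
caller, which is what keeps one busy node from starving the others in the
same group.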