@@ -24,7 +24,7 @@
#include <rados/librados.h>
#include <signal.h>
-
+#include <pthread.h>
int eventfd(unsigned int initval, int flags);
@@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags);
*/
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
typedef struct RBDAIOCB {
BlockDriverAIOCB common;
@@ -79,6 +80,9 @@ typedef struct BDRVRBDState {
uint64_t size;
uint64_t objsize;
int qemu_aio_count;
+ uint64_t queuesize;
+ pthread_mutex_t *queue_mutex;
+ pthread_cond_t *queue_threshold;
} BDRVRBDState;
typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -334,6 +338,12 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
le64_to_cpus((uint64_t *) & header->image_size);
s->size = header->image_size;
s->objsize = 1 << header->options.order;
+ s->queuesize = 0;
+
+ s->queue_mutex = qemu_malloc(sizeof(pthread_mutex_t));
+ pthread_mutex_init(s->queue_mutex, NULL);
+ s->queue_threshold = qemu_malloc(sizeof(pthread_cond_t));
+ pthread_cond_init (s->queue_threshold, NULL);
s->efd = eventfd(0, 0);
if (s->efd < 0) {
@@ -356,6 +366,11 @@ static void rbd_close(BlockDriverState *bs)
{
BDRVRBDState *s = bs->opaque;
+ pthread_cond_destroy(s->queue_threshold);
+ qemu_free(s->queue_threshold);
+ pthread_mutex_destroy(s->queue_mutex);
+ qemu_free(s->queue_mutex);
+
rados_close_pool(s->pool);
rados_deinitialize();
}
@@ -443,6 +458,12 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
int i;
acb->aiocnt--;
+ acb->s->queuesize -= rcb->segsize;
+ if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) {
+ pthread_mutex_lock(acb->s->queue_mutex);
+ pthread_cond_signal(acb->s->queue_threshold);
+ pthread_mutex_unlock(acb->s->queue_mutex);
+ }
r = rados_aio_get_return_value(c);
rados_aio_release(c);
if (acb->write) {
@@ -560,6 +581,14 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
rcb->segsize = segsize;
rcb->buf = buf;
+ while (s->queuesize > MAX_QUEUE_SIZE) {
+ pthread_mutex_lock(s->queue_mutex);
+ pthread_cond_wait(s->queue_threshold, s->queue_mutex);
+ pthread_mutex_unlock(s->queue_mutex);
+ }
+
+ s->queuesize += segsize;
+
if (write) {
rados_aio_create_completion(rcb, NULL,
(rados_callback_t) rbd_finish_aiocb,