diff mbox

[RFC,12/22] block/pcache: implement read cache to qiov and drop node during aio write

Message ID 20160825134421.20231-13-pbutsykin@virtuozzo.com (mailing list archive)
State New, archived
Headers show

Commit Message

Pavel Butsykin Aug. 25, 2016, 1:44 p.m. UTC
After applying this patch aio read can pick up data from the pcache.

for correct driver work added 3 things:
1. Filling the qiov out of the cache.
2. Request completes. This is necessary to inform the upper level,
that the data for the request has already been obtained.
3. Cache Invalidation in the aio write requests. This is a simple way to keep
the cache up-to-date.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 118 insertions(+), 8 deletions(-)
diff mbox

Patch

diff --git a/block/pcache.c b/block/pcache.c
index a8a57e3..435f2b4 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -103,7 +103,9 @@  typedef struct PrefCacheAIOCB {
     struct {
         QTAILQ_HEAD(req_head, PrefCachePartReq) list;
         CoMutex lock;
+        uint32_t cnt;
     } requests;
+    QEMUBH   *bh;
     int      ret;
 } PrefCacheAIOCB;
 
@@ -217,6 +219,27 @@  static BlockNode *pcache_node_prev(BlockNode* node, RbNodeKey *key)
     return node;
 }
 
+static void *node_search(struct RbRoot *root, RbNodeKey *key)
+{
+    struct RbNode *rb_node = root->rb_node;
+
+    while (rb_node) {
+        BlockNode *node = container_of(rb_node, BlockNode, rb_node);
+        int32_t result = pcache_key_cmp(key, &node->key);
+        if (result == 0) {
+            return pcache_node_prev(node, key);
+        }
+        rb_node = result < 0 ? rb_node->rb_left : rb_node->rb_right;
+    }
+    return NULL;
+}
+
+static PCNode *pcache_node_search(struct RbRoot *root, RbNodeKey *key)
+{
+    PCNode *node = node_search(root, key);
+    return node == NULL ? NULL : pcache_node_ref(node);
+}
+
 static void *node_insert(struct RbRoot *root, BlockNode *node)
 {
     struct RbNode **new = &(root->rb_node), *parent = NULL;
@@ -320,6 +343,8 @@  static inline void push_node_request(PrefCacheAIOCB *acb, PCNode *node)
 {
     PrefCachePartReq *req = pcache_req_get(acb, node);
 
+    acb->requests.cnt++;
+
     QTAILQ_INSERT_HEAD(&acb->requests.list, req, entry);
 }
 
@@ -360,6 +385,38 @@  static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
     return true;
 }
 
+static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
+                                    uint64_t node2, uint32_t size2)
+{
+    return MIN(node1 + size1, node2 + size2) - MAX(node1, node2);
+}
+
+static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+{
+    uint64_t qiov_offs = 0, node_offs = 0;
+    uint32_t size;
+    uint32_t copy;
+
+    if (acb->sector_num < node->cm.sector_num) {
+        qiov_offs = (node->cm.sector_num - acb->sector_num) << BDRV_SECTOR_BITS;
+    } else {
+        node_offs = (acb->sector_num - node->cm.sector_num) << BDRV_SECTOR_BITS;
+    }
+    size = ranges_overlap_size(acb->sector_num, acb->nb_sectors,
+                               node->cm.sector_num, node->cm.nb_sectors)
+           << BDRV_SECTOR_BITS;
+
+    assert(node->status == NODE_SUCCESS_STATUS ||
+           node->status == NODE_REMOVE_STATUS);
+    assert(node->data != NULL);
+
+    qemu_co_mutex_lock(&node->lock);
+    copy = \
+        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
+    assert(copy == size);
+    qemu_co_mutex_unlock(&node->lock);
+}
+
 static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
 {
     key->num = acb->sector_num;
@@ -388,7 +445,7 @@  static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
             size -= up_size;
             num += up_size;
         }
-        /* XXX: node read */
+        pcache_node_read(acb, node);
         up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
 
         pcache_node_unref(acb->s, node);
@@ -429,13 +486,28 @@  static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
         node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
                                                      acb->nb_sectors)
     {
-        /* XXX: node read */
+        pcache_node_read(acb, node);
         pcache_node_unref(acb->s, node);
         return PREFETCH_FULL_UP;
     }
     pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
 
-    return PREFETCH_PART_UP;
+    return acb->requests.cnt == 0 ? PREFETCH_FULL_UP : PREFETCH_PART_UP;
+}
+
+static void pcache_aio_bh(void *opaque)
+{
+    PrefCacheAIOCB *acb = opaque;
+    qemu_bh_delete(acb->bh);
+    acb->common.cb(acb->common.opaque, 0);
+    qemu_aio_unref(acb);
+}
+
+static void complete_aio_request(PrefCacheAIOCB *acb)
+{
+    acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
+                         pcache_aio_bh, acb);
+    qemu_bh_schedule(acb->bh);
 }
 
 static void pcache_node_submit(PrefCachePartReq *req)
@@ -471,7 +543,7 @@  static void pcache_merge_requests(PrefCacheAIOCB *acb)
 
         pcache_node_submit(req);
 
-        /* XXX: pcache read */
+        pcache_node_read(acb, req->node);
 
         pcache_node_unref(acb->s, req->node);
 
@@ -480,11 +552,36 @@  static void pcache_merge_requests(PrefCacheAIOCB *acb)
     qemu_co_mutex_unlock(&acb->requests.lock);
 }
 
+static void pcache_try_node_drop(PrefCacheAIOCB *acb)
+{
+    BDRVPCacheState *s = acb->s;
+    RbNodeKey key;
+
+    prefetch_init_key(acb, &key);
+
+    do {
+        PCNode *node;
+        qemu_co_mutex_lock(&s->pcache.tree.lock);
+        node = pcache_node_search(&s->pcache.tree.root, &key);
+        qemu_co_mutex_unlock(&s->pcache.tree.lock);
+        if (node == NULL) {
+            break;
+        }
+
+        pcache_node_drop(s, node);
+
+        pcache_node_unref(s, node);
+    } while (true);
+}
+
 static void pcache_aio_cb(void *opaque, int ret)
 {
     PrefCacheAIOCB *acb = opaque;
 
     if (acb->aio_type & QEMU_AIO_READ) {
+        if (atomic_dec_fetch(&acb->requests.cnt) > 0) {
+            return;
+        }
         pcache_merge_requests(acb);
     }
 
@@ -503,6 +600,7 @@  static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
     acb->s = bs->opaque;
     acb->sector_num = sector_num;
     acb->nb_sectors = nb_sectors;
+    acb->requests.cnt = 0;
     acb->qiov = qiov;
     acb->aio_type = type;
     acb->ret = 0;
@@ -522,10 +620,21 @@  static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_READ);
-    pcache_prefetch(acb);
-
-    bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
-                   pcache_aio_cb, acb);
+    int32_t status = pcache_prefetch(acb);
+    if (status == PREFETCH_FULL_UP) {
+        assert(acb->requests.cnt == 0);
+        complete_aio_request(acb);
+    } else {
+        PrefCachePartReq *req;
+        assert(acb->requests.cnt != 0);
+
+        qemu_co_mutex_lock(&acb->requests.lock);
+        QTAILQ_FOREACH(req, &acb->requests.list, entry) {
+            bdrv_aio_readv(bs->file, req->sector_num, &req->qiov,
+                           req->nb_sectors, pcache_aio_cb, acb);
+        }
+        qemu_co_mutex_unlock(&acb->requests.lock);
+    }
     return &acb->common;
 }
 
@@ -538,6 +647,7 @@  static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_WRITE);
+    pcache_try_node_drop(acb); /* XXX: use write through */
 
     bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
                     pcache_aio_cb, acb);