@@ -43,6 +43,11 @@ typedef struct RbNodeKey {
uint32_t size;
} RbNodeKey;
+typedef struct ACBEntryLink {
+ QTAILQ_ENTRY(ACBEntryLink) entry;
+ struct PrefCacheAIOCB *acb;
+} ACBEntryLink;
+
typedef struct BlockNode {
struct RbNode rb_node;
union {
@@ -58,6 +63,10 @@ typedef struct BlockNode {
typedef struct PCNode {
BlockNode cm;
+ struct {
+ QTAILQ_HEAD(acb_head, ACBEntryLink) list;
+ uint32_t cnt;
+ } wait;
uint32_t status;
uint32_t ref;
uint8_t *data;
@@ -181,7 +190,6 @@ static inline PCNode *pcache_node_ref(PCNode *node)
{
assert(node->status == NODE_SUCCESS_STATUS ||
node->status == NODE_WAIT_STATUS);
- assert(atomic_read(&node->ref) == 0);/* XXX: only for sequential requests */
atomic_inc(&node->ref);
return node;
@@ -277,6 +285,8 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
node->status = NODE_WAIT_STATUS;
qemu_co_mutex_init(&node->lock);
node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
+ node->wait.cnt = 0;
+ QTAILQ_INIT(&node->wait.list);
return node;
}
@@ -308,15 +318,33 @@ static void pcache_node_drop(BDRVPCacheState *s, PCNode *node)
pcache_node_unref(s, node);
}
+static inline PCNode *pcache_get_most_unused_node(BDRVPCacheState *s)
+{
+ PCNode *node;
+ assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
+
+ qemu_co_mutex_lock(&s->pcache.lru.lock);
+ node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
+ pcache_node_ref(node);
+ qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+ return node;
+}
+
static void pcache_try_shrink(BDRVPCacheState *s)
{
while (s->pcache.curr_size > s->cfg_cache_size) {
- qemu_co_mutex_lock(&s->pcache.lru.lock);
- assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
- PCNode *rmv_node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
- qemu_co_mutex_unlock(&s->pcache.lru.lock);
+ PCNode *rmv_node;
+ /* this can happen if all the nodes are in the wait state */
+ if (QTAILQ_EMPTY(&s->pcache.lru.list)) {
+ DPRINTF("lru list is empty, but curr_size: %d\n",
+ s->pcache.curr_size);
+ break;
+ }
+ rmv_node = pcache_get_most_unused_node(s);
pcache_node_drop(s, rmv_node);
+ pcache_node_unref(s, rmv_node);
#ifdef PCACHE_DEBUG
atomic_inc(&s->shrink_cnt_node);
#endif
@@ -392,7 +420,7 @@ static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
return MIN(node1 + size1, node2 + size2) - MAX(node1, node2);
}
-static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+static inline void pcache_node_read_buf(PrefCacheAIOCB *acb, PCNode* node)
{
uint64_t qiov_offs = 0, node_offs = 0;
uint32_t size;
@@ -407,15 +435,41 @@ static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
node->cm.sector_num, node->cm.nb_sectors)
<< BDRV_SECTOR_BITS;
+ qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
+ copy = \
+ qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
+ qemu_co_mutex_unlock(&node->lock);
+ assert(copy == size);
+}
+
+static inline void pcache_node_read_wait(PrefCacheAIOCB *acb, PCNode *node)
+{
+ ACBEntryLink *link = g_slice_alloc(sizeof(*link));
+ link->acb = acb;
+
+ atomic_inc(&node->wait.cnt);
+ QTAILQ_INSERT_HEAD(&node->wait.list, link, entry);
+ acb->ref++;
+}
+
+static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+{
assert(node->status == NODE_SUCCESS_STATUS ||
+ node->status == NODE_WAIT_STATUS ||
node->status == NODE_REMOVE_STATUS);
assert(node->data != NULL);
qemu_co_mutex_lock(&node->lock);
- copy = \
- qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
- assert(copy == size);
+ if (node->status == NODE_WAIT_STATUS) {
+ pcache_node_read_wait(acb, node);
+ qemu_co_mutex_unlock(&node->lock);
+
+ return;
+ }
qemu_co_mutex_unlock(&node->lock);
+
+ pcache_node_read_buf(acb, node);
+ pcache_node_unref(acb->s, node);
}
static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
@@ -446,10 +500,11 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
size -= up_size;
num += up_size;
}
- pcache_node_read(acb, node);
up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
-
- pcache_node_unref(acb->s, node);
+ pcache_node_read(acb, node); /* don't use the node after pcache_node_read,
+                               * the node may have been freed.
+                               */
+ node = NULL;
size -= up_size;
num += up_size;
@@ -488,7 +543,6 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
acb->nb_sectors)
{
pcache_node_read(acb, node);
- pcache_node_unref(acb->s, node);
return PREFETCH_FULL_UP;
}
pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
@@ -513,6 +567,31 @@ static void complete_aio_request(PrefCacheAIOCB *acb)
}
}
+static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node)
+{
+ ACBEntryLink *link, *next;
+
+ if (atomic_read(&node->wait.cnt) == 0) {
+ return;
+ }
+
+ QTAILQ_FOREACH_SAFE(link, &node->wait.list, entry, next) {
+ PrefCacheAIOCB *wait_acb = link->acb;
+
+ QTAILQ_REMOVE(&node->wait.list, link, entry);
+ g_slice_free1(sizeof(*link), link);
+
+ pcache_node_read_buf(wait_acb, node);
+
+ assert(node->ref != 0);
+ pcache_node_unref(s, node);
+
+ complete_aio_request(wait_acb);
+ atomic_dec(&node->wait.cnt);
+ }
+ assert(atomic_read(&node->wait.cnt) == 0);
+}
+
static void pcache_node_submit(PrefCachePartReq *req)
{
PCNode *node = req->node;
@@ -539,14 +618,17 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
qemu_co_mutex_lock(&acb->requests.lock);
QTAILQ_FOREACH_SAFE(req, &acb->requests.list, entry, next) {
+ PCNode *node = req->node;
QTAILQ_REMOVE(&acb->requests.list, req, entry);
assert(req != NULL);
- assert(req->node->status == NODE_WAIT_STATUS);
+ assert(node->status == NODE_WAIT_STATUS);
pcache_node_submit(req);
- pcache_node_read(acb, req->node);
+ pcache_node_read_buf(acb, node);
+
+ pcache_complete_acb_wait_queue(acb->s, node);
pcache_node_unref(acb->s, req->node);
@@ -559,22 +641,27 @@ static void pcache_try_node_drop(PrefCacheAIOCB *acb)
{
BDRVPCacheState *s = acb->s;
RbNodeKey key;
+ PCNode *node;
+ uint64_t end_offs = acb->sector_num + acb->nb_sectors;
- prefetch_init_key(acb, &key);
-
+ key.num = acb->sector_num;
do {
- PCNode *node;
- qemu_co_mutex_lock(&s->pcache.tree.lock);
+ key.size = end_offs - key.num;
+
+ qemu_co_mutex_lock(&s->pcache.tree.lock); /* XXX: use get_next_node */
node = pcache_node_search(&s->pcache.tree.root, &key);
qemu_co_mutex_unlock(&s->pcache.tree.lock);
if (node == NULL) {
- break;
+ return;
}
-
- pcache_node_drop(s, node);
+ if (node->status != NODE_WAIT_STATUS) {
+ assert(node->status == NODE_SUCCESS_STATUS);
+ pcache_node_drop(s, node);
+ }
+ key.num = node->cm.sector_num + node->cm.nb_sectors;
pcache_node_unref(s, node);
- } while (true);
+ } while (end_offs > key.num);
}
static void pcache_aio_cb(void *opaque, int ret)
@@ -586,6 +673,8 @@ static void pcache_aio_cb(void *opaque, int ret)
return;
}
pcache_merge_requests(acb);
+ } else { /* QEMU_AIO_WRITE */
+ pcache_try_node_drop(acb); /* XXX: use write through */
}
complete_aio_request(acb);
@@ -649,7 +738,6 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
{
PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
opaque, QEMU_AIO_WRITE);
- pcache_try_node_drop(acb); /* XXX: use write through */
bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
pcache_aio_cb, acb);
Now we can't drop nodes until the aio write request has completed, because
there is no guarantee that an overlapping chunk of blocks won't be cached in
the interval between the start of the request and its completion, which would
leave stale data in the cache. It has also become possible for an aio write to
overlap a PCNode with status NODE_WAIT_STATUS; since we now drop nodes in the
aio callback, such nodes can be skipped, because there is a guarantee that by
the time the aio read for a pending node is processed, the data on disk will
be up to date.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 136 +++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 112 insertions(+), 24 deletions(-)
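
For review purposes, here is a minimal, self-contained sketch of the wait-queue
pattern that pcache_node_read()/pcache_complete_acb_wait_queue() add above. It
is not QEMU code: the structs and the sys/queue.h macros are simplified
stand-ins for PCNode, PrefCacheAIOCB, ACBEntryLink and QTAILQ_*, and the field
names are hypothetical. It only illustrates the idea that a read against a node
still being filled (NODE_WAIT_STATUS) is parked on the node's wait list with an
extra reference, and that every parked request is served once the backing read
finishes:

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/queue.h>              /* TAILQ_* stands in for QEMU's QTAILQ_* */

enum node_status { NODE_WAIT_STATUS, NODE_SUCCESS_STATUS };

struct acb {                        /* reduced stand-in for PrefCacheAIOCB */
    char buf[16];
    int ref;
    TAILQ_ENTRY(acb) entry;         /* plays the role of ACBEntryLink */
};

struct node {                       /* reduced stand-in for PCNode */
    enum node_status status;
    char data[16];
    TAILQ_HEAD(wait_head, acb) wait_list;
    int wait_cnt;
};

/* like pcache_node_read(): copy now if the node is already filled,
 * otherwise park the request on the node's wait queue. */
static void node_read(struct node *n, struct acb *a)
{
    if (n->status == NODE_WAIT_STATUS) {
        TAILQ_INSERT_HEAD(&n->wait_list, a, entry);
        n->wait_cnt++;
        a->ref++;                   /* the request stays in flight */
        return;
    }
    memcpy(a->buf, n->data, sizeof(a->buf));
}

/* like pcache_complete_acb_wait_queue(): once the backing read that fills
 * the node completes, serve and complete every parked request. */
static void node_fill_complete(struct node *n, const char *payload)
{
    struct acb *a;

    snprintf(n->data, sizeof(n->data), "%s", payload);
    n->status = NODE_SUCCESS_STATUS;

    while ((a = TAILQ_FIRST(&n->wait_list)) != NULL) {
        TAILQ_REMOVE(&n->wait_list, a, entry);
        n->wait_cnt--;
        memcpy(a->buf, n->data, sizeof(a->buf));
        a->ref--;                   /* completes the parked request */
    }
    assert(n->wait_cnt == 0);
}

int main(void)
{
    struct node n = { .status = NODE_WAIT_STATUS };
    struct acb a = { .ref = 1 };

    TAILQ_INIT(&n.wait_list);
    node_read(&n, &a);              /* node not filled yet: request is parked */
    assert(n.wait_cnt == 1 && a.ref == 2);

    node_fill_complete(&n, "payload");
    printf("parked acb got: %s\n", a.buf);
    return 0;
}

The real code additionally takes node->lock around the status check and the
buffer copy, and allocates a separate ACBEntryLink with g_slice_alloc() instead
of embedding the list entry in the acb itself.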