@@ -78,6 +78,14 @@ static CURLMcode __curl_multi_socket_action(CURLM *multi_handle,
#define CURL_BLOCK_OPT_SSLVERIFY_DEFAULT true
#define CURL_BLOCK_OPT_TIMEOUT_DEFAULT 5
+/* Must be a non-zero power of 2. */
+#define CURL_BLOCK_SIZE (256 * 1024)
+
+/* Align "n" to the start of the containing block. */
+#define CURL_BLOCK_ALIGN(n) ((n) & ~(CURL_BLOCK_SIZE - 1))
+/* The offset of "n" within its block. */
+#define CURL_BLOCK_OFFSET(n) ((n) & (CURL_BLOCK_SIZE - 1))
+
struct BDRVCURLState;
struct CURLState;
@@ -86,11 +94,18 @@ static bool libcurl_initialized;
typedef struct CURLAIOCB {
Coroutine *co;
QEMUIOVector *qiov;
+ uint64_t qiov_offset; /* Offset in qiov to place data. */
uint64_t offset;
uint64_t bytes;
int ret;
+ /*
+ * start and end indicate the subset of the surrounding
+ * CURL_BLOCK_SIZE sized block that is the subject of this
+ * IOCB. They are offsets from the beginning of the underlying
+ * buffer.
+ */
size_t start;
size_t end;
} CURLAIOCB;
@@ -110,7 +125,6 @@ typedef struct CURLState
char *orig_buf;
uint64_t buf_start;
size_t buf_off;
- size_t buf_len;
char range[128];
char errmsg[CURL_ERROR_SIZE];
char in_use;
@@ -259,11 +273,11 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
goto read_end;
}
- if (s->buf_off >= s->buf_len) {
+ if (s->buf_off >= CURL_BLOCK_SIZE) {
/* buffer full, read nothing */
goto read_end;
}
- realsize = MIN(realsize, s->buf_len - s->buf_off);
+ realsize = MIN(realsize, CURL_BLOCK_SIZE - s->buf_off);
memcpy(s->orig_buf + s->buf_off, ptr, realsize);
s->buf_off += realsize;
@@ -281,35 +295,44 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
uint64_t clamped_end = MIN(end, s->len);
uint64_t clamped_len = clamped_end - start;
- for (i=0; i<CURL_NUM_STATES; i++) {
+ for (i = 0; i < CURL_NUM_STATES; i++) {
CURLState *state = &s->states[i];
- uint64_t buf_end = (state->buf_start + state->buf_off);
- uint64_t buf_fend = (state->buf_start + state->buf_len);
+ /* The end of the currently valid data. */
+ uint64_t buf_end = state->buf_start + state->buf_off;
+ /* The end of the valid data when the IO completes. */
+ uint64_t buf_fend = state->buf_start + CURL_BLOCK_SIZE;
if (!state->orig_buf)
continue;
if (!state->buf_off)
continue;
- // Does the existing buffer cover our section?
+ /*
+ * Does the existing buffer cover our section?
+ */
if ((start >= state->buf_start) &&
(start <= buf_end) &&
(clamped_end >= state->buf_start) &&
(clamped_end <= buf_end))
{
- char *buf = state->orig_buf + (start - state->buf_start);
+ char *buf = state->orig_buf + CURL_BLOCK_OFFSET(start);
trace_curl_pending_hit(qemu_coroutine_self(),
start, len);
- qemu_iovec_from_buf(acb->qiov, 0, buf, clamped_len);
+ qemu_iovec_from_buf(acb->qiov, acb->qiov_offset, buf, clamped_len);
if (clamped_len < len) {
- qemu_iovec_memset(acb->qiov, clamped_len, 0, len - clamped_len);
+ qemu_iovec_memset(acb->qiov, acb->qiov_offset + clamped_len,
+ 0, len - clamped_len);
}
acb->ret = 0;
return true;
}
- // Wait for unfinished chunks
+ /*
+ * If an in-progress IO will provide the required data, wait
+ * for it to complete - the initiator will complete this
+ * aiocb.
+ */
if (state->in_use &&
(start >= state->buf_start) &&
(start <= buf_fend) &&
@@ -320,10 +343,10 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
trace_curl_pending_piggyback(qemu_coroutine_self(),
start, len);
- acb->start = start - state->buf_start;
+ acb->start = CURL_BLOCK_OFFSET(start);
acb->end = acb->start + clamped_len;
- for (j=0; j<CURL_NUM_ACB; j++) {
+ for (j = 0; j < CURL_NUM_ACB; j++) {
if (!state->acb[j]) {
state->acb[j] = acb;
return true;
@@ -377,7 +400,7 @@ static void curl_multi_check_completion(BDRVCURLState *s)
for (i = 0; i < CURL_NUM_ACB; i++) {
CURLAIOCB *acb = state->acb[i];
- if (acb == NULL) {
+ if (!acb) {
continue;
}
@@ -385,14 +408,15 @@ static void curl_multi_check_completion(BDRVCURLState *s)
/* Assert that we have read all data */
assert(state->buf_off >= acb->end);
- qemu_iovec_from_buf(acb->qiov, 0,
+ qemu_iovec_from_buf(acb->qiov, acb->qiov_offset,
state->orig_buf + acb->start,
acb->end - acb->start);
if (acb->end - acb->start < acb->bytes) {
size_t offset = acb->end - acb->start;
- qemu_iovec_memset(acb->qiov, offset, 0,
- acb->bytes - offset);
+
+ qemu_iovec_memset(acb->qiov, acb->qiov_offset + offset,
+ 0, acb->bytes - offset);
}
}
@@ -539,6 +563,7 @@ static int curl_init_state(BDRVCURLState *s, CURLState *state)
static void curl_clean_state(CURLState *s)
{
int j;
+
for (j = 0; j < CURL_NUM_ACB; j++) {
assert(!s->acb[j]);
}
@@ -856,18 +881,26 @@ static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
BDRVCURLState *s = bs->opaque;
- uint64_t start = acb->offset;
- uint64_t end;
+ /*
+ * Our caller must ensure that this request does not span two
+ * blocks.
+ */
+ assert(CURL_BLOCK_ALIGN(acb->offset) ==
+ CURL_BLOCK_ALIGN(acb->offset + acb->bytes - 1));
qemu_mutex_lock(&s->mutex);
- // In case we have the requested data already (e.g. read-ahead),
- // we can just call the callback and be done.
- if (curl_find_buf(s, start, acb->bytes, acb)) {
+ /*
+ * Check whether the requested data can be found in an existing or
+ * pending IO request.
+ */
+ if (curl_find_buf(s, acb->offset, acb->bytes, acb)) {
goto out;
}
- // No cache found, so let's start a new request
+ /*
+ * No cache found, so let's start a new request.
+ */
for (;;) {
state = curl_find_state(s);
if (state) {
@@ -882,16 +915,15 @@ static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
goto out;
}
- acb->start = 0;
- acb->end = MIN(acb->bytes, s->len - start);
+ acb->start = CURL_BLOCK_OFFSET(acb->offset);
+ acb->end = acb->start + MIN(acb->bytes, s->len - acb->offset);
state->buf_off = 0;
- g_free(state->orig_buf);
- state->buf_start = start;
- state->buf_len = MIN(acb->end, s->len - start);
- end = start + state->buf_len - 1;
- state->orig_buf = g_try_malloc(state->buf_len);
- if (state->buf_len && state->orig_buf == NULL) {
+ state->buf_start = CURL_BLOCK_ALIGN(acb->offset);
+ if (!state->orig_buf) {
+ state->orig_buf = g_try_malloc(CURL_BLOCK_SIZE);
+ }
+ if (!state->orig_buf) {
curl_clean_state(state);
acb->ret = -ENOMEM;
goto out;
@@ -899,8 +931,10 @@ static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
state->acb[0] = acb;
snprintf(state->range, 127, "%" PRIu64 "-%" PRIu64,
- s->offset + start, s->offset + end);
- trace_curl_setup_preadv(qemu_coroutine_self(), start, acb->bytes);
+ s->offset + state->buf_start, /* "first-last", inclusive */
+ s->offset + state->buf_start + CURL_BLOCK_SIZE - 1);
+ trace_curl_setup_preadv(qemu_coroutine_self(), state->buf_start,
+ CURL_BLOCK_SIZE);
curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
if (curl_multi_add_handle(s->multi, state->curl) != CURLM_OK) {
@@ -921,21 +955,50 @@ out:
static int coroutine_fn curl_co_preadv(BlockDriverState *bs,
uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
{
- CURLAIOCB acb = {
- .co = qemu_coroutine_self(),
- .ret = -EINPROGRESS,
- .qiov = qiov,
- .offset = offset,
- .bytes = bytes
- };
+ /*
+ * The lower layer does all IO in single CURL_BLOCK_SIZE sized and
+ * aligned chunks and cannot handle an IO that spans two blocks,
+ * so split the request here.
+ */
+ int ret = 0;
+ uint64_t qiov_offset = 0;
+ uint64_t off = offset;
trace_curl_co_preadv(qemu_coroutine_self(), offset, bytes);
- curl_setup_preadv(bs, &acb);
- while (acb.ret == -EINPROGRESS) {
- qemu_coroutine_yield();
+
+ while (bytes > 0) {
+ uint64_t len = MIN(bytes, CURL_BLOCK_SIZE - CURL_BLOCK_OFFSET(off));
+ CURLAIOCB acb = {
+ .co = qemu_coroutine_self(),
+ .ret = -EINPROGRESS,
+ .qiov = qiov,
+ .qiov_offset = qiov_offset,
+ .offset = off,
+ .bytes = len,
+ };
+
+ trace_curl_co_preadv_segment(qemu_coroutine_self(), off, len);
+
+ curl_setup_preadv(bs, &acb);
+ while (acb.ret == -EINPROGRESS) {
+ qemu_coroutine_yield();
+ }
+
+ ret = acb.ret;
+ if (ret != 0) {
+ break; /* still emit the "done" trace below */
+ }
+
+ trace_curl_co_preadv_segment_done(qemu_coroutine_self());
+
+ qiov_offset += len;
+ off += len;
+ bytes -= len;
}
+
trace_curl_co_preadv_done(qemu_coroutine_self());
- return acb.ret;
+
+ return ret;
}
static void curl_close(BlockDriverState *bs)
@@ -200,6 +200,8 @@ curl_open(const char *file) "opening %s"
curl_open_size(uint64_t size) "size = %" PRIu64
curl_co_preadv(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%" PRIx64 " + 0x%" PRIx64
curl_co_preadv_done(void *co) "co %p done"
+curl_co_preadv_segment(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%" PRIx64 " + 0x%" PRIx64
+curl_co_preadv_segment_done(void *co) "co %p done"
curl_setup_preadv(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%" PRIx64 " + 0x%" PRIx64
curl_pending_hit(void *co, uint64_t start, uint64_t len) "co %p finds 0x%" PRIx64 " + 0x%" PRIx64
curl_pending_piggyback(void *co, uint64_t start, uint64_t len) "co %p pending 0x%" PRIx64 " + 0x%" PRIx64
Do all IO requests to the remote server in 256 KiB chunks.

Signed-off-by: David Edmondson <david.edmondson@oracle.com>
---
 block/curl.c       | 151 ++++++++++++++++++++++++++++++++-------------
 block/trace-events |   2 +
 2 files changed, 109 insertions(+), 44 deletions(-)