@@ -14,15 +14,27 @@
#include <target/target_core_base.h>
#include "dlm_ckv.h"
+enum dlm_lksb_state {
+ ONE_STAGE_SYNC,
+ READ_FIRST_STAGE,
+ READ_SECOND_STAGE,
+ WRITE_FIRST_STAGE,
+ WRITE_SECOND_STAGE,
+};
struct dlm_ckv_lksb {
struct dlm_lksb lksb;
struct completion compl;
+ enum dlm_lksb_state state;
};
struct dlm_ckv_lock {
struct dlm_ckv_bucket *bucket;
struct dlm_ckv_lksb lksb;
char name[DLM_RESNAME_MAXLEN];
+ dlm_ckv_async_cb async_cb;
+ void *cb_arg;
+ void *value;
+ size_t len;
};
struct dlm_ckv_kv {
@@ -97,8 +109,48 @@ static const struct dlm_lockspace_ops dlm_ckv_lockspace_ops = {
static void dlm_ast(void *astarg)
{
struct dlm_ckv_lksb *dlm_ckv_lksb = astarg;
+ struct dlm_ckv_lock *ckv_lock;
+ struct dlm_ckv_bucket *bucket;
+ int res;
+
+ ckv_lock = container_of(dlm_ckv_lksb, struct dlm_ckv_lock, lksb);
+ bucket = ckv_lock->bucket;
- complete(&dlm_ckv_lksb->compl);
+ switch (dlm_ckv_lksb->state) {
+ case ONE_STAGE_SYNC:
+ complete(&dlm_ckv_lksb->compl);
+ break;
+ case READ_FIRST_STAGE:
+ if (dlm_ckv_lksb->lksb.sb_flags & DLM_SBF_VALNOTVALID) {
+ pr_debug("%s LVB was invalid\n", ckv_lock->name);
+ memset(ckv_lock->value, 0, ckv_lock->len);
+ } else
+ memcpy(ckv_lock->value, dlm_ckv_lksb->lksb.sb_lvbptr,
+ ckv_lock->len);
+
+ dlm_ckv_lksb->state = READ_SECOND_STAGE;
+ res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_NL,
+ &dlm_ckv_lksb->lksb, DLM_LKF_CONVERT, 0, 0, 0,
+ dlm_ast, dlm_ckv_lksb, NULL);
+ if (res)
+ ckv_lock->async_cb(ckv_lock->cb_arg, res);
+ break;
+ case WRITE_FIRST_STAGE:
+
+ dlm_ckv_lksb->state = WRITE_SECOND_STAGE;
+ res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_NL,
+ &dlm_ckv_lksb->lksb,
+ DLM_LKF_VALBLK | DLM_LKF_CONVERT, 0, 0, 0,
+ dlm_ast, dlm_ckv_lksb, NULL);
+ if (res)
+ ckv_lock->async_cb(ckv_lock->cb_arg, res);
+ break;
+ case READ_SECOND_STAGE:
+ fallthrough;
+ case WRITE_SECOND_STAGE:
+ ckv_lock->async_cb(ckv_lock->cb_arg, dlm_ckv_lksb->lksb.sb_status);
+ break;
+ }
}
/*
@@ -135,6 +187,7 @@ static int dlm_ckv_lock_wait(dlm_lockspace_t *ls, int mode,
{
int res;
+ lksb->state = ONE_STAGE_SYNC;
res = dlm_lock(ls, mode, &lksb->lksb, flags,
(void *)name, name ? strlen(name) : 0, 0,
dlm_ast, lksb, bast);
@@ -164,6 +217,7 @@ static int dlm_ckv_unlock_wait(dlm_lockspace_t *ls, struct dlm_ckv_lksb *lksb)
{
int res;
+ lksb->state = ONE_STAGE_SYNC;
res = dlm_unlock(ls, lksb->lksb.sb_lkid, 0, &lksb->lksb, lksb);
if (res < 0)
goto out;
@@ -333,6 +387,32 @@ dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len)
}
EXPORT_SYMBOL(dlm_ckv_get);
+int
+dlm_ckv_get_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+ dlm_ckv_async_cb cb, void *cb_arg)
+{
+ struct dlm_ckv_lock *ckv_lock = &kv->lock;
+ struct dlm_ckv_bucket *bucket;
+ int res;
+
+ BUG_ON(!ckv_lock);
+ bucket = ckv_lock->bucket;
+
+ ckv_lock->lksb.state = READ_FIRST_STAGE;
+ ckv_lock->len = len;
+ ckv_lock->value = value;
+ ckv_lock->async_cb = cb;
+ ckv_lock->cb_arg = cb_arg;
+ res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_CR, &ckv_lock->lksb.lksb,
+ DLM_LKF_VALBLK | DLM_LKF_CONVERT, NULL, 0, 0,
+ dlm_ast, &ckv_lock->lksb, NULL);
+ if (res)
+ pr_info("Can not get lock %s, rc=%d\n", ckv_lock->name, res);
+
+ return res;
+}
+EXPORT_SYMBOL(dlm_ckv_get_async);
+
int
dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len)
{
@@ -368,6 +448,34 @@ dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len)
}
EXPORT_SYMBOL(dlm_ckv_set);
+int
+dlm_ckv_set_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+ dlm_ckv_async_cb cb, void *cb_arg)
+{
+ struct dlm_ckv_lock *ckv_lock = &kv->lock;
+ struct dlm_ckv_bucket *bucket;
+ int res;
+
+ BUG_ON(!ckv_lock);
+ bucket = ckv_lock->bucket;
+
+ ckv_lock->lksb.state = WRITE_FIRST_STAGE;
+ ckv_lock->len = len;
+ ckv_lock->value = value;
+ ckv_lock->async_cb = cb;
+ ckv_lock->cb_arg = cb_arg;
+ memcpy(ckv_lock->lksb.lksb.sb_lvbptr, ckv_lock->value, ckv_lock->len);
+
+ res = dlm_lock(ckv_lock->bucket->ls, DLM_LOCK_EX, &ckv_lock->lksb.lksb,
+ DLM_LKF_CONVERT, NULL, 0, 0,
+ dlm_ast, &ckv_lock->lksb, NULL);
+ if (res)
+ pr_info("Can not get lock %s\n", ckv_lock->name);
+
+ return res;
+}
+EXPORT_SYMBOL(dlm_ckv_set_async);
+
static void dlm_cvk_pre_n_bast(void *astarg, int mode)
{
struct dlm_ckv_lksb *lksb = astarg;
@@ -9,6 +9,8 @@ struct dlm_ckv_kv;
#define DLM_CKV_VALUE_MAX_SIZE 255
typedef void (*dlm_ckv_notify_cb)(void *userarg);
+typedef void (*dlm_ckv_async_cb)(void *userarg, int res);
+typedef void (*dlm_ckv_nodeleft_cb)(void *arg, int nodeid);
struct dlm_ckv_bucket *dlm_ckv_open_bucket(const char *name,
const char *cluster_name,
@@ -26,6 +28,10 @@ dlm_ckv_create_kv(struct dlm_ckv_bucket *bucket, const char *key);
void dlm_ckv_free_kv(struct dlm_ckv_kv *kv);
int dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len);
int dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len);
+int dlm_ckv_get_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+ dlm_ckv_async_cb cb, void *cb_arg);
+int dlm_ckv_set_async(struct dlm_ckv_kv *kv, char *value, size_t len,
+ dlm_ckv_async_cb cb, void *cb_arg);
struct dlm_ckv_notify *
dlm_ckv_create_notification(struct dlm_ckv_bucket *bucket, const char *name,
The sync versions of KV set/get functions may take too long time if user want to store/read a number of keys. Add async versions of KV get/set functions to allow the user to group the requests and to wait for completion of all requests together. Signed-off-by: Dmitry Bogdanov <d.bogdanov@yadro.com> --- drivers/target/dlm_ckv.c | 110 ++++++++++++++++++++++++++++++++++++++- drivers/target/dlm_ckv.h | 6 +++ 2 files changed, 115 insertions(+), 1 deletion(-)