@@ -77,6 +77,8 @@ struct rqspinlock_timeout {
u16 spin;
};
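+/*
+ * Handed to a waiter's node->locked in place of the normal hand-off value
+ * to tell it that the waiter ahead of it bailed out, and that it should
+ * take the exit path as well.
+ */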
+#define RES_TIMEOUT_VAL 2
+
static noinline int check_timeout(struct rqspinlock_timeout *ts)
{
u64 time = ktime_get_mono_fast_ns();
@@ -305,12 +307,18 @@ int __lockfunc resilient_queued_spin_lock_slowpath(rqspinlock_t *lock, u32 val,
* head of the waitqueue.
*/
if (old & _Q_TAIL_MASK) {
+ int val;
+
prev = decode_tail(old, qnodes);
/* Link @node into the waitqueue. */
WRITE_ONCE(prev->next, node);
- arch_mcs_spin_lock_contended(&node->locked);
+ val = arch_mcs_spin_lock_contended(&node->locked);
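+ /*
+ * If the waiter ahead of us bailed out, it hands us RES_TIMEOUT_VAL
+ * instead of the usual hand-off value; give up and clean up our
+ * queue entry below.
+ */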
+ if (val == RES_TIMEOUT_VAL) {
+ ret = -EDEADLK;
+ goto waitq_timeout;
+ }
/*
* While waiting for the MCS lock, the next pointer may have
@@ -334,7 +342,35 @@ int __lockfunc resilient_queued_spin_lock_slowpath(rqspinlock_t *lock, u32 val,
* sequentiality; this is because the set_locked() function below
* does not imply a full barrier.
*/
- val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
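+ /*
+ * Re-arm the timeout before waiting for the owner and the pending
+ * waiter to go away; on expiry, ret is set and we fall through
+ * into the timeout handling below.
+ */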
+ RES_RESET_TIMEOUT(ts);
+ val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK) ||
+ RES_CHECK_TIMEOUT(ts, ret));
+
+waitq_timeout:
+ if (ret) {
+ /*
+ * If the tail is still pointing to us, then we are the final waiter,
+ * and are responsible for resetting the tail back to 0. Otherwise, if
+ * the cmpxchg operation fails, we signal the next waiter to take the
+ * exit path and try the same. For a waiter with tail node 'n':
+ *
+ * n,*,* -> 0,*,*
+ *
+ * Since the cmpxchg is performed on the whole lock word, it is
+ * possible that the locked/pending bits keep changing and we see
+ * failures even when we remain the head of the wait queue. However,
+ * eventually, the pending bit owner will unset the pending bit, and
+ * new waiters will queue behind us. This will leave the lock owner
+ * in charge, and it will eventually either set the locked bit to 0,
+ * or leave it as 1, allowing us to make progress.
+ */
+ if (!try_cmpxchg_tail(lock, tail, 0)) {
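+ /*
+ * The successor may not have linked itself in yet; wait until
+ * node->next becomes non-NULL, then ask it to exit as well.
+ */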
+ next = smp_cond_load_relaxed(&node->next, VAL);
+ WRITE_ONCE(next->locked, RES_TIMEOUT_VAL);
+ }
+ lockevent_inc(rqspinlock_lock_timeout);
+ goto release;
+ }
/*
* claim the lock:
@@ -379,6 +415,6 @@ int __lockfunc resilient_queued_spin_lock_slowpath(rqspinlock_t *lock, u32 val,
* release the node
*/
__this_cpu_dec(qnodes[0].mcs.count);
- return 0;
+ return ret;
}
EXPORT_SYMBOL(resilient_queued_spin_lock_slowpath);
new file mode 100644
@@ -0,0 +1,48 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+/*
+ * Resilient Queued Spin Lock defines
+ *
+ * (C) Copyright 2024 Meta Platforms, Inc. and affiliates.
+ *
+ * Authors: Kumar Kartikeya Dwivedi <memxor@gmail.com>
+ */
+#ifndef __LINUX_RQSPINLOCK_H
+#define __LINUX_RQSPINLOCK_H
+
+#include "qspinlock.h"
+
+/*
+ * try_cmpxchg_tail - Attempt to cmpxchg the tail word to a new value
+ * @lock: Pointer to queued spinlock structure
+ * @tail: The tail to compare against
+ * @new_tail: The new queue tail code word
+ * Return: Bool to indicate whether the cmpxchg operation succeeded
+ *
+ * This is used by the head of the wait queue to clean up the queue.
+ * Provides relaxed ordering, since observers only rely on the initialized
+ * state of the node, which was made visible through the xchg_tail operation,
+ * i.e. through the smp_wmb preceding xchg_tail.
+ *
+ * We avoid using 16-bit cmpxchg, which is not available on all architectures.
+ */
+static __always_inline bool try_cmpxchg_tail(struct qspinlock *lock, u32 tail, u32 new_tail)
+{
+ u32 old, new;
+
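+ /*
+ * Snapshot the whole lock word; only the tail bits are compared
+ * below, the locked/pending bits are carried over unchanged.
+ */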
+ old = atomic_read(&lock->val);
+ do {
+ /*
+ * Is the tail part we compare to already stale? Fail.
+ */
+ if ((old & _Q_TAIL_MASK) != tail)
+ return false;
+ /*
+ * Encode latest locked/pending state for new tail.
+ */
+ new = (old & _Q_LOCKED_PENDING_MASK) | new_tail;
+ } while (!atomic_try_cmpxchg_relaxed(&lock->val, &old, new));
+
+ return true;
+}
+
+#endif /* __LINUX_RQSPINLOCK_H */