@@ -445,9 +445,14 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
if (!cp->cp_transport_data)
return;
- /* make sure lingering queued work won't try to ref the conn */
- cancel_delayed_work_sync(&cp->cp_send_w);
- cancel_delayed_work_sync(&cp->cp_recv_w);
+ /* Make sure lingering queued work won't try to ref the conn.
+ * If work is queued, cancel it; test_and_set_bit() leaves the
+ * bit set either way, so the work cannot be re-queued.
+ */
+ if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+ cancel_delayed_work_sync(&cp->cp_send_w);
+ if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+ cancel_delayed_work_sync(&cp->cp_recv_w);
rds_conn_path_drop(cp, true);
flush_work(&cp->cp_down_w);
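
For reference, the teardown pattern above can be modelled in user space. This is only an illustrative sketch: the names are hypothetical, and C11 stdatomic plus printf stand in for the kernel's test_and_set_bit(), queue_delayed_work() and cancel_delayed_work_sync(). It shows why the bit is set unconditionally: the old value tells us whether work was armed (and must be cancelled), and because the bit stays set afterwards the conditional-queue helpers can no longer re-arm the work on a path that is being torn down.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define SEND_WORK_QUEUED 0UL

static atomic_ulong cp_flags;

/* toy test_and_set_bit(): returns the previous value of the bit */
static bool test_and_set_bit_model(unsigned long nr, atomic_ulong *addr)
{
	unsigned long mask = 1UL << nr;

	return atomic_fetch_or(addr, mask) & mask;
}

/* producer side: arm the work only if it is not already armed */
static void cond_queue_send_work(void)
{
	if (!test_and_set_bit_model(SEND_WORK_QUEUED, &cp_flags))
		puts("queue_delayed_work()");		/* work armed */
}

/* teardown side: set the bit unconditionally, cancel only if it was set */
static void destroy_path(void)
{
	if (test_and_set_bit_model(SEND_WORK_QUEUED, &cp_flags))
		puts("cancel_delayed_work_sync()");	/* work was armed */
	/* the bit stays set, so cond_queue_send_work() is now a no-op */
}

int main(void)
{
	cond_queue_send_work();	/* arms the work and sets the bit */
	destroy_path();		/* cancels it; the bit remains set */
	cond_queue_send_work();	/* no-op: cannot re-arm after destroy */
	return 0;
}
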
@@ -457,7 +457,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
(must_wake ||
(can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
rds_ib_ring_empty(&ic->i_recv_ring))) {
- queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+ rds_cond_queue_recv_work(conn->c_path + 0, 1);
}
if (can_wait)
cond_resched();
@@ -419,7 +419,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ rds_cond_queue_send_work(conn->c_path + 0, 0);
WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
@@ -90,6 +90,8 @@ enum {
#define RDS_IN_XMIT 2
#define RDS_RECV_REFILL 3
#define RDS_DESTROY_PENDING 4
+#define RDS_SEND_WORK_QUEUED 5
+#define RDS_RECV_WORK_QUEUED 6
/* Max number of multipaths per RDS connection. Must be a power of 2 */
#define RDS_MPATH_WORKERS 8
@@ -791,6 +793,37 @@ void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...);
#define rds_conn_path_error(cp, fmt...) \
__rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt)
+extern struct workqueue_struct *rds_wq;
+static inline void rds_cond_queue_send_work(struct rds_conn_path *cp, unsigned long delay)
+{
+ if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+ queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
+}
+
+static inline void rds_clear_queued_send_work_bit(struct rds_conn_path *cp)
+{
+ /* clear_bit() is not a barrier: order prior accesses before the clear */
+ smp_mb__before_atomic();
+ clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
+ /* ... and order the clear before any accesses that follow */
+ smp_mb__after_atomic();
+}
+
+static inline void rds_cond_queue_recv_work(struct rds_conn_path *cp, unsigned long delay)
+{
+ if (!test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+ queue_delayed_work(rds_wq, &cp->cp_recv_w, delay);
+}
+
+static inline void rds_clear_queued_recv_work_bit(struct rds_conn_path *cp)
+{
+ /* clear_bit() is not a barrier: order prior accesses before the clear */
+ smp_mb__before_atomic();
+ clear_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags);
+ /* ... and order the clear before any accesses that follow */
+ smp_mb__after_atomic();
+}
+
static inline int
rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
{
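
These helpers are meant to pair with the workers in threads.c further down: each handler clears its *_WORK_QUEUED bit before processing (see rds_send_worker()/rds_recv_worker()), so an event that fires while the handler is running sees the bit clear and re-arms the work instead of being coalesced away. A minimal user-space model of that pairing (hypothetical names; stdatomic in place of the kernel bitops, a counter in place of the workqueue):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define WORK_QUEUED 0UL

static atomic_ulong cp_flags;
static int work_armed;	/* stand-in for "delayed work is queued" */

static bool test_and_set_bit_model(unsigned long nr, atomic_ulong *addr)
{
	unsigned long mask = 1UL << nr;

	return atomic_fetch_or(addr, mask) & mask;
}

static void clear_bit_model(unsigned long nr, atomic_ulong *addr)
{
	atomic_fetch_and(addr, ~(1UL << nr));
}

/* producer: duplicate wakeups collapse into one armed work item */
static void cond_queue_work(void)
{
	if (!test_and_set_bit_model(WORK_QUEUED, &cp_flags))
		work_armed++;
}

/* worker: clear the bit *before* processing, so a wakeup that arrives
 * while the handler runs re-arms the work instead of being swallowed
 * by an already-set bit
 */
static void worker(void)
{
	work_armed--;
	clear_bit_model(WORK_QUEUED, &cp_flags);
	/* ... the actual send/recv processing would happen here ... */
}

int main(void)
{
	cond_queue_work();			/* arms the work */
	cond_queue_work();			/* coalesced: bit already set */
	printf("armed: %d\n", work_armed);	/* 1 */
	worker();				/* runs and clears the bit */
	cond_queue_work();			/* new event re-arms the work */
	printf("armed: %d\n", work_armed);	/* 1 */
	return 0;
}

If the bit were cleared only after the work finished, a wakeup arriving mid-run would be lost; clearing it first trades that for at worst an occasional spurious re-queue, which only costs an extra pass through the handler.
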
@@ -458,7 +458,7 @@ int rds_send_xmit(struct rds_conn_path *cp)
if (rds_destroy_pending(cp->cp_conn))
ret = -ENETUNREACH;
else
- queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+ rds_cond_queue_send_work(cp, 1);
rcu_read_unlock();
} else if (raced) {
rds_stats_inc(s_send_lock_queue_raced);
@@ -1380,7 +1380,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
if (rds_destroy_pending(cpath->cp_conn))
ret = -ENETUNREACH;
else
- queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+ rds_cond_queue_send_work(cpath, 1);
rcu_read_unlock();
}
if (ret)
@@ -1473,7 +1473,7 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
/* schedule the send work on rds_wq */
rcu_read_lock();
if (!rds_destroy_pending(cp->cp_conn))
- queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+ rds_cond_queue_send_work(cp, 1);
rcu_read_unlock();
rds_message_put(rm);
@@ -168,8 +168,15 @@ void rds_tcp_reset_callbacks(struct socket *sock,
atomic_set(&cp->cp_state, RDS_CONN_RESETTING);
wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
/* reset receive side state for rds_tcp_data_recv() for osock */
- cancel_delayed_work_sync(&cp->cp_send_w);
- cancel_delayed_work_sync(&cp->cp_recv_w);
+
+ /* Make sure lingering queued work won't try to ref the conn.
+ * If work is queued, cancel it; test_and_set_bit() leaves the
+ * bit set either way, so the work cannot be re-queued.
+ */
+ if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+ cancel_delayed_work_sync(&cp->cp_send_w);
+ if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+ cancel_delayed_work_sync(&cp->cp_recv_w);
lock_sock(osock->sk);
if (tc->t_tinc) {
rds_inc_put(&tc->t_tinc->ti_inc);
@@ -327,7 +327,7 @@ void rds_tcp_data_ready(struct sock *sk)
if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
rcu_read_lock();
if (!rds_destroy_pending(cp->cp_conn))
- queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+ rds_cond_queue_recv_work(cp, 0);
rcu_read_unlock();
}
out:
@@ -201,7 +201,7 @@ void rds_tcp_write_space(struct sock *sk)
rcu_read_lock();
if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
!rds_destroy_pending(cp->cp_conn))
- queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+ rds_cond_queue_send_work(cp, 0);
rcu_read_unlock();
out:
@@ -89,8 +89,11 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
set_bit(0, &cp->cp_conn->c_map_queued);
rcu_read_lock();
if (!rds_destroy_pending(cp->cp_conn)) {
- queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
- queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+ /* clear any stale *_WORK_QUEUED bits so the work can be armed again */
+ rds_clear_queued_send_work_bit(cp);
+ rds_cond_queue_send_work(cp, 0);
+ rds_clear_queued_recv_work_bit(cp);
+ rds_cond_queue_recv_work(cp, 0);
}
rcu_read_unlock();
cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
@@ -193,9 +195,11 @@ void rds_send_worker(struct work_struct *work)
struct rds_conn_path *cp = container_of(work,
struct rds_conn_path,
cp_send_w.work);
+ unsigned long delay;
int ret;
if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+ rds_clear_queued_send_work_bit(cp);
clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
ret = rds_send_xmit(cp);
cond_resched();
@@ -203,15 +207,17 @@ void rds_send_worker(struct work_struct *work)
switch (ret) {
case -EAGAIN:
rds_stats_inc(s_send_immediate_retry);
- queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+ delay = 0;
break;
case -ENOMEM:
rds_stats_inc(s_send_delayed_retry);
- queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
+ delay = 2;
break;
default:
- break;
+ return;
}
+
+ rds_cond_queue_send_work(cp, delay);
}
}
@@ -220,23 +226,26 @@ void rds_recv_worker(struct work_struct *work)
struct rds_conn_path *cp = container_of(work,
struct rds_conn_path,
cp_recv_w.work);
+ unsigned long delay;
int ret;
if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+ rds_clear_queued_recv_work_bit(cp);
ret = cp->cp_conn->c_trans->recv_path(cp);
rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
switch (ret) {
case -EAGAIN:
rds_stats_inc(s_recv_immediate_retry);
- queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+ delay = 0;
break;
case -ENOMEM:
rds_stats_inc(s_recv_delayed_retry);
- queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
+ delay = 2;
break;
default:
- break;
+ return;
}
+ rds_cond_queue_recv_work(cp, delay);
}
}