@@ -110,17 +110,12 @@ struct ceph_msg_pos {
/*
* ceph_connection state bit flags
- *
- * QUEUED and BUSY are used together to ensure that only a single
- * thread is currently opening, reading or writing data to the socket.
*/
#define LOSSYTX 0 /* we can close channel or drop messages on errors */
#define CONNECTING 1
#define NEGOTIATING 2
#define KEEPALIVE_PENDING 3
#define WRITE_PENDING 4 /* we have data ready to send */
-#define QUEUED 5 /* there is work queued on this connection */
-#define BUSY 6 /* work is being done */
#define STANDBY 8 /* no outgoing messages, socket closed. we keep
* the ceph_connection around to maintain shared
* state with the peer. */
@@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq;
int ceph_msgr_init(void)
{
- ceph_msgr_wq = create_workqueue("ceph-msgr");
+ ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
if (!ceph_msgr_wq) {
pr_err("msgr_init failed to create workqueue\n");
return -ENOMEM;
@@ -1920,20 +1920,6 @@ bad_tag:
/*
* Atomically queue work on a connection. Bump @con reference to
* avoid races with connection teardown.
- *
- * There is some trickery going on with QUEUED and BUSY because we
- * only want a _single_ thread operating on each connection at any
- * point in time, but we want to use all available CPUs.
- *
- * The worker thread only proceeds if it can atomically set BUSY. It
- * clears QUEUED and does it's thing. When it thinks it's done, it
- * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
- * (tries again to set BUSY).
- *
- * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
- * try to queue work. If that fails (work is already queued, or BUSY)
- * we give up (work also already being done or is queued) but leave QUEUED
- * set so that the worker thread will loop if necessary.
*/
static void queue_con(struct ceph_connection *con)
{
@@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
return;
}
- set_bit(QUEUED, &con->state);
- if (test_bit(BUSY, &con->state)) {
- dout("queue_con %p - already BUSY\n", con);
- con->ops->put(con);
- } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
+ if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
dout("queue_con %p - already queued\n", con);
con->ops->put(con);
} else {
@@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
- int backoff = 0;
-
-more:
- if (test_and_set_bit(BUSY, &con->state) != 0) {
- dout("con_work %p BUSY already set\n", con);
- goto out;
- }
- dout("con_work %p start, clearing QUEUED\n", con);
- clear_bit(QUEUED, &con->state);
mutex_lock(&con->mutex);
@@ -1994,28 +1967,13 @@ more:
try_read(con) < 0 ||
try_write(con) < 0) {
mutex_unlock(&con->mutex);
- backoff = 1;
ceph_fault(con); /* error/fault path */
goto done_unlocked;
}
done:
mutex_unlock(&con->mutex);
-
done_unlocked:
- clear_bit(BUSY, &con->state);
- dout("con->state=%lu\n", con->state);
- if (test_bit(QUEUED, &con->state)) {
- if (!backoff || test_bit(OPENING, &con->state)) {
- dout("con_work %p QUEUED reset, looping\n", con);
- goto more;
- }
- dout("con_work %p QUEUED reset, but just faulted\n", con);
- clear_bit(QUEUED, &con->state);
- }
- dout("con_work %p done\n", con);
-
-out:
con->ops->put(con);
}