@@ -50,15 +50,41 @@ enum srpc_state {
SRPC_STATE_STOPPING,
};
+enum rpc_counter_32 {
+ SRPC_ERROR,
+ SRPC_RPC_SENT,
+ SRPC_RPC_RCVD,
+ SRPC_RPC_DROP,
+ SRPC_RPC_EXPIRED,
+ SRPC_COUNTER32_MAX,
+};
+
+enum rpc_counter_64 {
+ SRPC_BULK_GET,
+ SRPC_BULK_PUT,
+ SRPC_COUNTER64_MAX,
+};
+
static struct smoketest_rpc {
- spinlock_t rpc_glock; /* global lock */
- struct srpc_service *rpc_services[SRPC_SERVICE_MAX_ID + 1];
- lnet_handler_t rpc_lnet_handler; /* _the_ LNet event queue */
- enum srpc_state rpc_state;
- struct srpc_counters rpc_counters;
- u64 rpc_matchbits; /* matchbits counter */
+ spinlock_t rpc_glock; /* global lock */
+ struct srpc_service *rpc_services[SRPC_SERVICE_MAX_ID + 1];
+ lnet_handler_t rpc_lnet_handler; /* _the_ LNet event queue */
+ enum srpc_state rpc_state;
+ struct srpc_counters rpc_counters;
+ atomic_t rpc_counters32[SRPC_COUNTER32_MAX];
+ atomic64_t rpc_counters64[SRPC_COUNTER64_MAX];
+ atomic64_t rpc_matchbits; /* matchbits counter */
} srpc_data;
+#define RPC_STAT32(a) \
+ srpc_data.rpc_counters32[(a)]
+
+#define GET_RPC_STAT32(a) \
+ atomic_read(&srpc_data.rpc_counters32[(a)])
+
+#define GET_RPC_STAT64(a) \
+ atomic64_read(&srpc_data.rpc_counters64[(a)])
+
static inline int
srpc_serv_portal(int svc_id)
{
@@ -69,18 +95,17 @@ enum srpc_state {
/* forward ref's */
void srpc_handle_rpc(struct swi_workitem *wi);
-void srpc_get_counters(struct srpc_counters *cnt)
-{
- spin_lock(&srpc_data.rpc_glock);
- *cnt = srpc_data.rpc_counters;
- spin_unlock(&srpc_data.rpc_glock);
-}
-void srpc_set_counters(const struct srpc_counters *cnt)
+void srpc_get_counters(struct srpc_counters *cnt)
{
- spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters = *cnt;
- spin_unlock(&srpc_data.rpc_glock);
+ cnt->errors = GET_RPC_STAT32(SRPC_ERROR);
+ cnt->rpcs_sent = GET_RPC_STAT32(SRPC_RPC_SENT);
+ cnt->rpcs_rcvd = GET_RPC_STAT32(SRPC_RPC_RCVD);
+ cnt->rpcs_dropped = GET_RPC_STAT32(SRPC_RPC_DROP);
+ cnt->rpcs_expired = GET_RPC_STAT32(SRPC_RPC_EXPIRED);
+
+ cnt->bulk_get = GET_RPC_STAT64(SRPC_BULK_GET);
+ cnt->bulk_put = GET_RPC_STAT64(SRPC_BULK_PUT);
}
static int
@@ -162,12 +187,7 @@ struct srpc_bulk *
static inline u64
srpc_next_id(void)
{
- u64 id;
-
- spin_lock(&srpc_data.rpc_glock);
- id = srpc_data.rpc_matchbits++;
- spin_unlock(&srpc_data.rpc_glock);
- return id;
+ return atomic64_inc_return(&srpc_data.rpc_matchbits);
}
static void
@@ -922,11 +942,8 @@ struct srpc_bulk *
rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
swi_state2str(rpc->srpc_wi.swi_state), status);
- if (status) {
- spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_dropped++;
- spin_unlock(&srpc_data.rpc_glock);
- }
+ if (status)
+ atomic_inc(&RPC_STAT32(SRPC_RPC_DROP));
if (rpc->srpc_done)
(*rpc->srpc_done) (rpc);
@@ -1096,9 +1113,7 @@ struct srpc_bulk *
spin_unlock(&rpc->crpc_lock);
- spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_expired++;
- spin_unlock(&srpc_data.rpc_glock);
+ atomic_inc(&RPC_STAT32(SRPC_RPC_EXPIRED));
}
static void
@@ -1431,11 +1446,10 @@ struct srpc_client_rpc *
if (ev->status) {
u32 errors;
- spin_lock(&srpc_data.rpc_glock);
if (ev->status != -ECANCELED) /* cancellation is not error */
- srpc_data.rpc_counters.errors++;
- errors = srpc_data.rpc_counters.errors;
- spin_unlock(&srpc_data.rpc_glock);
+ errors = atomic_inc_return(&RPC_STAT32(SRPC_ERROR));
+ else
+ errors = atomic_read(&RPC_STAT32(SRPC_ERROR));
CNETERR("LNet event status %d type %d, RPC errors %u\n",
ev->status, ev->type, errors);
@@ -1449,11 +1463,9 @@ struct srpc_client_rpc *
rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
LBUG();
case SRPC_REQUEST_SENT:
- if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
- spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_sent++;
- spin_unlock(&srpc_data.rpc_glock);
- }
+ if (!ev->status && ev->type != LNET_EVENT_UNLINK)
+ atomic_inc(&RPC_STAT32(SRPC_RPC_SENT));
+
/* fall through */
case SRPC_REPLY_RCVD:
case SRPC_BULK_REQ_RCVD:
@@ -1566,9 +1578,7 @@ struct srpc_client_rpc *
spin_unlock(&scd->scd_lock);
- spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_rcvd++;
- spin_unlock(&srpc_data.rpc_glock);
+ atomic_inc(&RPC_STAT32(SRPC_RPC_RCVD));
break;
case SRPC_BULK_GET_RPLD:
@@ -1581,14 +1591,14 @@ struct srpc_client_rpc *
/* fall through */
case SRPC_BULK_PUT_SENT:
if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
- spin_lock(&srpc_data.rpc_glock);
+ atomic64_t *data;
if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
- srpc_data.rpc_counters.bulk_get += ev->mlength;
+ data = &srpc_data.rpc_counters64[SRPC_BULK_GET];
else
- srpc_data.rpc_counters.bulk_put += ev->mlength;
+ data = &srpc_data.rpc_counters64[SRPC_BULK_PUT];
- spin_unlock(&srpc_data.rpc_glock);
+ atomic64_add(ev->mlength, data);
}
/* fall through */
case SRPC_REPLY_SENT:
@@ -1619,7 +1629,8 @@ struct srpc_client_rpc *
/* 1 second pause to avoid timestamp reuse */
schedule_timeout_uninterruptible(HZ);
- srpc_data.rpc_matchbits = ((u64)ktime_get_real_seconds()) << 48;
+ atomic64_set(&srpc_data.rpc_matchbits,
+ ((u64)ktime_get_real_seconds() << 48));
srpc_data.rpc_state = SRPC_STATE_NONE;
@@ -452,7 +452,6 @@ struct srpc_bulk *srpc_alloc_bulk(int cpt, unsigned int off,
int srpc_service_add_buffers(struct srpc_service *sv, int nbuffer);
void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer);
void srpc_get_counters(struct srpc_counters *cnt);
-void srpc_set_counters(const struct srpc_counters *cnt);
extern struct workqueue_struct *lst_serial_wq;
extern struct workqueue_struct **lst_test_wq;