@@ -33,7 +33,9 @@
#include "PacketTimes.h"
#include "Switches.h"
-TicksTime s_startTime, s_endTime;
+extern CRITICAL_SECTION thread_exit_lock;
+extern os_thread_t *thread_pid_array;
+TicksTime g_c_startTime, g_c_endTime;
//==============================================================================
//==============================================================================
@@ -119,7 +121,7 @@ void client_statistics(int serverNo, Message *pMsgRequest)
/* Print total statistic that is independent on server count */
if (SERVER_NO == 0) {
- TicksDuration totalRunTime = s_endTime - s_startTime;
+ TicksDuration totalRunTime = g_c_endTime - g_c_startTime;
if (g_skipCount) {
log_msg_file2(f, "[Total Run] RunTime=%.3lf sec; Warm up time=%" PRIu32 " msec; SentMessages=%" PRIu64 "; ReceivedMessages=%" PRIu64 "; SkippedMessages=%" PRIu64 "",
totalRunTime.toDecimalUsec()/1000000, g_pApp->m_const_params.warmup_msec, sendCount, receiveCount, g_skipCount);
@@ -261,24 +263,34 @@ void client_statistics(int serverNo, Message *pMsgRequest)
}
//------------------------------------------------------------------------------
-void stream_statistics(Message *pMsgRequest)
+void stream_statistics(struct handler_info *p_info)
{
- TicksDuration totalRunTime = s_endTime - s_startTime;
+ TicksDuration totalRunTime = p_info->c_endTime - p_info->c_startTime;
+ uint64_t sendCount = p_info->sendCount;
+ char prefix[20];
+
+ if (g_pApp->m_const_params.mthread) {
+ if (p_info->id)
+ snprintf(prefix, sizeof prefix, "[TID: %d] ", p_info->id);
+ else
+ snprintf(prefix, sizeof prefix, "[TID: ALL] ");
+ }
+ else {
+ prefix[0] = '\0';
+ }
if (totalRunTime <= TicksDuration::TICKS0) return;
if (!g_pApp->m_const_params.b_stream) return;
- const uint64_t sendCount = pMsgRequest->getSequenceCounter();
-
// Send only mode!
if (g_skipCount) {
- log_msg("Total of %" PRIu64 " messages sent in %.3lf sec (%" PRIu64 " messages skipped)\n",
- sendCount, totalRunTime.toDecimalUsec()/1000000, g_skipCount);
+ log_msg("%sTotal of %" PRIu64 " messages sent in %.3lf sec (%" PRIu64 " messages skipped)\n",
+ prefix, sendCount, totalRunTime.toDecimalUsec()/1000000, g_skipCount);
}
else
{
- log_msg("Total of %" PRIu64 " messages sent in %.3lf sec\n",
- sendCount, totalRunTime.toDecimalUsec()/1000000);
+ log_msg("%sTotal of %" PRIu64 " messages sent in %.3lf sec\n",
+ prefix, sendCount, totalRunTime.toDecimalUsec()/1000000);
}
if (g_pApp->m_const_params.mps != MPS_MAX) {
if (g_pApp->m_const_params.msg_size_range)
@@ -308,17 +320,17 @@ void stream_statistics(Message *pMsgRequest)
int total_line_ip_data = g_pApp->m_const_params.msg_size;
double MBps = ((double)msgps * total_line_ip_data)/1024/1024; /* No including IP + UDP Headers per fragment */
if (ip_frags_per_msg == 1)
- log_msg("Summary: Message Rate is %d [msg/sec]", msgps);
+ log_msg("%sSummary: Message Rate is %d [msg/sec]", prefix, msgps);
else
- log_msg("Summary: Message Rate is %d [msg/sec], Packet Rate is about %d [pkt/sec] (%d ip frags / msg)", msgps, pktps, ip_frags_per_msg);
+ log_msg("%sSummary: Message Rate is %d [msg/sec], Packet Rate is about %d [pkt/sec] (%d ip frags / msg)", prefix, msgps, pktps, ip_frags_per_msg);
if (g_pApp->m_const_params.giga_size){
- log_msg("Summary: BandWidth is %.3f GBps (%.3f Gbps)", MBps/1000, MBps*8/1000);
+ log_msg("%sSummary: BandWidth is %.3f GBps (%.3f Gbps)", prefix, MBps/1000, MBps*8/1000);
}
else if (g_pApp->m_const_params.increase_output_precision){
- log_msg("Summary: BandWidth is %.9f GBps (%.9f Gbps)", MBps, MBps*8);
+ log_msg("%sSummary: BandWidth is %.9f GBps (%.9f Gbps)", prefix, MBps, MBps*8);
}
else{
- log_msg("Summary: BandWidth is %.3f MBps (%.3f Mbps)", MBps, MBps*8);
+ log_msg("%sSummary: BandWidth is %.3f MBps (%.3f Mbps)", prefix, MBps, MBps*8);
}
}
@@ -329,7 +341,7 @@ void client_sig_handler(int signum)
log_msg("Test end (interrupted by signal %d)", signum);
return;
}
- s_endTime.setNowNonInline();
+ g_c_endTime.setNowNonInline();
g_b_exit = true;
// Just in case not Activity updates where logged add a '\n'
@@ -370,13 +382,15 @@ ClientBase::~ClientBase()
delete m_pMsgRequest;
}
+
//------------------------------------------------------------------------------
template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration, class SwitchMsgSize , class PongModeCare >
-Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize , PongModeCare>::Client(int _fd_min, int _fd_max, int _fd_num):
+Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize , PongModeCare>::Client(struct handler_info *_p_info):
ClientBase(),
- m_ioHandler(_fd_min, _fd_max, _fd_num),
+ m_ioHandler(_p_info->fd_min, _p_info->fd_max, _p_info->fd_num),
m_pongModeCare(m_pMsgRequest)
{
+ p_info = _p_info;
os_thread_init (&m_receiverTid);
}
@@ -408,6 +422,8 @@ template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, cla
void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize , PongModeCare>
::cleanupAfterLoop()
{
+ p_info->c_endTime.setNowNonInline();
+
usleep(100*1000);//0.1 sec - wait for rx packets for last sends (in normal flow)
if (m_receiverTid.tid) {
os_thread_kill(&m_receiverTid);
@@ -426,7 +442,9 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
}
else if (g_pApp->m_const_params.b_stream)
{
- stream_statistics(m_pMsgRequest);
+ p_info->sendCount = m_pMsgRequest->getSequenceCounter();
+
+ stream_statistics(p_info);
}
else
{
@@ -581,9 +599,12 @@ int Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration,
}
if (rc == SOCKPERF_ERR_NONE) {
- s_startTime.setNowNonInline();
- g_lastTicks = s_startTime;
- g_cycleStartTime = s_startTime - g_pApp->m_const_params.cycleDuration;
+ p_info->c_startTime.setNowNonInline();
+ if (g_c_startTime == TicksTime::TICKS0) {
+ g_c_startTime = p_info->c_startTime;
+ g_lastTicks = p_info->c_startTime;
+ g_cycleStartTime = p_info->c_startTime - g_pApp->m_const_params.cycleDuration;
+ }
}
}
}
@@ -635,7 +656,7 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
::doPlayback()
{
usleep(100*1000);//wait for receiver thread to start (since we don't use warmup) //TODO: configure!
- s_startTime.setNowNonInline();//reduce code size by calling non inline func from slow path
+ p_info->c_startTime.setNowNonInline();//reduce code size by calling non inline func from slow path
const PlaybackVector &pv = * g_pApp->m_const_params.pPlaybackVector;
size_t i = 0;
@@ -655,7 +676,7 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
m_switchActivityInfo.execute(m_pMsgRequest->getSequenceCounter());
}
g_cycle_wait_loop_counter++; //for silenting waring at the end
- s_endTime.setNowNonInline();//reduce code size by calling non inline func from slow path
+ p_info->c_endTime.setNowNonInline();//reduce code size by calling non inline func from slow path
usleep(20*1000);//wait for reply of last packet //TODO: configure!
g_b_exit = true;
}
@@ -684,61 +705,61 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
//------------------------------------------------------------------------------
template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration, class SwitchMsgSize, class PongModeCare>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
- Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeCare> c(_fd_min, _fd_max, _fd_num);
+void client_handler(struct handler_info *_p_info) {
+ Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeCare> c(_p_info);
c.doHandler();
}
//------------------------------------------------------------------------------
template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration, class SwitchMsgSize>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
if (g_pApp->m_const_params.b_stream)
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNever> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNever> (_p_info);
else if (g_pApp->m_const_params.reply_every == 1)
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeAlways> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeAlways> (_p_info);
else
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNormal> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNormal> (_p_info);
}
//------------------------------------------------------------------------------
template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
if (g_pApp->m_const_params.msg_size_range > 0)
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOnMsgSize> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOnMsgSize> (_p_info);
else
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOff> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOff> (_p_info);
}
//------------------------------------------------------------------------------
template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
if (g_pApp->m_const_params.cycleDuration > TicksDuration::TICKS0) {
if (g_pApp->m_const_params.dummy_mps) {
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnDummySend> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnDummySend> (_p_info);
} else {
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnCycleDuration> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnCycleDuration> (_p_info);
}
}
else
- client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOff> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOff> (_p_info);
}
//------------------------------------------------------------------------------
template <class IoType, class SwitchDataIntegrity>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
if (g_pApp->m_const_params.packetrate_stats_print_ratio > 0)
- client_handler<IoType, SwitchDataIntegrity, SwitchOnActivityInfo> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchOnActivityInfo> (_p_info);
else
- client_handler<IoType, SwitchDataIntegrity, SwitchOff> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchDataIntegrity, SwitchOff> (_p_info);
}
//------------------------------------------------------------------------------
template <class IoType>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
if (g_pApp->m_const_params.data_integrity)
- client_handler<IoType, SwitchOnDataIntegrity> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchOnDataIntegrity> (_p_info);
else
- client_handler<IoType, SwitchOff> (_fd_min, _fd_max, _fd_num);
+ client_handler<IoType, SwitchOff> (_p_info);
}
//------------------------------------------------------------------------------
@@ -748,29 +769,29 @@ void client_handler(handler_info *p_info)
switch (g_pApp->m_const_params.fd_handler_type) {
case SELECT:
{
- client_handler<IoSelect> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+ client_handler<IoSelect> (p_info);
break;
}
case RECVFROM:
{
- client_handler<IoRecvfrom> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+ client_handler<IoRecvfrom> (p_info);
break;
}
case RECVFROMMUX:
{
- client_handler<IoRecvfromMUX> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+ client_handler<IoRecvfromMUX> (p_info);
break;
}
#ifndef WIN32
case POLL:
{
- client_handler<IoPoll> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+ client_handler<IoPoll> (p_info);
break;
}
#ifndef __FreeBSD__
case EPOLL:
{
- client_handler<IoEpoll> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+ client_handler<IoEpoll> (p_info);
break;
}
#endif
@@ -783,3 +804,26 @@ void client_handler(handler_info *p_info)
}
}
}
+
+void *client_handler_multi_thread(void *arg)
+{
+	struct handler_info *p_info = (handler_info *)arg;
+
+	if (p_info) {
+		client_handler(p_info);
+
+		/* Mark this thread as complete (index 0 of thread_pid_array is reserved for the main thread) */
+		{
+			int i = p_info->id; /* select_per_thread() assigns ids 1..threads_num and stores tids at thread_pid_array[id], so the id indexes the array directly - no +1 */
+			if (i > 0 && i <= g_pApp->m_const_params.threads_num) {
+				if (thread_pid_array && thread_pid_array[i].tid && (thread_pid_array[i].tid == os_getthread().tid)) {
+					ENTER_CRITICAL(&thread_exit_lock);
+					thread_pid_array[i].tid = 0;
+					LEAVE_CRITICAL(&thread_exit_lock);
+				}
+			}
+		}
+	}
+
+	return 0;
+}
@@ -53,6 +53,7 @@ private:
os_thread_t m_receiverTid;
IoType m_ioHandler;
addr_to_id m_ServerList;
+ struct handler_info *p_info;
SwitchDataIntegrity m_switchDataIntegrity;
SwitchActivityInfo m_switchActivityInfo;
@@ -61,7 +62,7 @@ private:
PongModeCare m_pongModeCare; // has msg_sendto() method and can be one of: PongModeNormal, PongModeAlways, PongModeNever
public:
- Client(int _fd_min, int _fd_max, int _fd_num);
+ Client(struct handler_info *_p_info);
virtual ~Client();
void doHandler();
void client_receiver_thread();
@@ -467,6 +467,14 @@ typedef struct handler_info {
int fd_min; /**< minimum descriptor (fd) */
int fd_max; /**< maximum socket descriptor (fd) */
int fd_num; /**< number of socket descriptors */
+    /* These are all of the stats relevant to a single thread's streaming
+     * I/O performance.  When running a throughput test as a client with
+     * multiple threads, we sum these up across the threads to get the
+     * totals for the final report.
+     */
+ uint64_t sendCount;
+ TicksTime c_startTime;
+ TicksTime c_endTime;
}handler_info;
typedef struct clt_session_info {
@@ -91,8 +91,10 @@ CRITICAL_SECTION thread_exit_lock;
os_thread_t *thread_pid_array = NULL;
// forward declarations from Client.cpp & Server.cpp
+extern void stream_statistics(struct handler_info *p_info);
extern void client_sig_handler(int signum);
extern void client_handler(handler_info *);
+extern void *client_handler_multi_thread(void *);
extern void server_sig_handler(int signum);
extern void server_handler(handler_info *);
extern void *server_handler_multi_thread(void *);
@@ -362,8 +364,8 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
int last_fds = 0;
handler_info *handler_info_array = NULL;
- handler_info_array = (handler_info*)MALLOC(sizeof(handler_info) * g_pApp->m_const_params.threads_num);
- memset(handler_info_array, 0, sizeof(handler_info) * g_pApp->m_const_params.threads_num);
+ handler_info_array = (handler_info*)MALLOC(sizeof(handler_info) * (g_pApp->m_const_params.threads_num + 1));
+ memset(handler_info_array, 0, sizeof(handler_info) * (g_pApp->m_const_params.threads_num + 1));
if (!handler_info_array) {
log_err("Failed to allocate memory for handler_info_arr");
rc = SOCKPERF_ERR_NO_MEMORY;
@@ -390,7 +392,7 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
num_of_remainded_fds = _fd_num % g_pApp->m_const_params.threads_num;
fd_num = _fd_num / g_pApp->m_const_params.threads_num;
- for (i = 0; i < g_pApp->m_const_params.threads_num; i++) {
+ for (i = 1; i <= g_pApp->m_const_params.threads_num; i++) {
handler_info *cur_handler_info = (handler_info_array + i);
/* Set ID of handler (thread) */
@@ -420,7 +422,7 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
rc = SOCKPERF_ERR_FATAL;
break;
}
- thread_pid_array[i + 1].tid = thread.tid;
+ thread_pid_array[i].tid = thread.tid;
last_fds = cur_handler_info->fd_max + 1;
}
@@ -429,6 +431,17 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
sleep(1);
}
+ /* If we are stopped by a timer, we need to wait for the
+ * sending threads to complete and fill out their p_info
+ * structs or our totals are off. We might have been the
+ * thread that took the timer interrupt, and os_thread_join
+ * below isn't waiting for us to get results, so just sleep
+ * for a little extra time
+ */
+ if (g_pApp->m_const_params.b_stream) {
+ sleep(1);
+ }
+
/* Stop all launched threads (the first index is reserved for main thread) */
for (i = 1; i <= g_pApp->m_const_params.threads_num; i++) {
os_thread_t cur_thread_pid;
@@ -448,6 +461,31 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
DELETE_CRITICAL(&thread_exit_lock);
}
+ /* Print out stream stats for all threads combined */
+ if (g_pApp->m_const_params.b_stream && g_pApp->m_const_params.mthread &&
+ g_pApp->m_const_params.threads_num > 1) {
+ struct handler_info *p0 = handler_info_array;
+ struct handler_info *t;
+ int threads = g_pApp->m_const_params.threads_num;
+ TicksDuration threadRunTime, totalRunTime;
+
+ totalRunTime = TicksDuration::TICKS0;
+ /* Sum up the totals fields */
+ for (i = 1; i <= threads; i++) {
+ t = handler_info_array + i;
+ p0->sendCount += t->sendCount;
+ threadRunTime = t->c_endTime - t->c_startTime;
+ totalRunTime += threadRunTime;
+ }
+ /* average out the runtimes across the threads */
+ totalRunTime /= threads;
+ p0->c_startTime = t->c_startTime;
+ p0->c_endTime = t->c_startTime + totalRunTime;
+
+ /* print it out */
+ stream_statistics(p0);
+ }
+
/* Free thread info allocated data */
if (handler_info_array) {
FREE(handler_info_array);
@@ -3471,7 +3509,12 @@ void do_test()
#endif
switch (s_user_params.mode) {
case MODE_CLIENT:
- client_handler(&info);
+ if (s_user_params.b_stream && s_user_params.mthread) {
+ select_per_thread(client_handler_multi_thread, fd_num);
+ }
+ else {
+ client_handler(&info);
+ }
break;
case MODE_SERVER:
if (s_user_params.mthread) {
We add fields to the handler_info struct and use that to store the results of individual threads since the thread destructor would otherwise delete our performance data before we could sum it up for a total. Otherwise, we reuse most of the implementation of the server thread code. Signed-off-by: Doug Ledford <dledford@redhat.com> --- src/Client.cpp | 140 ++++++++++++++++++++++++++++++++++++------------------- src/Client.h | 3 +- src/Defs.h | 8 ++++ src/SockPerf.cpp | 53 +++++++++++++++++++-- 4 files changed, 150 insertions(+), 54 deletions(-)