@@ -1875,7 +1875,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
while (left > 0) {
// wait for data
- if (tcp_wait(sd, messenger->timeout) < 0)
+ if (tcp_read_wait(sd, messenger->timeout) < 0)
goto out_dethrottle;
// get a buffer
@@ -13,9 +13,6 @@ int tcp_read(int sd, char *buf, int len, int timeout)
if (sd < 0)
return -1;
- struct pollfd pfd;
- pfd.fd = sd;
- pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
while (len > 0) {
if (g_conf.ms_inject_socket_failures && sd >= 0) {
@@ -25,24 +22,14 @@ int tcp_read(int sd, char *buf, int len, int timeout)
}
}
- if (poll(&pfd, 1, timeout) <= 0)
+ if (tcp_read_wait(sd, timeout) < 0)
return -1;
- if (!(pfd.revents & POLLIN))
- return -1;
+ int got = tcp_read_nonblocking(sd, buf, len);
- /*
- * although we turn on the MSG_DONTWAIT flag, we don't expect
- * receivng an EAGAIN, as we polled on the socket, so there
- * should be data waiting for us.
- */
- int got = ::recv( sd, buf, len, MSG_DONTWAIT );
- if (got <= 0) {
- //char buf[100];
- //generic_dout(0) << "tcp_read socket " << sd << " returned " << got
- //<< " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+ if (got < 0)
return -1;
- }
+
len -= got;
buf += got;
//generic_dout(DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
@@ -50,26 +37,51 @@ int tcp_read(int sd, char *buf, int len, int timeout)
return len;
}
-int tcp_wait(int sd, int timeout)
+int tcp_read_wait(int sd, int timeout)
{
if (sd < 0)
return -1;
struct pollfd pfd;
pfd.fd = sd;
- pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
+ pfd.events = POLLIN | POLLRDHUP;
if (poll(&pfd, 1, timeout) <= 0)
return -1;
+ if (pfd.revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL))
+ return -1;
+
if (!(pfd.revents & POLLIN))
return -1;
return 0;
}
+/* This function can only be called if poll/select says there is
+ * data available. Otherwise we cannot properly interpret a
+ * read of 0 bytes.
+ */
int tcp_read_nonblocking(int sd, char *buf, int len)
{
- return ::recv(sd, buf, len, MSG_DONTWAIT);
+again:
+ int got = ::recv( sd, buf, len, MSG_DONTWAIT );
+ if (got < 0) {
+ if (errno == EAGAIN || errno == EINTR) {
+ goto again;
+ } else {
+ char buf[100];
+ generic_dout(10) << "tcp_read_nonblocking socket " << sd << " returned "
+ << got << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+ return -1;
+ }
+ } else if (got == 0) {
+ /* poll() said there was data, but we didn't read any - peer
+ * sent a FIN. Maybe POLLRDHUP signals this, but this is
+ * standard socket behavior as documented by Stevens.
+ */
+ return -1;
+ }
+ return got;
}
int tcp_write(int sd, const char *buf, int len)
@@ -26,7 +26,7 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss)
}
extern int tcp_read(int sd, char *buf, int len, int timeout=-1);
-extern int tcp_wait(int sd, int timeout);
+extern int tcp_read_wait(int sd, int timeout);
extern int tcp_read_nonblocking(int sd, char *buf, int len);
extern int tcp_write(int sd, const char *buf, int len);