@@ -1750,6 +1750,20 @@ Message *SimpleMessenger::Pipe::read_message()
unsigned data_off = le32_to_cpu(header.data_off);
if (data_len) {
int left = data_len;
+ int throttled = 0;
+
+ while (buffer_total_alloc.read() + data_len > (64*1024*1024)) {
+ struct timespec sleepy_time = {0, (1000 * 1000)}; // 1 msec
+ if (!throttled)
+ dout(4) << " pipe reader paused" << "; buffer_total_alloc "
+ << buffer_total_alloc.read() << dendl;
+ ::nanosleep(&sleepy_time, 0);
+ throttled++;
+ }
+ if (throttled)
+ dout(4) << " pipe reader unpaused; buffer_total_alloc "
+ << buffer_total_alloc.read() << dendl;
+
if (data_off & ~PAGE_MASK) {
// head
int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),