@@ -148,7 +148,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
e = &rb->events[rb->ring_cnt];
memset(e, 0, sizeof(*e));
- e->events = EPOLLIN;
+ e->events = EPOLLIN|EPOLLET;
e->data.fd = rb->ring_cnt;
if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
err = -errno;
@@ -260,7 +260,19 @@ static int64_t ringbuf_process_ring(struct ring *r)
cnt++;
}
- smp_store_release(r->consumer_pos, cons_pos);
+ /* This ordering is critical to ensure that an epoll
+ * notification gets sent in the case where the next
+ * iteration of this loop discovers that the consumer is
+ * caught up. If this store were performed using
+ * RELEASE, it'd be possible for the consumer to fail to
+ * see an updated producer position and for that
+ * producer to fail to see this write. By making this
+ * write SEQ_CST, we know that either the newly produced
+ * message will be visible to theconsumer, or the
+ * producer will discover that the consumer is caught
+ * up, and will ensure a notification is sent.
+ */
+ __atomic_store_n(r->consumer_pos, cons_pos, __ATOMIC_SEQ_CST);
}
} while (got_new_data);
done: