Skip to content

Commit

Permalink
connection: adding watermarks to the read buffer. (#11170)
Browse files Browse the repository at this point in the history
Fixes an issue where every time a connection was readDisabled/readEnabled it would read from the socket, even if the buffer already contained enough data that push back should have been triggered.

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored May 20, 2020
1 parent 72b930b commit 77cca6b
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 27 deletions.
3 changes: 3 additions & 0 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class WatermarkBuffer : public OwnedImpl {
// Convenience overload: uses `watermark` as the high watermark and half of it as the low
// watermark.
void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
uint32_t highWatermark() const { return high_watermark_; }
// Returns true if the high watermark callbacks have been called more recently
// than the low watermark callbacks.
bool highWatermarkTriggered() const { return above_high_watermark_called_; }

private:
void checkHighWatermark();
Expand Down
77 changes: 57 additions & 20 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
: ConnectionImplBase(dispatcher, next_global_id_++),
transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
stream_info_(stream_info), filter_manager_(*this),
read_buffer_([this]() -> void { this->onReadBufferLowWatermark(); },
[this]() -> void { this->onReadBufferHighWatermark(); }),
write_buffer_(dispatcher.getWatermarkFactory().create(
[this]() -> void { this->onWriteBufferLowWatermark(); },
[this]() -> void { this->onWriteBufferHighWatermark(); })),
Expand Down Expand Up @@ -186,8 +188,13 @@ Connection::State ConnectionImpl::state() const {

void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }

bool ConnectionImpl::consumerWantsToRead() {
  // With no outstanding read-disable calls the consumer is clearly willing to read.
  if (read_disable_count_ == 0) {
    return true;
  }
  // A single outstanding disable while the read buffer's high watermark is triggered is treated
  // as buffer push back rather than a consumer request, so the consumer still wants the data.
  const bool disabled_only_by_watermark =
      read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered();
  return disabled_only_by_watermark;
}

void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
if (!ioHandle().isOpen()) {
if (!ConnectionImpl::ioHandle().isOpen()) {
return;
}

Expand Down Expand Up @@ -216,7 +223,8 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) {

socket_->close();

raiseEvent(close_type);
// Call the base class directly as close() is called in the destructor.
ConnectionImpl::raiseEvent(close_type);
}

void ConnectionImpl::noDelay(bool enable) {
Expand Down Expand Up @@ -268,7 +276,7 @@ void ConnectionImpl::noDelay(bool enable) {
}

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (read_disable_count_ != 0 || inDelayedClose()) {
if (inDelayedClose() || !consumerWantsToRead()) {
return;
}
ASSERT(ioHandle().isOpen());
Expand Down Expand Up @@ -311,8 +319,8 @@ void ConnectionImpl::readDisable(bool disable) {
ASSERT(state() == State::Open);
ASSERT(file_event_ != nullptr);

ENVOY_CONN_LOG(trace, "readDisable: enabled={} disable_count={} state={}", *this,
read_disable_count_, disable, static_cast<int>(state()));
ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this,
disable, read_disable_count_, static_cast<int>(state()), read_buffer_.length());

// When we disable reads, we still allow for early close notifications (the equivalent of
// EPOLLRDHUP for an epoll backend). For backends that support it, this allows us to apply
Expand Down Expand Up @@ -341,25 +349,26 @@ void ConnectionImpl::readDisable(bool disable) {
file_event_->setEnabled(Event::FileReadyType::Write);
}
} else {
ASSERT(read_disable_count_ != 0);
--read_disable_count_;
if (read_disable_count_ != 0) {
// The socket should stay disabled.
return;
}
if (state() != State::Open || file_event_ == nullptr) {
// If readDisable is called on a closed connection, do not crash.
return;
}

// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Instead fake an event to make sure the buffered data
// gets processed regardless and ensure that we dispatch it via onRead.
if (read_buffer_.length() > 0) {
if (read_disable_count_ == 0) {
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);
}

if (consumerWantsToRead() && read_buffer_.length() > 0) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Alternately if the read buffer has data the fd could
// be read disabled. To handle these cases, fake an event to make sure the buffered data gets
// processed regardless and ensure that we dispatch it via onRead.
dispatch_buffered_data_ = true;
file_event_->activate(Event::FileReadyType::Read);
setReadBufferReady();
}
}
}
Expand Down Expand Up @@ -465,6 +474,21 @@ void ConnectionImpl::setBufferLimits(uint32_t limit) {
// would result in respecting the exact buffer limit.
if (limit > 0) {
static_cast<Buffer::WatermarkBuffer*>(write_buffer_.get())->setWatermarks(limit + 1);
read_buffer_.setWatermarks(limit + 1);
}
}

// Low watermark callback for the read buffer: re-enables reads on a connection that is still
// open.
void ConnectionImpl::onReadBufferLowWatermark() {
  ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this);
  // Nothing to resume once the connection is no longer open.
  if (state() != State::Open) {
    return;
  }
  readDisable(false);
}

// High watermark callback for the read buffer: read-disables a connection that is still open so
// that no further data is pulled from the socket while the buffer is full.
void ConnectionImpl::onReadBufferHighWatermark() {
  ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this);
  // Do not touch read state once the connection is no longer open.
  if (state() != State::Open) {
    return;
  }
  readDisable(true);
}

Expand Down Expand Up @@ -525,10 +549,24 @@ void ConnectionImpl::onFileEvent(uint32_t events) {
}

void ConnectionImpl::onReadReady() {
ENVOY_CONN_LOG(trace, "read ready", *this);
ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this, dispatch_buffered_data_);
const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
dispatch_buffered_data_ = false;

ASSERT(!connecting_);

// We get here while read disabled in two ways.
// 1) There was a call to setReadBufferReady(), for example if a raw buffer socket ceded due to
// shouldDrainReadBuffer(). In this case we defer the event until the socket is read enabled.
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
if (latched_dispatch_buffered_data && consumerWantsToRead()) {
onRead(read_buffer_.length());
}
return;
}

IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
Expand All @@ -542,13 +580,12 @@ void ConnectionImpl::onReadReady() {

read_end_stream_ |= result.end_stream_read_;
if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
(dispatch_buffered_data_ && read_buffer_.length() > 0)) {
(latched_dispatch_buffered_data && read_buffer_.length() > 0)) {
// Skip onRead if no bytes were processed unless we explicitly want to force onRead for
// buffered data. For instance, skip onRead if the connection was closed without producing
// more data.
onRead(new_buffer_size);
}
dispatch_buffered_data_ = false;

// The read callback may have already closed the connection.
if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
Expand Down
19 changes: 16 additions & 3 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
}

// Network::TransportSocketCallbacks
IoHandle& ioHandle() override { return socket_->ioHandle(); }
IoHandle& ioHandle() final { return socket_->ioHandle(); }
const IoHandle& ioHandle() const override { return socket_->ioHandle(); }
Connection& connection() override { return *this; }
void raiseEvent(ConnectionEvent event) override;
void raiseEvent(ConnectionEvent event) final;
// Should the read buffer be drained?
bool shouldDrainReadBuffer() override {
return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_;
Expand All @@ -122,11 +122,22 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
static uint64_t nextGlobalIdForTest() { return next_global_id_; }

protected:
// A convenience function which returns true if
// 1) The read disable count is zero or
// 2) The read disable count is one due to the read buffer being overrun.
// In either case the consumer of the data would like to read from the buffer.
// If the read count is greater than one, or equal to one when the buffer is
// not overrun, then the consumer of the data has called readDisable, and does
// not want to read.
bool consumerWantsToRead();

// Network::ConnectionImplBase
void closeConnectionImmediately() override;

void closeSocket(ConnectionEvent close_type);

void onReadBufferLowWatermark();
void onReadBufferHighWatermark();
void onWriteBufferLowWatermark();
void onWriteBufferHighWatermark();

Expand All @@ -135,7 +146,9 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
StreamInfo::StreamInfo& stream_info_;
FilterManagerImpl filter_manager_;

Buffer::OwnedImpl read_buffer_;
// Ensure that if the consumer of the data from this connection isn't
// consuming, the connection eventually stops reading from the wire.
Buffer::WatermarkBuffer read_buffer_;
// This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has
// a generic pointer.
// It MUST be defined after the filter_manager_ as some filters may have callbacks that
Expand Down
134 changes: 130 additions & 4 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ TEST_P(ConnectionImplDeathTest, BadFd) {
".*assert failure: SOCKET_VALID\\(ConnectionImpl::ioHandle\\(\\)\\.fd\\(\\)\\).*");
}

// Test-only subclass of ClientConnectionImpl that exposes the connection's read buffer so tests
// can inspect its watermark state and drain it directly.
class TestClientConnectionImpl : public Network::ClientConnectionImpl {
public:
// Inherit all ClientConnectionImpl constructors unchanged.
using ClientConnectionImpl::ClientConnectionImpl;
// Returns a mutable reference to the connection's read buffer.
Buffer::WatermarkBuffer& readBuffer() { return read_buffer_; }
};

class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
protected:
ConnectionImplTest() : api_(Api::createApiForTest(time_system_)), stream_info_(time_system_) {}
Expand All @@ -104,9 +110,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
socket_ = std::make_shared<Network::TcpListenSocket>(Network::Test::getAnyAddress(GetParam()),
nullptr, true);
listener_ = dispatcher_->createListener(socket_, listener_callbacks_, true);
client_connection_ = dispatcher_->createClientConnection(
socket_->localAddress(), source_address_, Network::Test::createRawBufferSocket(),
socket_options_);
client_connection_ = std::make_unique<Network::TestClientConnectionImpl>(
*dispatcher_, socket_->localAddress(), source_address_,
Network::Test::createRawBufferSocket(), socket_options_);
client_connection_->addConnectionCallbacks(client_callbacks_);
EXPECT_EQ(nullptr, client_connection_->ssl());
const Network::ClientConnection& const_connection = *client_connection_;
Expand Down Expand Up @@ -215,6 +221,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
return ConnectionMocks{std::move(dispatcher), timer, std::move(transport_socket), file_event,
&file_ready_cb_};
}
// Returns client_connection_ viewed as the test subclass, or nullptr if the dynamic_cast fails.
Network::TestClientConnectionImpl* testClientConnection() {
  auto* const raw_connection = client_connection_.get();
  return dynamic_cast<Network::TestClientConnectionImpl*>(raw_connection);
}

Event::FileReadyCb file_ready_cb_;
Event::SimulatedTimeSystem time_system_;
Expand Down Expand Up @@ -742,7 +751,7 @@ TEST_P(ConnectionImplTest, HalfCloseNoEarlyCloseDetection) {
}

// Test that as watermark levels are changed, the appropriate callbacks are triggered.
TEST_P(ConnectionImplTest, Watermarks) {
TEST_P(ConnectionImplTest, WriteWatermarks) {
useMockBuffer();

setUpBasicConnection();
Expand Down Expand Up @@ -791,6 +800,123 @@ TEST_P(ConnectionImplTest, Watermarks) {
disconnect(false);
}

// Test that filling and draining the read buffer past its watermarks read disables and read
// enables the connection, and that buffered data is only re-dispatched to the read filter when
// the consumer wants to read.
TEST_P(ConnectionImplTest, ReadWatermarks) {

setUpBasicConnection();
client_connection_->setBufferLimits(2);
std::shared_ptr<MockReadFilter> client_read_filter(new NiceMock<MockReadFilter>());
client_connection_->addReadFilter(client_read_filter);
connect();

EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_TRUE(client_connection_->readEnabled());
// Add 4 bytes to the buffer and verify the connection becomes read disabled.
{
Buffer::OwnedImpl buffer("data");
server_connection_->write(buffer, false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
dispatcher_->exit();
return FilterStatus::StopIteration;
}));
dispatcher_->run(Event::Dispatcher::RunType::Block);

EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());
}

// Drain 3 bytes from the buffer. This brings it below the low watermark, and
// read enables, as well as triggering a kick for the remaining byte.
{
testClientConnection()->readBuffer().drain(3);
EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_TRUE(client_connection_->readEnabled());

EXPECT_CALL(*client_read_filter, onData(_, false));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

// Add 3 bytes to the buffer and verify the connection becomes read disabled
// again.
{
Buffer::OwnedImpl buffer("bye");
server_connection_->write(buffer, false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
dispatcher_->exit();
return FilterStatus::StopIteration;
}));
dispatcher_->run(Event::Dispatcher::RunType::Block);

EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());
}

// Now have the consumer read disable.
// This time when the buffer is drained, there will be no kick as the consumer
// does not want to read.
{
client_connection_->readDisable(true);
testClientConnection()->readBuffer().drain(3);
EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());

EXPECT_CALL(*client_read_filter, onData(_, false)).Times(0);
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

// Now read enable again.
// Inside the onData call, readDisable and readEnable. This should trigger
// another kick on the next dispatcher loop, so onData gets called twice.
{
client_connection_->readDisable(false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.Times(2)
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
client_connection_->readDisable(true);
client_connection_->readDisable(false);
return FilterStatus::StopIteration;
}))
.WillRepeatedly(Return(FilterStatus::StopIteration));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

// Test the same logic for dispatch_buffered_data_ from the
// onReadReady() (read_disable_count_ != 0) path.
{
// Fill the buffer and verify the socket is read disabled.
Buffer::OwnedImpl buffer("bye");
server_connection_->write(buffer, false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
dispatcher_->exit();
return FilterStatus::StopIteration;
}));
dispatcher_->run(Event::Dispatcher::RunType::Block);
EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());

// Read disable and read enable, to set dispatch_buffered_data_ true.
client_connection_->readDisable(true);
client_connection_->readDisable(false);
// Now event loop. This hits the early onRead path in onReadReady(). As above, read
// disable and read enable from inside the stack of onData, to ensure that
// dispatch_buffered_data_ works correctly.
EXPECT_CALL(*client_read_filter, onData(_, false))
.Times(2)
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
client_connection_->readDisable(true);
client_connection_->readDisable(false);
return FilterStatus::StopIteration;
}))
.WillRepeatedly(Return(FilterStatus::StopIteration));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

disconnect(true);
}

// Write some data to the connection. It will automatically attempt to flush
// it to the upstream file descriptor via a write() call to buffer_, which is
// configured to succeed and accept all bytes read.
Expand Down

0 comments on commit 77cca6b

Please sign in to comment.