Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connection: adding watermarks to the read buffer. #11170

Merged
merged 6 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class WatermarkBuffer : public OwnedImpl {
// Convenience overload taking a single value: `watermark` becomes the high watermark and half of
// it (integer division) becomes the low watermark.
void setWatermarks(uint32_t watermark) {
  const uint32_t low_watermark = watermark / 2;
  setWatermarks(low_watermark, watermark);
}
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
// Accessor for the stored high watermark value.
uint32_t highWatermark() const {
  return high_watermark_;
}
// Reports whether the high watermark callbacks have fired more recently than
// the low watermark callbacks.
bool highWatermarkTriggered() const {
  return above_high_watermark_called_;
}

private:
void checkHighWatermark();
Expand Down
76 changes: 56 additions & 20 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
: ConnectionImplBase(dispatcher, next_global_id_++),
transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
stream_info_(stream_info), filter_manager_(*this),
read_buffer_([this]() -> void { this->onReadBufferLowWatermark(); },
[this]() -> void { this->onReadBufferHighWatermark(); }),
write_buffer_(dispatcher.getWatermarkFactory().create(
[this]() -> void { this->onWriteBufferLowWatermark(); },
[this]() -> void { this->onWriteBufferHighWatermark(); })),
Expand Down Expand Up @@ -186,8 +188,13 @@ Connection::State ConnectionImpl::state() const {

// Closes the socket, reporting ConnectionEvent::LocalClose as the close type.
void ConnectionImpl::closeConnectionImmediately() {
  closeSocket(ConnectionEvent::LocalClose);
}

// Returns true when nothing has read disabled the connection, or when the only outstanding read
// disable coincides with the read buffer having triggered its high watermark.
bool ConnectionImpl::consumerWantsToRead() {
  if (read_disable_count_ == 0) {
    // No outstanding read disables at all.
    return true;
  }
  // Exactly one outstanding disable while the read buffer is above its high watermark.
  return read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered();
}

void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
if (!ioHandle().isOpen()) {
if (!ConnectionImpl::ioHandle().isOpen()) {
return;
}

Expand Down Expand Up @@ -216,7 +223,8 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) {

socket_->close();

raiseEvent(close_type);
// Call the base class directly as close() is called in the destructor.
ConnectionImpl::raiseEvent(close_type);
}

void ConnectionImpl::noDelay(bool enable) {
Expand Down Expand Up @@ -268,7 +276,7 @@ void ConnectionImpl::noDelay(bool enable) {
}

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (read_disable_count_ != 0 || inDelayedClose()) {
if (inDelayedClose() || !consumerWantsToRead()) {
return;
}
ASSERT(ioHandle().isOpen());
Expand Down Expand Up @@ -311,8 +319,8 @@ void ConnectionImpl::readDisable(bool disable) {
ASSERT(state() == State::Open);
ASSERT(file_event_ != nullptr);

ENVOY_CONN_LOG(trace, "readDisable: enabled={} disable_count={} state={}", *this,
read_disable_count_, disable, static_cast<int>(state()));
ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this,
disable, read_disable_count_, static_cast<int>(state()), read_buffer_.length());

// When we disable reads, we still allow for early close notifications (the equivalent of
// EPOLLRDHUP for an epoll backend). For backends that support it, this allows us to apply
Expand Down Expand Up @@ -342,24 +350,24 @@ void ConnectionImpl::readDisable(bool disable) {
}
} else {
--read_disable_count_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you are here can you ASSERT this is greater than 0 before decrementing? I'm surprised this was not already asserted.

if (read_disable_count_ != 0) {
// The socket should stay disabled.
return;
}
if (state() != State::Open || file_event_ == nullptr) {
// If readDisable is called on a closed connection, do not crash.
return;
}

// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Instead fake an event to make sure the buffered data
// gets processed regardless and ensure that we dispatch it via onRead.
if (read_buffer_.length() > 0) {
if (read_disable_count_ == 0) {
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);
}

if (consumerWantsToRead() && read_buffer_.length() > 0) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Alternately if the read buffer has data the fd could
// be read disabled. To handle these cases, fake an event to make sure the buffered data gets
// processed regardless and ensure that we dispatch it via onRead.
dispatch_buffered_data_ = true;
file_event_->activate(Event::FileReadyType::Read);
setReadBufferReady();
}
}
}
Expand Down Expand Up @@ -465,6 +473,21 @@ void ConnectionImpl::setBufferLimits(uint32_t limit) {
// would result in respecting the exact buffer limit.
if (limit > 0) {
static_cast<Buffer::WatermarkBuffer*>(write_buffer_.get())->setWatermarks(limit + 1);
read_buffer_.setWatermarks(limit + 1);
}
}

// Read buffer low watermark callback: logs the event and, if the connection is still open, calls
// readDisable(false).
void ConnectionImpl::onReadBufferLowWatermark() {
  ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this);
  if (state() != State::Open) {
    // Nothing to re-enable on a connection that is not open.
    return;
  }
  readDisable(false);
}

// Read buffer high watermark callback: logs the event and, if the connection is still open, calls
// readDisable(true).
void ConnectionImpl::onReadBufferHighWatermark() {
  ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this);
  if (state() != State::Open) {
    // Do not touch read state on a connection that is not open.
    return;
  }
  readDisable(true);
}

Expand Down Expand Up @@ -525,10 +548,24 @@ void ConnectionImpl::onFileEvent(uint32_t events) {
}

void ConnectionImpl::onReadReady() {
ENVOY_CONN_LOG(trace, "read ready", *this);
ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this, dispatch_buffered_data_);
const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
dispatch_buffered_data_ = false;

ASSERT(!connecting_);

// We get here while read disabled in two ways.
// 1) There was a call to setReadBufferReady(), for example if a raw buffer socket ceded due to
// shouldDrainReadBuffer(). In this case we defer the event until the socket is read enabled.
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
if (latched_dispatch_buffered_data && consumerWantsToRead()) {
onRead(read_buffer_.length());
}
return;
}

IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
Expand All @@ -542,13 +579,12 @@ void ConnectionImpl::onReadReady() {

read_end_stream_ |= result.end_stream_read_;
if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
(dispatch_buffered_data_ && read_buffer_.length() > 0)) {
(latched_dispatch_buffered_data && read_buffer_.length() > 0)) {
// Skip onRead if no bytes were processed unless we explicitly want to force onRead for
// buffered data. For instance, skip onRead if the connection was closed without producing
// more data.
onRead(new_buffer_size);
}
dispatch_buffered_data_ = false;

// The read callback may have already closed the connection.
if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
Expand Down
16 changes: 13 additions & 3 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
}

// Network::TransportSocketCallbacks
IoHandle& ioHandle() override { return socket_->ioHandle(); }
IoHandle& ioHandle() final { return socket_->ioHandle(); }
const IoHandle& ioHandle() const override { return socket_->ioHandle(); }
Connection& connection() override { return *this; }
void raiseEvent(ConnectionEvent event) override;
void raiseEvent(ConnectionEvent event) final;
// Should the read buffer be drained?
bool shouldDrainReadBuffer() override {
return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_;
Expand All @@ -122,11 +122,19 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// Test-only accessor for the current value of next_global_id_.
static uint64_t nextGlobalIdForTest() {
  return next_global_id_;
}

protected:
// A convenience function which returns true if
// 1) The read disable count is zero or
// 2) The read disable count is one, due to the read buffer being overrun.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a chance that the read disable count is > 1 and the buffer is overrun? For example, if it got overrun but was also disabled some other way? Or is the idea that if it is > 1, we wait until the read overrun is the only remaining reason it is disabled? If that's the case, can you add more comments just to make it completely clear?

// In either case the consumer of the data would like to read from the buffer.
bool consumerWantsToRead();

// Network::ConnectionImplBase
void closeConnectionImmediately() override;

void closeSocket(ConnectionEvent close_type);

void onReadBufferLowWatermark();
void onReadBufferHighWatermark();
void onWriteBufferLowWatermark();
void onWriteBufferHighWatermark();

Expand All @@ -135,7 +143,9 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
StreamInfo::StreamInfo& stream_info_;
FilterManagerImpl filter_manager_;

Buffer::OwnedImpl read_buffer_;
// Ensure that if the consumer of the data from this connection isn't
// consuming, that the connection eventually stops reading from the wire.
Buffer::WatermarkBuffer read_buffer_;
// This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has
// a generic pointer.
// It MUST be defined after the filter_manager_ as some filters may have callbacks that
Expand Down
134 changes: 130 additions & 4 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ TEST_P(ConnectionImplDeathTest, BadFd) {
".*assert failure: SOCKET_VALID\\(ConnectionImpl::ioHandle\\(\\)\\.fd\\(\\)\\).*");
}

// Test-only subclass of ClientConnectionImpl that gives tests direct access to the connection's
// read buffer.
class TestClientConnectionImpl : public Network::ClientConnectionImpl {
public:
// Inherit every ClientConnectionImpl constructor unchanged.
using ClientConnectionImpl::ClientConnectionImpl;
// Returns a mutable reference to read_buffer_ so tests can inspect and drain it.
Buffer::WatermarkBuffer& readBuffer() { return read_buffer_; }
};

class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
protected:
ConnectionImplTest() : api_(Api::createApiForTest(time_system_)), stream_info_(time_system_) {}
Expand All @@ -104,9 +110,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
socket_ = std::make_shared<Network::TcpListenSocket>(Network::Test::getAnyAddress(GetParam()),
nullptr, true);
listener_ = dispatcher_->createListener(socket_, listener_callbacks_, true);
client_connection_ = dispatcher_->createClientConnection(
socket_->localAddress(), source_address_, Network::Test::createRawBufferSocket(),
socket_options_);
client_connection_ = std::make_unique<Network::TestClientConnectionImpl>(
*dispatcher_, socket_->localAddress(), source_address_,
Network::Test::createRawBufferSocket(), socket_options_);
client_connection_->addConnectionCallbacks(client_callbacks_);
EXPECT_EQ(nullptr, client_connection_->ssl());
const Network::ClientConnection& const_connection = *client_connection_;
Expand Down Expand Up @@ -215,6 +221,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
return ConnectionMocks{std::move(dispatcher), timer, std::move(transport_socket), file_event,
&file_ready_cb_};
}
// Returns client_connection_ downcast to TestClientConnectionImpl; the dynamic_cast yields nullptr
// if the held connection is not of that type.
Network::TestClientConnectionImpl* testClientConnection() {
  auto* const raw_connection = client_connection_.get();
  return dynamic_cast<Network::TestClientConnectionImpl*>(raw_connection);
}

Event::FileReadyCb file_ready_cb_;
Event::SimulatedTimeSystem time_system_;
Expand Down Expand Up @@ -742,7 +751,7 @@ TEST_P(ConnectionImplTest, HalfCloseNoEarlyCloseDetection) {
}

// Test that as watermark levels are changed, the appropriate callbacks are triggered.
TEST_P(ConnectionImplTest, Watermarks) {
TEST_P(ConnectionImplTest, WriteWatermarks) {
useMockBuffer();

setUpBasicConnection();
Expand Down Expand Up @@ -791,6 +800,123 @@ TEST_P(ConnectionImplTest, Watermarks) {
disconnect(false);
}

// Test that as watermark levels are changed, the appropriate callbacks are triggered.
TEST_P(ConnectionImplTest, ReadWatermarks) {

setUpBasicConnection();
client_connection_->setBufferLimits(2);
std::shared_ptr<MockReadFilter> client_read_filter(new NiceMock<MockReadFilter>());
client_connection_->addReadFilter(client_read_filter);
connect();

EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_TRUE(client_connection_->readEnabled());
// Add 4 bytes to the buffer and verify the connection becomes read disabled.
{
Buffer::OwnedImpl buffer("data");
server_connection_->write(buffer, false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
dispatcher_->exit();
return FilterStatus::StopIteration;
}));
dispatcher_->run(Event::Dispatcher::RunType::Block);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth doing a second server_connection write before doing the drains, and verifying that there is no read from the client connection on dispatcher_->run()?


EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());
}

// Drain 3 bytes from the buffer. This brings it below the low watermark, and
// read enables, as well as triggering a kick for the remaining byte.
{
testClientConnection()->readBuffer().drain(3);
EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_TRUE(client_connection_->readEnabled());

EXPECT_CALL(*client_read_filter, onData(_, false));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

// Add 3 bytes to the buffer and verify the connection becomes read disabled
// again.
{
Buffer::OwnedImpl buffer("bye");
server_connection_->write(buffer, false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
dispatcher_->exit();
return FilterStatus::StopIteration;
}));
dispatcher_->run(Event::Dispatcher::RunType::Block);

EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());
}

// Now have the consumer read disable.
// This time when the buffer is drained, there will be no kick as the consumer
// does not want to read.
{
client_connection_->readDisable(true);
testClientConnection()->readBuffer().drain(3);
EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());

EXPECT_CALL(*client_read_filter, onData(_, false)).Times(0);
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

// Now read enable again.
// Inside the onData call, readDisable and readEnable. This should trigger
// another kick on the next dispatcher loop, so onData gets called twice.
{
client_connection_->readDisable(false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.Times(2)
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
client_connection_->readDisable(true);
client_connection_->readDisable(false);
return FilterStatus::StopIteration;
}))
.WillRepeatedly(Return(FilterStatus::StopIteration));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

// Test the same logic for dispatch_buffered_data_ from the
// onReadReady() (read_disable_count_ != 0) path.
{
// Fill the buffer and verify the socket is read disabled.
Buffer::OwnedImpl buffer("bye");
server_connection_->write(buffer, false);
EXPECT_CALL(*client_read_filter, onData(_, false))
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
dispatcher_->exit();
return FilterStatus::StopIteration;
}));
dispatcher_->run(Event::Dispatcher::RunType::Block);
EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered());
EXPECT_FALSE(client_connection_->readEnabled());

// Read disable and read enable, to set dispatch_buffered_data_ true.
client_connection_->readDisable(true);
client_connection_->readDisable(false);
// Now event loop. This hits the early on-Read path. As above, read
// disable and read enable from inside the stack of onData, to ensure that
// dispatch_buffered_data_ works correctly.
EXPECT_CALL(*client_read_filter, onData(_, false))
.Times(2)
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
client_connection_->readDisable(true);
client_connection_->readDisable(false);
return FilterStatus::StopIteration;
}))
.WillRepeatedly(Return(FilterStatus::StopIteration));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

disconnect(true);
}

// Write some data to the connection. It will automatically attempt to flush
// it to the upstream file descriptor via a write() call to buffer_, which is
// configured to succeed and accept all bytes read.
Expand Down