From 77cca6b40775fbce1b93eb3ae6189b6e9dede208 Mon Sep 17 00:00:00 2001 From: alyssawilk Date: Wed, 20 May 2020 16:57:23 -0400 Subject: [PATCH] connection: adding watermarks to the read buffer. (#11170) Fixing an issue where every time a connection was readDisabled/readEnabled it would read from the socket, even if the buffer already contained sufficient data it should have triggered push back. Signed-off-by: Alyssa Wilk --- source/common/buffer/watermark_buffer.h | 3 + source/common/network/connection_impl.cc | 77 ++++++++--- source/common/network/connection_impl.h | 19 ++- test/common/network/connection_impl_test.cc | 134 +++++++++++++++++++- 4 files changed, 206 insertions(+), 27 deletions(-) diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 827d1a51bccf..5bc111a4e1e3 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -38,6 +38,9 @@ class WatermarkBuffer : public OwnedImpl { void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); } void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); uint32_t highWatermark() const { return high_watermark_; } + // Returns true if the high watermark callbacks have been called more recently + // than the low watermark callbacks. + bool highWatermarkTriggered() const { return above_high_watermark_called_; } private: void checkHighWatermark(); diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index f44aee154ead..12961773a7ee 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -48,6 +48,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt : ConnectionImplBase(dispatcher, next_global_id_++), transport_socket_(std::move(transport_socket)), socket_(std::move(socket)), stream_info_(stream_info), filter_manager_(*this), + read_buffer_([this]() -> void { this->onReadBufferLowWatermark(); }, + [this]() -> void { this->onReadBufferHighWatermark(); }), write_buffer_(dispatcher.getWatermarkFactory().create( [this]() -> void { this->onWriteBufferLowWatermark(); }, [this]() -> void { this->onWriteBufferHighWatermark(); })), @@ -186,8 +188,13 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } +bool ConnectionImpl::consumerWantsToRead() { + return read_disable_count_ == 0 || + (read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered()); +} + void ConnectionImpl::closeSocket(ConnectionEvent close_type) { - if (!ioHandle().isOpen()) { + if (!ConnectionImpl::ioHandle().isOpen()) { return; } @@ -216,7 +223,8 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) { socket_->close(); - raiseEvent(close_type); + // Call the base class directly as close() is called in the destructor. + ConnectionImpl::raiseEvent(close_type); } void ConnectionImpl::noDelay(bool enable) { @@ -268,7 +276,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { - if (read_disable_count_ != 0 || inDelayedClose()) { + if (inDelayedClose() || !consumerWantsToRead()) { return; } ASSERT(ioHandle().isOpen()); @@ -311,8 +319,8 @@ void ConnectionImpl::readDisable(bool disable) { ASSERT(state() == State::Open); ASSERT(file_event_ != nullptr); - ENVOY_CONN_LOG(trace, "readDisable: enabled={} disable_count={} state={}", *this, - read_disable_count_, disable, static_cast(state())); + ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this, + disable, read_disable_count_, static_cast(state()), read_buffer_.length()); // When we disable reads, we still allow for early close notifications (the equivalent of // EPOLLRDHUP for an epoll backend). For backends that support it, this allows us to apply @@ -341,25 +349,26 @@ void ConnectionImpl::readDisable(bool disable) { file_event_->setEnabled(Event::FileReadyType::Write); } } else { + ASSERT(read_disable_count_ != 0); --read_disable_count_; - if (read_disable_count_ != 0) { - // The socket should stay disabled. - return; - } if (state() != State::Open || file_event_ == nullptr) { // If readDisable is called on a closed connection, do not crash. return; } - // We never ask for both early close and read at the same time. If we are reading, we want to - // consume all available data. - file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); - // If the connection has data buffered there's no guarantee there's also data in the kernel - // which will kick off the filter chain. Instead fake an event to make sure the buffered data - // gets processed regardless and ensure that we dispatch it via onRead. - if (read_buffer_.length() > 0) { + if (read_disable_count_ == 0) { + // We never ask for both early close and read at the same time. If we are reading, we want to + // consume all available data. + file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); + } + + if (consumerWantsToRead() && read_buffer_.length() > 0) { + // If the connection has data buffered there's no guarantee there's also data in the kernel + // which will kick off the filter chain. Alternately if the read buffer has data the fd could + // be read disabled. To handle these cases, fake an event to make sure the buffered data gets + // processed regardless and ensure that we dispatch it via onRead. dispatch_buffered_data_ = true; - file_event_->activate(Event::FileReadyType::Read); + setReadBufferReady(); } } } @@ -465,6 +474,21 @@ void ConnectionImpl::setBufferLimits(uint32_t limit) { // would result in respecting the exact buffer limit. if (limit > 0) { static_cast(write_buffer_.get())->setWatermarks(limit + 1); + read_buffer_.setWatermarks(limit + 1); + } +} + +void ConnectionImpl::onReadBufferLowWatermark() { + ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this); + if (state() == State::Open) { + readDisable(false); + } +} + +void ConnectionImpl::onReadBufferHighWatermark() { + ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this); + if (state() == State::Open) { + readDisable(true); } } @@ -525,10 +549,24 @@ void ConnectionImpl::onFileEvent(uint32_t events) { } void ConnectionImpl::onReadReady() { - ENVOY_CONN_LOG(trace, "read ready", *this); + ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this, dispatch_buffered_data_); + const bool latched_dispatch_buffered_data = dispatch_buffered_data_; + dispatch_buffered_data_ = false; ASSERT(!connecting_); + // We get here while read disabled in two ways. + // 1) There was a call to setReadBufferReady(), for example if a raw buffer socket ceded due to + // shouldDrainReadBuffer(). In this case we defer the event until the socket is read enabled. + // 2) The consumer of connection data called readDisable(true), and instead of reading from the + // socket we simply need to dispatch already read data. + if (read_disable_count_ != 0) { + if (latched_dispatch_buffered_data && consumerWantsToRead()) { + onRead(read_buffer_.length()); + } + return; + } + IoResult result = transport_socket_->doRead(read_buffer_); uint64_t new_buffer_size = read_buffer_.length(); updateReadBufferStats(result.bytes_processed_, new_buffer_size); @@ -542,13 +580,12 @@ void ConnectionImpl::onReadReady() { read_end_stream_ |= result.end_stream_read_; if (result.bytes_processed_ != 0 || result.end_stream_read_ || - (dispatch_buffered_data_ && read_buffer_.length() > 0)) { + (latched_dispatch_buffered_data && read_buffer_.length() > 0)) { // Skip onRead if no bytes were processed unless we explicitly want to force onRead for // buffered data. For instance, skip onRead if the connection was closed without producing // more data. onRead(new_buffer_size); } - dispatch_buffered_data_ = false; // The read callback may have already closed the connection. if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) { diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 6e8c1eb65518..b464e2af96d1 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -102,10 +102,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback } // Network::TransportSocketCallbacks - IoHandle& ioHandle() override { return socket_->ioHandle(); } + IoHandle& ioHandle() final { return socket_->ioHandle(); } const IoHandle& ioHandle() const override { return socket_->ioHandle(); } Connection& connection() override { return *this; } - void raiseEvent(ConnectionEvent event) override; + void raiseEvent(ConnectionEvent event) final; // Should the read buffer be drained? bool shouldDrainReadBuffer() override { return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_; @@ -122,11 +122,22 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback static uint64_t nextGlobalIdForTest() { return next_global_id_; } protected: + // A convenience function which returns true if + // 1) The read disable count is zero or + // 2) The read disable count is one due to the read buffer being overrun. + // In either case the consumer of the data would like to read from the buffer. + // If the read count is greater than one, or equal to one when the buffer is + // not overrun, then the consumer of the data has called readDisable, and does + // not want to read. + bool consumerWantsToRead(); + // Network::ConnectionImplBase void closeConnectionImmediately() override; void closeSocket(ConnectionEvent close_type); + void onReadBufferLowWatermark(); + void onReadBufferHighWatermark(); void onWriteBufferLowWatermark(); void onWriteBufferHighWatermark(); @@ -135,7 +146,9 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback StreamInfo::StreamInfo& stream_info_; FilterManagerImpl filter_manager_; - Buffer::OwnedImpl read_buffer_; + // Ensure that if the consumer of the data from this connection isn't + // consuming, that the connection eventually stops reading from the wire. + Buffer::WatermarkBuffer read_buffer_; // This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has // a generic pointer. // It MUST be defined after the filter_manager_ as some filters may have callbacks that diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index fbeb06519b09..6ef055b9fab3 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -93,6 +93,12 @@ TEST_P(ConnectionImplDeathTest, BadFd) { ".*assert failure: SOCKET_VALID\\(ConnectionImpl::ioHandle\\(\\)\\.fd\\(\\)\\).*"); } +class TestClientConnectionImpl : public Network::ClientConnectionImpl { +public: + using ClientConnectionImpl::ClientConnectionImpl; + Buffer::WatermarkBuffer& readBuffer() { return read_buffer_; } +}; + class ConnectionImplTest : public testing::TestWithParam { protected: ConnectionImplTest() : api_(Api::createApiForTest(time_system_)), stream_info_(time_system_) {} @@ -104,9 +110,9 @@ class ConnectionImplTest : public testing::TestWithParam { socket_ = std::make_shared(Network::Test::getAnyAddress(GetParam()), nullptr, true); listener_ = dispatcher_->createListener(socket_, listener_callbacks_, true); - client_connection_ = dispatcher_->createClientConnection( - socket_->localAddress(), source_address_, Network::Test::createRawBufferSocket(), - socket_options_); + client_connection_ = std::make_unique( + *dispatcher_, socket_->localAddress(), source_address_, + Network::Test::createRawBufferSocket(), socket_options_); client_connection_->addConnectionCallbacks(client_callbacks_); EXPECT_EQ(nullptr, client_connection_->ssl()); const Network::ClientConnection& const_connection = *client_connection_; @@ -215,6 +221,9 @@ class ConnectionImplTest : public testing::TestWithParam { return ConnectionMocks{std::move(dispatcher), timer, std::move(transport_socket), file_event, &file_ready_cb_}; } + Network::TestClientConnectionImpl* testClientConnection() { + return dynamic_cast(client_connection_.get()); + } Event::FileReadyCb file_ready_cb_; Event::SimulatedTimeSystem time_system_; @@ -742,7 +751,7 @@ TEST_P(ConnectionImplTest, HalfCloseNoEarlyCloseDetection) { } // Test that as watermark levels are changed, the appropriate callbacks are triggered. -TEST_P(ConnectionImplTest, Watermarks) { +TEST_P(ConnectionImplTest, WriteWatermarks) { useMockBuffer(); setUpBasicConnection(); @@ -791,6 +800,123 @@ TEST_P(ConnectionImplTest, Watermarks) { disconnect(false); } +// Test that as watermark levels are changed, the appropriate callbacks are triggered. +TEST_P(ConnectionImplTest, ReadWatermarks) { + + setUpBasicConnection(); + client_connection_->setBufferLimits(2); + std::shared_ptr client_read_filter(new NiceMock()); + client_connection_->addReadFilter(client_read_filter); + connect(); + + EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); + EXPECT_TRUE(client_connection_->readEnabled()); + // Add 4 bytes to the buffer and verify the connection becomes read disabled. + { + Buffer::OwnedImpl buffer("data"); + server_connection_->write(buffer, false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + dispatcher_->exit(); + return FilterStatus::StopIteration; + })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); + EXPECT_FALSE(client_connection_->readEnabled()); + } + + // Drain 3 bytes from the buffer. This bring sit below the low watermark, and + // read enables, as well as triggering a kick for the remaining byte. + { + testClientConnection()->readBuffer().drain(3); + EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); + EXPECT_TRUE(client_connection_->readEnabled()); + + EXPECT_CALL(*client_read_filter, onData(_, false)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + // Add 3 bytes to the buffer and verify the connection becomes read disabled + // again. + { + Buffer::OwnedImpl buffer("bye"); + server_connection_->write(buffer, false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + dispatcher_->exit(); + return FilterStatus::StopIteration; + })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); + EXPECT_FALSE(client_connection_->readEnabled()); + } + + // Now have the consumer read disable. + // This time when the buffer is drained, there will be no kick as the consumer + // does not want to read. + { + client_connection_->readDisable(true); + testClientConnection()->readBuffer().drain(3); + EXPECT_FALSE(testClientConnection()->readBuffer().highWatermarkTriggered()); + EXPECT_FALSE(client_connection_->readEnabled()); + + EXPECT_CALL(*client_read_filter, onData(_, false)).Times(0); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + // Now read enable again. + // Inside the onData call, readDisable and readEnable. This should trigger + // another kick on the next dispatcher loop, so onData gets called twice. + { + client_connection_->readDisable(false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .Times(2) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + client_connection_->readDisable(true); + client_connection_->readDisable(false); + return FilterStatus::StopIteration; + })) + .WillRepeatedly(Return(FilterStatus::StopIteration)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + // Test the same logic for dispatched_buffered_data from the + // onReadReady() (read_disable_count_ != 0) path. + { + // Fill the buffer and verify the socket is read disabled. + Buffer::OwnedImpl buffer("bye"); + server_connection_->write(buffer, false); + EXPECT_CALL(*client_read_filter, onData(_, false)) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + dispatcher_->exit(); + return FilterStatus::StopIteration; + })); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_TRUE(testClientConnection()->readBuffer().highWatermarkTriggered()); + EXPECT_FALSE(client_connection_->readEnabled()); + + // Read disable and read enable, to set dispatch_buffered_data_ true. + client_connection_->readDisable(true); + client_connection_->readDisable(false); + // Now event loop. This hits the early on-Read path. As above, read + // disable and read enable from inside the stack of onData, to ensure that + // dispatch_buffered_data_ works correctly. + EXPECT_CALL(*client_read_filter, onData(_, false)) + .Times(2) + .WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus { + client_connection_->readDisable(true); + client_connection_->readDisable(false); + return FilterStatus::StopIteration; + })) + .WillRepeatedly(Return(FilterStatus::StopIteration)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + disconnect(true); +} + // Write some data to the connection. It will automatically attempt to flush // it to the upstream file descriptor via a write() call to buffer_, which is // configured to succeed and accept all bytes read.