From a625c80cf6aa18f6147bde63f386674113f61d92 Mon Sep 17 00:00:00 2001 From: antonio Date: Wed, 28 Oct 2020 17:39:53 -0400 Subject: [PATCH] connection: Remember transport socket read resumption requests and replay them when re-enabling read. (#13772) Fixes SslSocket read resumption after readDisable when processing the SSL record that contains the last bytes of the HTTP message Signed-off-by: Antonio Vicente --- docs/root/version_history/current.rst | 1 + source/common/network/connection_impl.cc | 32 ++- source/common/network/connection_impl.h | 19 +- test/common/network/connection_impl_test.cc | 191 ++++++++++++++++++ .../tls/integration/ssl_integration_test.cc | 100 +++++++++ .../tls/integration/ssl_integration_test.h | 2 - test/integration/base_integration_test.h | 16 ++ test/integration/utility.cc | 38 +++- test/integration/utility.h | 26 ++- 9 files changed, 397 insertions(+), 28 deletions(-) diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 8fe867417342..763c1f98b11b 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -21,6 +21,7 @@ Bug Fixes *Changes expected to improve the state of the world and are unlikely to have negative effects* * http: sending CONNECT_ERROR for HTTP/2 where appropriate during CONNECT requests. +* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers. Removed Config or Runtime ------------------------- diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 8f8e9e759662..8c44fd9ff8d6 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -63,7 +63,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), write_buffer_above_high_watermark_(false), detect_early_close_(true), enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false), - write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) { + write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false), + transport_wants_read_(false) { if (!connected) { connecting_ = true; @@ -193,7 +194,7 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } -bool ConnectionImpl::consumerWantsToRead() { +bool ConnectionImpl::filterChainWantsData() { return read_disable_count_ == 0 || (read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered()); } @@ -271,7 +272,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { - if (inDelayedClose() || !consumerWantsToRead()) { + if (inDelayedClose() || !filterChainWantsData()) { return; } ASSERT(ioHandle().isOpen()); @@ -356,11 +357,17 @@ void ConnectionImpl::readDisable(bool disable) { ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write); } - if (consumerWantsToRead() && read_buffer_.length() > 0) { - // If the connection has data buffered there's no guarantee there's also data in the kernel - // which will kick off the filter chain. Alternately if the read buffer has data the fd could - // be read disabled. To handle these cases, fake an event to make sure the buffered data gets - // processed regardless and ensure that we dispatch it via onRead. + if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) { + // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be + // able to process additional bytes even if there is no data in the kernel to kick off the + // filter chain. Alternately if the read buffer has data the fd could be read disabled. To + // handle these cases, fake an event to make sure the buffered data in the read buffer or in + // transport socket internal buffers gets processed regardless and ensure that we dispatch it + // via onRead. + + // Sanity check: resumption with read_disable_count_ > 0 should only happen if the read + // buffer's high watermark has triggered. + ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0); dispatch_buffered_data_ = true; setReadBufferReady(); } @@ -557,12 +564,19 @@ void ConnectionImpl::onReadReady() { // 2) The consumer of connection data called readDisable(true), and instead of reading from the // socket we simply need to dispatch already read data. if (read_disable_count_ != 0) { - if (latched_dispatch_buffered_data && consumerWantsToRead()) { + // Do not clear transport_wants_read_ when returning early; the early return skips the transport + // socket doRead call. + if (latched_dispatch_buffered_data && filterChainWantsData()) { onRead(read_buffer_.length()); } return; } + // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that + // the transport socket read resumption happens as requested; onReadReady() returns early without + // reading from the transport if the read buffer is above high watermark at the start of the + // method. + transport_wants_read_ = false; IoResult result = transport_socket_->doRead(read_buffer_); uint64_t new_buffer_size = read_buffer_.length(); updateReadBufferStats(result.bytes_processed_, new_buffer_size); diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index ed32c30f24e4..020375414265 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -118,7 +118,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // TODO(htuch): While this is the basis for also yielding to other connections to provide some // fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees. // Reconsider how to make fairness happen. - void setReadBufferReady() override { ioHandle().activateFileEvents(Event::FileReadyType::Read); } + void setReadBufferReady() override { + transport_wants_read_ = true; + ioHandle().activateFileEvents(Event::FileReadyType::Read); + } void flushWriteBuffer() override; // Obtain global next connection ID. This should only be used in tests. @@ -128,11 +131,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // A convenience function which returns true if // 1) The read disable count is zero or // 2) The read disable count is one due to the read buffer being overrun. - // In either case the consumer of the data would like to read from the buffer. - // If the read count is greater than one, or equal to one when the buffer is - // not overrun, then the consumer of the data has called readDisable, and does - // not want to read. - bool consumerWantsToRead(); + // In either case the filter chain would like to process data from the read buffer or transport + // socket. If the read count is greater than one, or equal to one when the buffer is not overrun, + // then the filter chain has called readDisable, and does not want additional data. + bool filterChainWantsData(); // Network::ConnectionImplBase void closeConnectionImmediately() final; @@ -197,6 +199,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback bool write_end_stream_ : 1; bool current_write_end_stream_ : 1; bool dispatch_buffered_data_ : 1; + // True if the most recent call to the transport socket's doRead method invoked setReadBufferReady + // to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true, + // readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read + // resumption happens when remaining bytes are held in transport socket internal buffers. + bool transport_wants_read_ : 1; }; class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection { diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 821e73ac0d3e..aab097ee10b8 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1882,6 +1882,197 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) { file_ready_cb_(Event::FileReadyType::Read); } +// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is +// re-enabled. +TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // No calls to activate when re-enabling if there are no pending read requests. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); + + // setReadBufferReady triggers an immediate call to activate. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->setReadBufferReady(); + + // When processing a sequence of read disable/read enable, changes to the enabled event mask + // happen only when the disable count transitions to/from 0. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + connection_->readDisable(false); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // Expect a read activation since there have been no transport doRead calls since the call to + // setReadBufferReady. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // No calls to doRead when file_ready_cb is invoked while read disabled. + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + file_ready_cb_(Event::FileReadyType::Read); + + // Expect a read activate when re-enabling since the file ready cb has not done a read. + EXPECT_CALL(*file_event_, setEnabled(_)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + file_ready_cb_(Event::FileReadyType::Read); + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + // No read activate call. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + +// Verify that read resumption is scheduled when read is re-enabled while the read buffer is +// non-empty. +TEST_F(MockTransportConnectionImplTest, ReadBufferResumeAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->setBufferLimits(5); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + // Add some data to the read buffer to trigger read activate calls when re-enabling read. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([](Buffer::Instance& buffer) -> IoResult { + buffer.add("0123456789"); + return {PostIoAction::KeepOpen, 10, false}; + })); + // Expect a change to the event mask when hitting the read buffer high-watermark. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue)); + EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue)); + file_ready_cb_(Event::FileReadyType::Read); + + // Already read disabled, expect no changes to enabled events mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + // Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read + // activate but no call to setEnable to change the registration mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Invoke the file event cb while read_disable_count_ == 1 to partially drain the read buffer. + // Expect no transport reads. + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(10, data.length()); + data.drain(data.length() - 1); + return FilterStatus::Continue; + })); + // Partial drain of the read buffer below low watermark triggers an update to the fd enabled mask + // and a read activate since the read buffer is not empty. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + file_ready_cb_(Event::FileReadyType::Read); + + // Drain the rest of the buffer and verify there are no spurious read activate calls. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(1, data.length()); + data.drain(1); + return FilterStatus::Continue; + })); + file_ready_cb_(Event::FileReadyType::Read); + + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + // read buffer is empty, no read activate call. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + +// Verify that transport_wants_read_ read resumption is not lost when processing read buffer +// high-watermark resumptions. +TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->setBufferLimits(5); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + // Add some data to the read buffer and also call setReadBufferReady to mimic what transport + // sockets are expected to do when the read buffer high watermark is hit. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult { + buffer.add("0123456789"); + connection_->setReadBufferReady(); + return {PostIoAction::KeepOpen, 10, false}; + })); + // Expect a change to the event mask when hitting the read buffer high-watermark. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + // The setReadBufferReady call adds a spurious read activation. + // TODO(antoniovicente) Skip the read activate in setReadBufferReady when read_disable_count_ > 0. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue)); + EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue)); + file_ready_cb_(Event::FileReadyType::Read); + + // Already read disabled, expect no changes to enabled events mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + // Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read + // activate but no call to setEnable to change the registration mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Invoke the file event cb while read_disable_count_ == 1 and fully drain the read buffer. + // Expect no transport reads. Expect a read resumption due to transport_wants_read_ being true + // when read is re-enabled due to going under the low watermark. + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(10, data.length()); + data.drain(data.length()); + return FilterStatus::Continue; + })); + // The buffer is fully drained. Expect a read activation because setReadBufferReady set + // transport_wants_read_ and no transport doRead calls have happened. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + file_ready_cb_(Event::FileReadyType::Read); + + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + file_ready_cb_(Event::FileReadyType::Read); + + // Verify there are no read activate calls the event callback does a transport read and clears the + // transport_wants_read_ state. + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + // Test that BytesSentCb is invoked at the correct times TEST_F(MockTransportConnectionImplTest, BytesSentCallback) { uint64_t bytes_sent = 0; diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc index 58a3453f63ec..f9e76ee3024b 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc @@ -20,6 +20,7 @@ #include "extensions/transport_sockets/tls/context_manager_impl.h" #include "test/extensions/common/tap/common.h" +#include "test/integration/autonomous_upstream.h" #include "test/integration/integration.h" #include "test/integration/utility.h" #include "test/test_common/network_utility.h" @@ -181,6 +182,105 @@ TEST_P(SslIntegrationTest, AdminCertEndpoint) { EXPECT_EQ("200", response->headers().getStatusValue()); } +class RawWriteSslIntegrationTest : public SslIntegrationTest { +protected: + std::unique_ptr + testFragmentedRequestWithBufferLimit(std::list request_chunks, + uint32_t buffer_limit) { + autonomous_upstream_ = true; + config_helper_.setBufferLimits(buffer_limit, buffer_limit); + initialize(); + + // write_request_cb will write each of the items in request_chunks as a separate SSL_write. + auto write_request_cb = [&request_chunks](Network::ClientConnection& client) { + if (!request_chunks.empty()) { + Buffer::OwnedImpl buffer(request_chunks.front()); + client.write(buffer, false); + request_chunks.pop_front(); + } + }; + + auto client_transport_socket_factory_ptr = + createClientSslTransportSocketFactory({}, *context_manager_, *api_); + std::string response; + auto connection = createConnectionDriver( + lookupPort("http"), write_request_cb, + [&](Network::ClientConnection&, const Buffer::Instance& data) -> void { + response.append(data.toString()); + }, + client_transport_socket_factory_ptr->createTransportSocket({})); + + // Drive the connection until we get a response. + while (response.empty()) { + connection->run(Event::Dispatcher::RunType::NonBlock); + } + EXPECT_THAT(response, testing::HasSubstr("HTTP/1.1 200 OK\r\n")); + + connection->close(); + return reinterpret_cast(fake_upstreams_.front().get()) + ->lastRequestHeaders(); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, RawWriteSslIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcessingHeaders) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the request headers. Verify that read resumption works correctly after + // hitting the receive buffer high watermark. + std::list request_chunks = { + "GET / HTTP/1.1\r\nHost: host\r\n", + "key1:" + std::string(14000, 'a') + "\r\n", + "key2:" + std::string(16000, 'b') + "\r\n\r\n", + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); + EXPECT_EQ(upstream_headers->Host()->value(), "host"); + EXPECT_EQ( + std::string(14000, 'a'), + upstream_headers->get(Envoy::Http::LowerCaseString("key1"))[0]->value().getStringView()); + EXPECT_EQ( + std::string(16000, 'b'), + upstream_headers->get(Envoy::Http::LowerCaseString("key2"))[0]->value().getStringView()); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingBody) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the POST body. Verify that read resumption works correctly after hitting the + // receive buffer high watermark. + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 30000\r\n\r\n", + std::string(14000, 'a'), + std::string(16000, 'a'), + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingLargerBody) { + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 150000\r\n\r\n", + }; + for (int i = 0; i < 10; ++i) { + request_chunks.push_back(std::string(15000, 'a')); + } + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 16 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + // Validate certificate selection across different certificate types and client TLS versions. class SslCertficateIntegrationTest : public testing::TestWithParam< diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h index ba209528805a..e7f615c54476 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h @@ -39,8 +39,6 @@ class SslIntegrationTestBase : public HttpIntegrationTest { // Set this true to debug SSL handshake issues with openssl s_client. The // verbose trace will be in the logs, openssl must be installed separately. bool debug_with_s_client_{false}; - -private: std::unique_ptr context_manager_; }; diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index 2a17a1cf07b5..be940cb85fa2 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -289,6 +289,22 @@ class BaseIntegrationTest : protected Logger::Loggable { *dispatcher_); } + /** + * Helper to create ConnectionDriver. + * + * @param port the port to connect to. + * @param write_request_cb callback used to send data. + * @param data_callback the callback on the received data. + * @param transport_socket transport socket to use for the client connection + **/ + std::unique_ptr createConnectionDriver( + uint32_t port, RawConnectionDriver::DoWriteCallback write_request_cb, + std::function&& data_callback, + Network::TransportSocketPtr transport_socket = nullptr) { + return std::make_unique(port, write_request_cb, data_callback, version_, + *dispatcher_, std::move(transport_socket)); + } + // Helper to create FakeUpstream. // Creates a fake upstream bound to the specified unix domain socket path. std::unique_ptr createFakeUpstream(const std::string& uds_path, diff --git a/test/integration/utility.cc b/test/integration/utility.cc index 05ed83a8d656..ddbd0816f53e 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -28,6 +28,21 @@ #include "absl/strings/match.h" namespace Envoy { +namespace { + +RawConnectionDriver::DoWriteCallback writeBufferCallback(Buffer::Instance& data) { + auto shared_data = std::make_shared(); + shared_data->move(data); + return [shared_data](Network::ClientConnection& client) { + if (shared_data->length() > 0) { + client.write(*shared_data, false); + shared_data->drain(shared_data->length()); + } + }; +} + +} // namespace + void BufferingStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) { ASSERT(!complete_); complete_ = end_stream; @@ -115,15 +130,24 @@ IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, con return makeSingleRequest(addr, method, url, body, type, host, content_type); } -RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, - ReadCallback data_callback, +RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, + ReadCallback response_data_callback, + Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, + Network::TransportSocketPtr transport_socket) + : RawConnectionDriver(port, writeBufferCallback(request_data), response_data_callback, version, + dispatcher, std::move(transport_socket)) {} + +RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, Network::Address::IpVersion version, Event::Dispatcher& dispatcher, Network::TransportSocketPtr transport_socket) : dispatcher_(dispatcher) { api_ = Api::createApiForTest(stats_store_); Event::GlobalTimeSystem time_system; - callbacks_ = std::make_unique(); + callbacks_ = std::make_unique( + [this, write_request_callback]() { write_request_callback(*client_); }); if (transport_socket == nullptr) { transport_socket = Network::Test::createRawBufferSocket(); @@ -133,9 +157,13 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)), Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr); + // ConnectionCallbacks will call write_request_callback from the connect and low-watermark + // callbacks. Set a small buffer limit so high-watermark is triggered after every write and + // low-watermark is triggered every time the buffer is drained. + client_->setBufferLimits(1); client_->addConnectionCallbacks(*callbacks_); - client_->addReadFilter(Network::ReadFilterSharedPtr{new ForwardingFilter(*this, data_callback)}); - client_->write(initial_data, false); + client_->addReadFilter( + Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)}); client_->connect(); } diff --git a/test/integration/utility.h b/test/integration/utility.h index 6ff69ad27a83..497fe872472b 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -63,10 +63,17 @@ using BufferingStreamDecoderPtr = std::unique_ptr; */ class RawConnectionDriver { public: + using DoWriteCallback = std::function; using ReadCallback = std::function; - RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, ReadCallback data_callback, - Network::Address::IpVersion version, Event::Dispatcher& dispatcher, + RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, + Network::TransportSocketPtr transport_socket = nullptr); + // Similar to the constructor above but accepts the request as a constructor argument. + RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, + ReadCallback response_data_callback, Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, Network::TransportSocketPtr transport_socket = nullptr); ~RawConnectionDriver(); const Network::Connection& connection() { return *client_; } @@ -83,37 +90,44 @@ class RawConnectionDriver { private: struct ForwardingFilter : public Network::ReadFilterBaseImpl { ForwardingFilter(RawConnectionDriver& parent, ReadCallback cb) - : parent_(parent), data_callback_(cb) {} + : parent_(parent), response_data_callback_(cb) {} // Network::ReadFilter Network::FilterStatus onData(Buffer::Instance& data, bool) override { - data_callback_(*parent_.client_, data); + response_data_callback_(*parent_.client_, data); data.drain(data.length()); return Network::FilterStatus::StopIteration; } RawConnectionDriver& parent_; - ReadCallback data_callback_; + ReadCallback response_data_callback_; }; struct ConnectionCallbacks : public Network::ConnectionCallbacks { + using WriteCb = std::function; + ConnectionCallbacks(WriteCb write_cb) : write_cb_(write_cb) {} bool connected() const { return connected_; } bool closed() const { return closed_; } // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { + if (!connected_ && event == Network::ConnectionEvent::Connected) { + write_cb_(); + } + last_connection_event_ = event; closed_ |= (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose); connected_ |= (event == Network::ConnectionEvent::Connected); } void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} + void onBelowWriteBufferLowWatermark() override { write_cb_(); } Network::ConnectionEvent last_connection_event_; private: + WriteCb write_cb_; bool connected_{false}; bool closed_{false}; };