From 4204341ee54f92bfdd4004361f282e7d1d4a2c9c Mon Sep 17 00:00:00 2001 From: antonio Date: Mon, 30 Nov 2020 23:18:52 -0800 Subject: [PATCH] backport to v1.15: connection: Remember transport socket read resumption requests and replay them when re-enabling read. (#13772) (#14173) * connection: Remember transport socket read resumption requests and replay them when re-enabling read. (#13772) Fixes SslSocket read resumption after readDisable when processing the SSL record that contains the last bytes of the HTTP message Signed-off-by: Antonio Vicente --- docs/root/version_history/current.rst | 1 + source/common/network/connection_impl.cc | 32 ++- source/common/network/connection_impl.h | 19 +- test/common/network/connection_impl_test.cc | 191 ++++++++++++++++++ .../tls/integration/ssl_integration_test.cc | 98 +++++++++ .../tls/integration/ssl_integration_test.h | 2 - test/integration/integration.h | 16 ++ test/integration/utility.cc | 38 +++- test/integration/utility.h | 26 ++- 9 files changed, 395 insertions(+), 28 deletions(-) diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index bf6c1dbedd10..bd0c2db54bb3 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -5,3 +5,4 @@ Changes ------- * listener: fix crash when disabling or re-enabling listeners due to overload while processing LDS updates. * proxy_proto: fixed a bug where network filters would not have the correct downstreamRemoteAddress() when accessed from the StreamInfo. This could result in incorrect enforcement of RBAC rules in the RBAC network filter (but not in the RBAC HTTP filter), or incorrect access log addresses from tcp_proxy. +* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers. diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 649e8057d6d2..6415261e7345 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -56,7 +56,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), write_buffer_above_high_watermark_(false), detect_early_close_(true), enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false), - write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) { + write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false), + transport_wants_read_(false) { // Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM // condition and just crash. RELEASE_ASSERT(SOCKET_VALID(ConnectionImpl::ioHandle().fd()), ""); @@ -189,7 +190,7 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } -bool ConnectionImpl::consumerWantsToRead() { +bool ConnectionImpl::filterChainWantsData() { return read_disable_count_ == 0 || (read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered()); } @@ -268,7 +269,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { - if (inDelayedClose() || !consumerWantsToRead()) { + if (inDelayedClose() || !filterChainWantsData()) { return; } ASSERT(ioHandle().isOpen()); @@ -354,11 +355,17 @@ void ConnectionImpl::readDisable(bool disable) { file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); } - if (consumerWantsToRead() && read_buffer_.length() > 0) { - // If the connection has data buffered there's no guarantee there's also data in the kernel - // which will kick off the filter chain. Alternately if the read buffer has data the fd could - // be read disabled. To handle these cases, fake an event to make sure the buffered data gets - // processed regardless and ensure that we dispatch it via onRead. + if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) { + // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be + // able to process additional bytes even if there is no data in the kernel to kick off the + // filter chain. Alternately if the read buffer has data the fd could be read disabled. To + // handle these cases, fake an event to make sure the buffered data in the read buffer or in + // transport socket internal buffers gets processed regardless and ensure that we dispatch it + // via onRead. + + // Sanity check: resumption with read_disable_count_ > 0 should only happen if the read + // buffer's high watermark has triggered. + ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0); dispatch_buffered_data_ = true; setReadBufferReady(); } @@ -553,12 +560,19 @@ void ConnectionImpl::onReadReady() { // 2) The consumer of connection data called readDisable(true), and instead of reading from the // socket we simply need to dispatch already read data. if (read_disable_count_ != 0) { - if (latched_dispatch_buffered_data && consumerWantsToRead()) { + // Do not clear transport_wants_read_ when returning early; the early return skips the transport + // socket doRead call. + if (latched_dispatch_buffered_data && filterChainWantsData()) { onRead(read_buffer_.length()); } return; } + // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that + // the transport socket read resumption happens as requested; onReadReady() returns early without + // reading from the transport if the read buffer is above high watermark at the start of the + // method. + transport_wants_read_ = false; IoResult result = transport_socket_->doRead(read_buffer_); uint64_t new_buffer_size = read_buffer_.length(); updateReadBufferStats(result.bytes_processed_, new_buffer_size); diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index b464e2af96d1..f66cca701e63 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -115,7 +115,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // TODO(htuch): While this is the basis for also yielding to other connections to provide some // fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees. // Reconsider how to make fairness happen. - void setReadBufferReady() override { file_event_->activate(Event::FileReadyType::Read); } + void setReadBufferReady() override { + transport_wants_read_ = true; + file_event_->activate(Event::FileReadyType::Read); + } void flushWriteBuffer() override; // Obtain global next connection ID. This should only be used in tests. @@ -125,11 +128,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // A convenience function which returns true if // 1) The read disable count is zero or // 2) The read disable count is one due to the read buffer being overrun. - // In either case the consumer of the data would like to read from the buffer. - // If the read count is greater than one, or equal to one when the buffer is - // not overrun, then the consumer of the data has called readDisable, and does - // not want to read. - bool consumerWantsToRead(); + // In either case the filter chain would like to process data from the read buffer or transport + // socket. If the read count is greater than one, or equal to one when the buffer is not overrun, + // then the filter chain has called readDisable, and does not want additional data. + bool filterChainWantsData(); // Network::ConnectionImplBase void closeConnectionImmediately() override; @@ -195,6 +197,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback bool write_end_stream_ : 1; bool current_write_end_stream_ : 1; bool dispatch_buffered_data_ : 1; + // True if the most recent call to the transport socket's doRead method invoked setReadBufferReady + // to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true, + // readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read + // resumption happens when remaining bytes are held in transport socket internal buffers. + bool transport_wants_read_ : 1; }; /** diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 72168e2a666d..086962d34c2c 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1657,6 +1657,197 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) { file_ready_cb_(Event::FileReadyType::Read); } +// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is +// re-enabled. +TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // No calls to activate when re-enabling if there are no pending read requests. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); + + // setReadBufferReady triggers an immediate call to activate. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->setReadBufferReady(); + + // When processing a sequence of read disable/read enable, changes to the enabled event mask + // happen only when the disable count transitions to/from 0. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + connection_->readDisable(false); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // Expect a read activation since there have been no transport doRead calls since the call to + // setReadBufferReady. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // No calls to doRead when file_ready_cb is invoked while read disabled. + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + file_ready_cb_(Event::FileReadyType::Read); + + // Expect a read activate when re-enabling since the file ready cb has not done a read. + EXPECT_CALL(*file_event_, setEnabled(_)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + file_ready_cb_(Event::FileReadyType::Read); + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + // No read activate call. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + +// Verify that read resumption is scheduled when read is re-enabled while the read buffer is +// non-empty. +TEST_F(MockTransportConnectionImplTest, ReadBufferResumeAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->setBufferLimits(5); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + // Add some data to the read buffer to trigger read activate calls when re-enabling read. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([](Buffer::Instance& buffer) -> IoResult { + buffer.add("0123456789"); + return {PostIoAction::KeepOpen, 10, false}; + })); + // Expect a change to the event mask when hitting the read buffer high-watermark. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue)); + EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue)); + file_ready_cb_(Event::FileReadyType::Read); + + // Already read disabled, expect no changes to enabled events mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + // Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read + // activate but no call to setEnable to change the registration mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Invoke the file event cb while read_disable_count_ == 1 to partially drain the read buffer. + // Expect no transport reads. + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(10, data.length()); + data.drain(data.length() - 1); + return FilterStatus::Continue; + })); + // Partial drain of the read buffer below low watermark triggers an update to the fd enabled mask + // and a read activate since the read buffer is not empty. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + file_ready_cb_(Event::FileReadyType::Read); + + // Drain the rest of the buffer and verify there are no spurious read activate calls. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(1, data.length()); + data.drain(1); + return FilterStatus::Continue; + })); + file_ready_cb_(Event::FileReadyType::Read); + + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + // read buffer is empty, no read activate call. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + +// Verify that transport_wants_read_ read resumption is not lost when processing read buffer +// high-watermark resumptions. +TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->setBufferLimits(5); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + // Add some data to the read buffer and also call setReadBufferReady to mimic what transport + // sockets are expected to do when the read buffer high watermark is hit. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult { + buffer.add("0123456789"); + connection_->setReadBufferReady(); + return {PostIoAction::KeepOpen, 10, false}; + })); + // Expect a change to the event mask when hitting the read buffer high-watermark. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + // The setReadBufferReady call adds a spurious read activation. + // TODO(antoniovicente) Skip the read activate in setReadBufferReady when read_disable_count_ > 0. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue)); + EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue)); + file_ready_cb_(Event::FileReadyType::Read); + + // Already read disabled, expect no changes to enabled events mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + // Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read + // activate but no call to setEnable to change the registration mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Invoke the file event cb while read_disable_count_ == 1 and fully drain the read buffer. + // Expect no transport reads. Expect a read resumption due to transport_wants_read_ being true + // when read is re-enabled due to going under the low watermark. + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(10, data.length()); + data.drain(data.length()); + return FilterStatus::Continue; + })); + // The buffer is fully drained. Expect a read activation because setReadBufferReady set + // transport_wants_read_ and no transport doRead calls have happened. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + file_ready_cb_(Event::FileReadyType::Read); + + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + file_ready_cb_(Event::FileReadyType::Read); + + // Verify there are no read activate calls the event callback does a transport read and clears the + // transport_wants_read_ state. + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + // Test that BytesSentCb is invoked at the correct times TEST_F(MockTransportConnectionImplTest, BytesSentCallback) { uint64_t bytes_sent = 0; diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc index 9994f8ca314b..66cf82b905d2 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc @@ -19,6 +19,7 @@ #include "extensions/transport_sockets/tls/context_config_impl.h" #include "extensions/transport_sockets/tls/context_manager_impl.h" +#include "test/integration/autonomous_upstream.h" #include "test/integration/integration.h" #include "test/integration/utility.h" #include "test/test_common/network_utility.h" @@ -177,6 +178,103 @@ TEST_P(SslIntegrationTest, AdminCertEndpoint) { EXPECT_EQ("200", response->headers().getStatusValue()); } +class RawWriteSslIntegrationTest : public SslIntegrationTest { +protected: + std::unique_ptr + testFragmentedRequestWithBufferLimit(std::list request_chunks, + uint32_t buffer_limit) { + autonomous_upstream_ = true; + config_helper_.setBufferLimits(buffer_limit, buffer_limit); + initialize(); + + // write_request_cb will write each of the items in request_chunks as a separate SSL_write. + auto write_request_cb = [&request_chunks](Network::ClientConnection& client) { + if (!request_chunks.empty()) { + Buffer::OwnedImpl buffer(request_chunks.front()); + client.write(buffer, false); + request_chunks.pop_front(); + } + }; + + auto client_transport_socket_factory_ptr = + createClientSslTransportSocketFactory({}, *context_manager_, *api_); + std::string response; + auto connection = createConnectionDriver( + lookupPort("http"), write_request_cb, + [&](Network::ClientConnection&, const Buffer::Instance& data) -> void { + response.append(data.toString()); + }, + client_transport_socket_factory_ptr->createTransportSocket({})); + + // Drive the connection until we get a response. + while (response.empty()) { + connection->run(Event::Dispatcher::RunType::NonBlock); + } + EXPECT_THAT(response, testing::HasSubstr("HTTP/1.1 200 OK\r\n")); + + connection->close(); + return reinterpret_cast(fake_upstreams_.front().get()) + ->lastRequestHeaders(); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, RawWriteSslIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcessingHeaders) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the request headers. Verify that read resumption works correctly after + // hitting the receive buffer high watermark. + std::list request_chunks = { + "GET / HTTP/1.1\r\nHost: host\r\n", + "key1:" + std::string(14000, 'a') + "\r\n", + "key2:" + std::string(16000, 'b') + "\r\n\r\n", + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); + EXPECT_EQ(upstream_headers->Host()->value(), "host"); + EXPECT_EQ(std::string(14000, 'a'), + upstream_headers->get(Envoy::Http::LowerCaseString("key1"))->value().getStringView()); + EXPECT_EQ(std::string(16000, 'b'), + upstream_headers->get(Envoy::Http::LowerCaseString("key2"))->value().getStringView()); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingBody) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the POST body. Verify that read resumption works correctly after hitting the + // receive buffer high watermark. + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 30000\r\n\r\n", + std::string(14000, 'a'), + std::string(16000, 'a'), + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingLargerBody) { + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 150000\r\n\r\n", + }; + for (int i = 0; i < 10; ++i) { + request_chunks.push_back(std::string(15000, 'a')); + } + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 16 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + // Validate certificate selection across different certificate types and client TLS versions. class SslCertficateIntegrationTest : public testing::TestWithParam< diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h index 133e73bd433e..5af886e851bf 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h @@ -36,8 +36,6 @@ class SslIntegrationTestBase : public HttpIntegrationTest { // Set this true to debug SSL handshake issues with openssl s_client. The // verbose trace will be in the logs, openssl must be installed separately. bool debug_with_s_client_{false}; - -private: std::unique_ptr context_manager_; }; diff --git a/test/integration/integration.h b/test/integration/integration.h index c68efaf0963e..894a052da5c4 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -392,6 +392,22 @@ class BaseIntegrationTest : protected Logger::Loggable { *dispatcher_); } + /** + * Helper to create ConnectionDriver. + * + * @param port the port to connect to. + * @param write_request_cb callback used to send data. + * @param data_callback the callback on the received data. + * @param transport_socket transport socket to use for the client connection + **/ + std::unique_ptr createConnectionDriver( + uint32_t port, RawConnectionDriver::DoWriteCallback write_request_cb, + std::function&& data_callback, + Network::TransportSocketPtr transport_socket = nullptr) { + return std::make_unique(port, write_request_cb, data_callback, version_, + *dispatcher_, std::move(transport_socket)); + } + protected: // Create the envoy server in another thread and start it. // Will not return until that server is listening. diff --git a/test/integration/utility.cc b/test/integration/utility.cc index c969a5b8a2ef..ea1a810c036d 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -27,6 +27,21 @@ #include "absl/strings/match.h" namespace Envoy { +namespace { + +RawConnectionDriver::DoWriteCallback writeBufferCallback(Buffer::Instance& data) { + auto shared_data = std::make_shared(); + shared_data->move(data); + return [shared_data](Network::ClientConnection& client) { + if (shared_data->length() > 0) { + client.write(*shared_data, false); + shared_data->drain(shared_data->length()); + } + }; +} + +} // namespace + void BufferingStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) { ASSERT(!complete_); complete_ = end_stream; @@ -112,15 +127,24 @@ IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, con return makeSingleRequest(addr, method, url, body, type, host, content_type); } -RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, - ReadCallback data_callback, +RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, + ReadCallback response_data_callback, + Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, + Network::TransportSocketPtr transport_socket) + : RawConnectionDriver(port, writeBufferCallback(request_data), response_data_callback, version, + dispatcher, std::move(transport_socket)) {} + +RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, Network::Address::IpVersion version, Event::Dispatcher& dispatcher, Network::TransportSocketPtr transport_socket) : dispatcher_(dispatcher) { api_ = Api::createApiForTest(stats_store_); Event::GlobalTimeSystem time_system; - callbacks_ = std::make_unique(); + callbacks_ = std::make_unique( + [this, write_request_callback]() { write_request_callback(*client_); }); if (transport_socket == nullptr) { transport_socket = Network::Test::createRawBufferSocket(); @@ -130,9 +154,13 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)), Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr); + // ConnectionCallbacks will call write_request_callback from the connect and low-watermark + // callbacks. Set a small buffer limit so high-watermark is triggered after every write and + // low-watermark is triggered every time the buffer is drained. + client_->setBufferLimits(1); client_->addConnectionCallbacks(*callbacks_); - client_->addReadFilter(Network::ReadFilterSharedPtr{new ForwardingFilter(*this, data_callback)}); - client_->write(initial_data, false); + client_->addReadFilter( + Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)}); client_->connect(); } diff --git a/test/integration/utility.h b/test/integration/utility.h index 21235c2e2b42..d22feacd846b 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -61,10 +61,17 @@ using BufferingStreamDecoderPtr = std::unique_ptr; */ class RawConnectionDriver { public: + using DoWriteCallback = std::function; using ReadCallback = std::function; - RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, ReadCallback data_callback, - Network::Address::IpVersion version, Event::Dispatcher& dispatcher, + RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, + Network::TransportSocketPtr transport_socket = nullptr); + // Similar to the constructor above but accepts the request as a constructor argument. + RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, + ReadCallback response_data_callback, Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, Network::TransportSocketPtr transport_socket = nullptr); ~RawConnectionDriver(); const Network::Connection& connection() { return *client_; } @@ -81,37 +88,44 @@ class RawConnectionDriver { private: struct ForwardingFilter : public Network::ReadFilterBaseImpl { ForwardingFilter(RawConnectionDriver& parent, ReadCallback cb) - : parent_(parent), data_callback_(cb) {} + : parent_(parent), response_data_callback_(cb) {} // Network::ReadFilter Network::FilterStatus onData(Buffer::Instance& data, bool) override { - data_callback_(*parent_.client_, data); + response_data_callback_(*parent_.client_, data); data.drain(data.length()); return Network::FilterStatus::StopIteration; } RawConnectionDriver& parent_; - ReadCallback data_callback_; + ReadCallback response_data_callback_; }; struct ConnectionCallbacks : public Network::ConnectionCallbacks { + using WriteCb = std::function; + ConnectionCallbacks(WriteCb write_cb) : write_cb_(write_cb) {} bool connected() const { return connected_; } bool closed() const { return closed_; } // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { + if (!connected_ && event == Network::ConnectionEvent::Connected) { + write_cb_(); + } + last_connection_event_ = event; closed_ |= (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose); connected_ |= (event == Network::ConnectionEvent::Connected); } void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} + void onBelowWriteBufferLowWatermark() override { write_cb_(); } Network::ConnectionEvent last_connection_event_; private: + WriteCb write_cb_; bool connected_{false}; bool closed_{false}; };