diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index bfee20b0dfe2..1bb740a782f1 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -4,3 +4,5 @@ Changes ------- * listener: fix crash when disabling or re-enabling listeners due to overload while processing LDS updates. +* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers. + diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 649e8057d6d2..5cb16ca45a16 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -56,10 +56,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), write_buffer_above_high_watermark_(false), detect_early_close_(true), enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false), - write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) { - // Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM - // condition and just crash. - RELEASE_ASSERT(SOCKET_VALID(ConnectionImpl::ioHandle().fd()), ""); + write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false), + transport_wants_read_(false) { if (!connected) { connecting_ = true; @@ -189,7 +187,7 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } -bool ConnectionImpl::consumerWantsToRead() { +bool ConnectionImpl::filterChainWantsData() { return read_disable_count_ == 0 || (read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered()); } @@ -268,7 +266,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { - if (inDelayedClose() || !consumerWantsToRead()) { + if (inDelayedClose() || !filterChainWantsData()) { return; } ASSERT(ioHandle().isOpen()); @@ -354,11 +352,17 @@ void ConnectionImpl::readDisable(bool disable) { file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); } - if (consumerWantsToRead() && read_buffer_.length() > 0) { - // If the connection has data buffered there's no guarantee there's also data in the kernel - // which will kick off the filter chain. Alternately if the read buffer has data the fd could - // be read disabled. To handle these cases, fake an event to make sure the buffered data gets - // processed regardless and ensure that we dispatch it via onRead. + if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) { + // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be + // able to process additional bytes even if there is no data in the kernel to kick off the + // filter chain. Alternately if the read buffer has data the fd could be read disabled. To + // handle these cases, fake an event to make sure the buffered data in the read buffer or in + // transport socket internal buffers gets processed regardless and ensure that we dispatch it + // via onRead. + + // Sanity check: resumption with read_disable_count_ > 0 should only happen if the read + // buffer's high watermark has triggered. + ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0); dispatch_buffered_data_ = true; setReadBufferReady(); } @@ -553,12 +557,19 @@ void ConnectionImpl::onReadReady() { // 2) The consumer of connection data called readDisable(true), and instead of reading from the // socket we simply need to dispatch already read data. if (read_disable_count_ != 0) { - if (latched_dispatch_buffered_data && consumerWantsToRead()) { + // Do not clear transport_wants_read_ when returning early; the early return skips the transport + // socket doRead call. + if (latched_dispatch_buffered_data && filterChainWantsData()) { onRead(read_buffer_.length()); } return; } + // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that + // the transport socket read resumption happens as requested; onReadReady() returns early without + // reading from the transport if the read buffer is above high watermark at the start of the + // method. + transport_wants_read_ = false; IoResult result = transport_socket_->doRead(read_buffer_); uint64_t new_buffer_size = read_buffer_.length(); updateReadBufferStats(result.bytes_processed_, new_buffer_size); diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index b464e2af96d1..16083f4c9196 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -115,7 +115,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // TODO(htuch): While this is the basis for also yielding to other connections to provide some // fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees. // Reconsider how to make fairness happen. - void setReadBufferReady() override { file_event_->activate(Event::FileReadyType::Read); } + void setReadBufferReady() override { + transport_wants_read_ = true; + ioHandle().activateFileEvents(Event::FileReadyType::Read); + } void flushWriteBuffer() override; // Obtain global next connection ID. This should only be used in tests. @@ -125,11 +128,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // A convenience function which returns true if // 1) The read disable count is zero or // 2) The read disable count is one due to the read buffer being overrun. - // In either case the consumer of the data would like to read from the buffer. - // If the read count is greater than one, or equal to one when the buffer is - // not overrun, then the consumer of the data has called readDisable, and does - // not want to read. - bool consumerWantsToRead(); + // In either case the filter chain would like to process data from the read buffer or transport + // socket. If the read count is greater than one, or equal to one when the buffer is not overrun, + // then the filter chain has called readDisable, and does not want additional data. + bool filterChainWantsData(); // Network::ConnectionImplBase void closeConnectionImmediately() override; @@ -195,6 +197,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback bool write_end_stream_ : 1; bool current_write_end_stream_ : 1; bool dispatch_buffered_data_ : 1; + // True if the most recent call to the transport socket's doRead method invoked setReadBufferReady + // to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true, + // readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read + // resumption happens when remaining bytes are held in transport socket internal buffers. + bool transport_wants_read_ : 1; }; /** diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 72168e2a666d..086962d34c2c 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1657,6 +1657,197 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) { file_ready_cb_(Event::FileReadyType::Read); } +// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is +// re-enabled. +TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // No calls to activate when re-enabling if there are no pending read requests. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); + + // setReadBufferReady triggers an immediate call to activate. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->setReadBufferReady(); + + // When processing a sequence of read disable/read enable, changes to the enabled event mask + // happen only when the disable count transitions to/from 0. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + connection_->readDisable(false); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // Expect a read activation since there have been no transport doRead calls since the call to + // setReadBufferReady. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // No calls to doRead when file_ready_cb is invoked while read disabled. + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + file_ready_cb_(Event::FileReadyType::Read); + + // Expect a read activate when re-enabling since the file ready cb has not done a read. + EXPECT_CALL(*file_event_, setEnabled(_)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + file_ready_cb_(Event::FileReadyType::Read); + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + // No read activate call. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + +// Verify that read resumption is scheduled when read is re-enabled while the read buffer is +// non-empty. +TEST_F(MockTransportConnectionImplTest, ReadBufferResumeAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->setBufferLimits(5); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + // Add some data to the read buffer to trigger read activate calls when re-enabling read. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([](Buffer::Instance& buffer) -> IoResult { + buffer.add("0123456789"); + return {PostIoAction::KeepOpen, 10, false}; + })); + // Expect a change to the event mask when hitting the read buffer high-watermark. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue)); + EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue)); + file_ready_cb_(Event::FileReadyType::Read); + + // Already read disabled, expect no changes to enabled events mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + // Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read + // activate but no call to setEnable to change the registration mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Invoke the file event cb while read_disable_count_ == 1 to partially drain the read buffer. + // Expect no transport reads. + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(10, data.length()); + data.drain(data.length() - 1); + return FilterStatus::Continue; + })); + // Partial drain of the read buffer below low watermark triggers an update to the fd enabled mask + // and a read activate since the read buffer is not empty. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + file_ready_cb_(Event::FileReadyType::Read); + + // Drain the rest of the buffer and verify there are no spurious read activate calls. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(1, data.length()); + data.drain(1); + return FilterStatus::Continue; + })); + file_ready_cb_(Event::FileReadyType::Read); + + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + // read buffer is empty, no read activate call. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + +// Verify that transport_wants_read_ read resumption is not lost when processing read buffer +// high-watermark resumptions. +TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->setBufferLimits(5); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + // Add some data to the read buffer and also call setReadBufferReady to mimic what transport + // sockets are expected to do when the read buffer high watermark is hit. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult { + buffer.add("0123456789"); + connection_->setReadBufferReady(); + return {PostIoAction::KeepOpen, 10, false}; + })); + // Expect a change to the event mask when hitting the read buffer high-watermark. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + // The setReadBufferReady call adds a spurious read activation. + // TODO(antoniovicente) Skip the read activate in setReadBufferReady when read_disable_count_ > 0. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue)); + EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue)); + file_ready_cb_(Event::FileReadyType::Read); + + // Already read disabled, expect no changes to enabled events mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + // Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read + // activate but no call to setEnable to change the registration mask. + EXPECT_CALL(*file_event_, setEnabled(_)).Times(0); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Invoke the file event cb while read_disable_count_ == 1 and fully drain the read buffer. + // Expect no transport reads. Expect a read resumption due to transport_wants_read_ being true + // when read is re-enabled due to going under the low watermark. + EXPECT_CALL(*transport_socket_, doRead(_)).Times(0); + EXPECT_CALL(*read_filter, onData(_, _)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus { + EXPECT_EQ(10, data.length()); + data.drain(data.length()); + return FilterStatus::Continue; + })); + // The buffer is fully drained. Expect a read activation because setReadBufferReady set + // transport_wants_read_ and no transport doRead calls have happened. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + file_ready_cb_(Event::FileReadyType::Read); + + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + file_ready_cb_(Event::FileReadyType::Read); + + // Verify there are no read activate calls the event callback does a transport read and clears the + // transport_wants_read_ state. + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + // Test that BytesSentCb is invoked at the correct times TEST_F(MockTransportConnectionImplTest, BytesSentCallback) { uint64_t bytes_sent = 0; diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc index 9994f8ca314b..52b2d115097e 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc @@ -19,6 +19,7 @@ #include "extensions/transport_sockets/tls/context_config_impl.h" #include "extensions/transport_sockets/tls/context_manager_impl.h" +#include "test/integration/autonomous_upstream.h" #include "test/integration/integration.h" #include "test/integration/utility.h" #include "test/test_common/network_utility.h" @@ -177,6 +178,105 @@ TEST_P(SslIntegrationTest, AdminCertEndpoint) { EXPECT_EQ("200", response->headers().getStatusValue()); } +class RawWriteSslIntegrationTest : public SslIntegrationTest { +protected: + std::unique_ptr + testFragmentedRequestWithBufferLimit(std::list request_chunks, + uint32_t buffer_limit) { + autonomous_upstream_ = true; + config_helper_.setBufferLimits(buffer_limit, buffer_limit); + initialize(); + + // write_request_cb will write each of the items in request_chunks as a separate SSL_write. + auto write_request_cb = [&request_chunks](Network::ClientConnection& client) { + if (!request_chunks.empty()) { + Buffer::OwnedImpl buffer(request_chunks.front()); + client.write(buffer, false); + request_chunks.pop_front(); + } + }; + + auto client_transport_socket_factory_ptr = + createClientSslTransportSocketFactory({}, *context_manager_, *api_); + std::string response; + auto connection = createConnectionDriver( + lookupPort("http"), write_request_cb, + [&](Network::ClientConnection&, const Buffer::Instance& data) -> void { + response.append(data.toString()); + }, + client_transport_socket_factory_ptr->createTransportSocket({})); + + // Drive the connection until we get a response. + while (response.empty()) { + connection->run(Event::Dispatcher::RunType::NonBlock); + } + EXPECT_THAT(response, testing::HasSubstr("HTTP/1.1 200 OK\r\n")); + + connection->close(); + return reinterpret_cast(fake_upstreams_.front().get()) + ->lastRequestHeaders(); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, RawWriteSslIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcessingHeaders) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the request headers. Verify that read resumption works correctly after + // hitting the receive buffer high watermark. + std::list request_chunks = { + "GET / HTTP/1.1\r\nHost: host\r\n", + "key1:" + std::string(14000, 'a') + "\r\n", + "key2:" + std::string(16000, 'b') + "\r\n\r\n", + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); + EXPECT_EQ(upstream_headers->Host()->value(), "host"); + EXPECT_EQ( + std::string(14000, 'a'), + upstream_headers->get(Envoy::Http::LowerCaseString("key1"))[0]->value().getStringView()); + EXPECT_EQ( + std::string(16000, 'b'), + upstream_headers->get(Envoy::Http::LowerCaseString("key2"))[0]->value().getStringView()); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingBody) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the POST body. Verify that read resumption works correctly after hitting the + // receive buffer high watermark. + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 30000\r\n\r\n", + std::string(14000, 'a'), + std::string(16000, 'a'), + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingLargerBody) { + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 150000\r\n\r\n", + }; + for (int i = 0; i < 10; ++i) { + request_chunks.push_back(std::string(15000, 'a')); + } + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 16 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + // Validate certificate selection across different certificate types and client TLS versions. class SslCertficateIntegrationTest : public testing::TestWithParam< diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h index 133e73bd433e..5af886e851bf 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h @@ -36,8 +36,6 @@ class SslIntegrationTestBase : public HttpIntegrationTest { // Set this true to debug SSL handshake issues with openssl s_client. The // verbose trace will be in the logs, openssl must be installed separately. bool debug_with_s_client_{false}; - -private: std::unique_ptr context_manager_; }; diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h new file mode 100644 index 000000000000..be940cb85fa2 --- /dev/null +++ b/test/integration/base_integration_test.h @@ -0,0 +1,446 @@ +#pragma once + +#include +#include +#include + +#include "envoy/api/v2/discovery.pb.h" +#include "envoy/config/endpoint/v3/endpoint_components.pb.h" +#include "envoy/server/process_context.h" + +#include "common/config/api_version.h" +#include "common/config/version_converter.h" + +#include "extensions/transport_sockets/tls/context_manager_impl.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/config/utility.h" +#include "test/integration/fake_upstream.h" +#include "test/integration/integration_tcp_client.h" +#include "test/integration/server.h" +#include "test/integration/utility.h" +#include "test/mocks/buffer/mocks.h" +#include "test/mocks/common.h" +#include "test/mocks/server/transport_socket_factory_context.h" +#include "test/test_common/environment.h" +#include "test/test_common/test_time.h" + +#include "absl/types/optional.h" +#include "spdlog/spdlog.h" + +namespace Envoy { + +struct ApiFilesystemConfig { + std::string bootstrap_path_; + std::string cds_path_; + std::string eds_path_; + std::string lds_path_; + std::string rds_path_; +}; + +/** + * Test fixture for all integration tests. + */ +class BaseIntegrationTest : protected Logger::Loggable { +public: + using TestTimeSystemPtr = std::unique_ptr; + using InstanceConstSharedPtrFn = std::function; + + // Creates a test fixture with an upstream bound to INADDR_ANY on an unspecified port using the + // provided IP |version|. + BaseIntegrationTest(Network::Address::IpVersion version, + const std::string& config = ConfigHelper::httpProxyConfig()); + BaseIntegrationTest(Network::Address::IpVersion version, TestTimeSystemPtr, + const std::string& config = ConfigHelper::httpProxyConfig()) + : BaseIntegrationTest(version, config) {} + // Creates a test fixture with a specified |upstream_address| function that provides the IP and + // port to use. + BaseIntegrationTest(const InstanceConstSharedPtrFn& upstream_address_fn, + Network::Address::IpVersion version, + const std::string& config = ConfigHelper::httpProxyConfig()); + virtual ~BaseIntegrationTest() = default; + + // TODO(jmarantz): Remove this once + // https://github.com/envoyproxy/envoy-filter-example/pull/69 is reverted. + static TestTimeSystemPtr realTime() { return TestTimeSystemPtr(); } + + // Initialize the basic proto configuration, create fake upstreams, and start Envoy. + virtual void initialize(); + // Set up the fake upstream connections. This is called by initialize() and + // is virtual to allow subclass overrides. + virtual void createUpstreams(); + // Finalize the config and spin up an Envoy instance. + virtual void createEnvoy(); + // Sets upstream_protocol_ and alters the upstream protocol in the config_helper_ + void setUpstreamProtocol(FakeHttpConnection::Type protocol); + // Sets fake_upstreams_count_ + void setUpstreamCount(uint32_t count) { fake_upstreams_count_ = count; } + // Skip validation that ensures that all upstream ports are referenced by the + // configuration generated in ConfigHelper::finalize. + void skipPortUsageValidation() { config_helper_.skipPortUsageValidation(); } + // Make test more deterministic by using a fixed RNG value. + void setDeterministic() { deterministic_ = true; } + void setNewCodecs() { config_helper_.setNewCodecs(); } + + FakeHttpConnection::Type upstreamProtocol() const { return upstream_protocol_; } + + IntegrationTcpClientPtr + makeTcpConnection(uint32_t port, + const Network::ConnectionSocket::OptionsSharedPtr& options = nullptr); + + // Test-wide port map. + void registerPort(const std::string& key, uint32_t port); + uint32_t lookupPort(const std::string& key); + + // Set the endpoint's socket address to point at upstream at given index. + void setUpstreamAddress(uint32_t upstream_index, + envoy::config::endpoint::v3::LbEndpoint& endpoint) const; + + Network::ClientConnectionPtr makeClientConnection(uint32_t port); + virtual Network::ClientConnectionPtr + makeClientConnectionWithOptions(uint32_t port, + const Network::ConnectionSocket::OptionsSharedPtr& options); + + void registerTestServerPorts(const std::vector& port_names); + void createGeneratedApiTestServer(const std::string& bootstrap_path, + const std::vector& port_names, + Server::FieldValidationConfig validator_config, + bool allow_lds_rejection); + void createApiTestServer(const ApiFilesystemConfig& api_filesystem_config, + const std::vector& port_names, + Server::FieldValidationConfig validator_config, + bool allow_lds_rejection); + + Event::TestTimeSystem& timeSystem() { return time_system_; } + + Stats::IsolatedStoreImpl stats_store_; + Api::ApiPtr api_; + Api::ApiPtr api_for_server_stat_store_; + MockBufferFactory* mock_buffer_factory_; // Will point to the dispatcher's factory. + + // Enable the listener access log + void useListenerAccessLog(absl::string_view format = ""); + // Waits for the nth access log entry, defaulting to log entry 0. + std::string waitForAccessLog(const std::string& filename, uint32_t entry = 0); + + std::string listener_access_log_name_; + + // Functions for testing reloadable config (xDS) + void createXdsUpstream(); + void createXdsConnection(); + void cleanUpXdsConnection(); + + // See if a port can be successfully bound within the given timeout. + ABSL_MUST_USE_RESULT AssertionResult waitForPortAvailable( + uint32_t port, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + + // Helpers for setting up expectations and making the internal gears turn for xDS request/response + // sending/receiving to/from the (imaginary) xDS server. You should almost always use + // compareDiscoveryRequest() and sendDiscoveryResponse(), but the SotW/delta-specific versions are + // available if you're writing a SotW/delta-specific test. + // TODO(fredlas) expect_node was defaulting false here; the delta+SotW unification work restores + // it. + AssertionResult compareDiscoveryRequest( + const std::string& expected_type_url, const std::string& expected_version, + const std::vector& expected_resource_names, + const std::vector& expected_resource_names_added, + const std::vector& expected_resource_names_removed, bool expect_node = true, + const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, + const std::string& expected_error_message = ""); + template + void sendDiscoveryResponse(const std::string& type_url, const std::vector& state_of_the_world, + const std::vector& added_or_updated, + const std::vector& removed, const std::string& version, + const bool api_downgrade = true) { + if (sotw_or_delta_ == Grpc::SotwOrDelta::Sotw) { + sendSotwDiscoveryResponse(type_url, state_of_the_world, version, api_downgrade); + } else { + sendDeltaDiscoveryResponse(type_url, added_or_updated, removed, version, api_downgrade); + } + } + + AssertionResult compareDeltaDiscoveryRequest( + const std::string& expected_type_url, + const std::vector& expected_resource_subscriptions, + const std::vector& expected_resource_unsubscriptions, + const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, + const std::string& expected_error_message = "") { + return compareDeltaDiscoveryRequest(expected_type_url, expected_resource_subscriptions, + expected_resource_unsubscriptions, xds_stream_, + expected_error_code, expected_error_message); + } + + AssertionResult compareDeltaDiscoveryRequest( + const std::string& expected_type_url, + const std::vector& expected_resource_subscriptions, + const std::vector& expected_resource_unsubscriptions, FakeStreamPtr& stream, + const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, + const std::string& expected_error_message = ""); + + // TODO(fredlas) expect_node was defaulting false here; the delta+SotW unification work restores + // it. + AssertionResult compareSotwDiscoveryRequest( + const std::string& expected_type_url, const std::string& expected_version, + const std::vector& expected_resource_names, bool expect_node = true, + const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, + const std::string& expected_error_message = ""); + + template + void sendSotwDiscoveryResponse(const std::string& type_url, const std::vector& messages, + const std::string& version, const bool api_downgrade = true) { + API_NO_BOOST(envoy::api::v2::DiscoveryResponse) discovery_response; + discovery_response.set_version_info(version); + discovery_response.set_type_url(type_url); + for (const auto& message : messages) { + if (api_downgrade) { + discovery_response.add_resources()->PackFrom(API_DOWNGRADE(message)); + } else { + discovery_response.add_resources()->PackFrom(message); + } + } + static int next_nonce_counter = 0; + discovery_response.set_nonce(absl::StrCat("nonce", next_nonce_counter++)); + xds_stream_->sendGrpcMessage(discovery_response); + } + + template + void sendDeltaDiscoveryResponse(const std::string& type_url, + const std::vector& added_or_updated, + const std::vector& removed, + const std::string& version, const bool api_downgrade = true) { + sendDeltaDiscoveryResponse(type_url, added_or_updated, removed, version, xds_stream_, {}, + api_downgrade); + } + template + void + sendDeltaDiscoveryResponse(const std::string& type_url, const std::vector& added_or_updated, + const std::vector& removed, const std::string& version, + FakeStreamPtr& stream, const std::vector& aliases = {}, + const bool api_downgrade = true) { + auto response = createDeltaDiscoveryResponse(type_url, added_or_updated, removed, version, + aliases, api_downgrade); + stream->sendGrpcMessage(response); + } + + template + envoy::api::v2::DeltaDiscoveryResponse + createDeltaDiscoveryResponse(const std::string& type_url, const std::vector& added_or_updated, + const std::vector& removed, const std::string& version, + const std::vector& aliases, + const bool api_downgrade = true) { + + API_NO_BOOST(envoy::api::v2::DeltaDiscoveryResponse) response; + response.set_system_version_info("system_version_info_this_is_a_test"); + response.set_type_url(type_url); + for (const auto& message : added_or_updated) { + auto* resource = response.add_resources(); + ProtobufWkt::Any temp_any; + if (api_downgrade) { + temp_any.PackFrom(API_DOWNGRADE(message)); + resource->mutable_resource()->PackFrom(API_DOWNGRADE(message)); + } else { + temp_any.PackFrom(message); + resource->mutable_resource()->PackFrom(message); + } + resource->set_name(TestUtility::xdsResourceName(temp_any)); + resource->set_version(version); + for (const auto& alias : aliases) { + resource->add_aliases(alias); + } + } + *response.mutable_removed_resources() = {removed.begin(), removed.end()}; + static int next_nonce_counter = 0; + response.set_nonce(absl::StrCat("nonce", next_nonce_counter++)); + return response; + } + +private: + Event::GlobalTimeSystem time_system_; + +public: + Event::DispatcherPtr dispatcher_; + + /** + * Open a connection to Envoy, send a series of bytes, and return the + * response. This function will continue reading response bytes until Envoy + * closes the connection (as a part of error handling) or (if configured true) + * the complete headers are read. + * + * @param port the port to connect to. + * @param raw_http the data to send. + * @param response the response data will be sent here + * @param if the connection should be terminated once '\r\n\r\n' has been read. + **/ + void sendRawHttpAndWaitForResponse(int port, const char* raw_http, std::string* response, + bool disconnect_after_headers_complete = false); + + /** + * Helper to create ConnectionDriver. + * + * @param port the port to connect to. + * @param initial_data the data to send. + * @param data_callback the callback on the received data. + **/ + std::unique_ptr createConnectionDriver( + uint32_t port, const std::string& initial_data, + std::function&& data_callback) { + Buffer::OwnedImpl buffer(initial_data); + return std::make_unique(port, buffer, data_callback, version_, + *dispatcher_); + } + + /** + * Helper to create ConnectionDriver. + * + * @param port the port to connect to. + * @param write_request_cb callback used to send data. + * @param data_callback the callback on the received data. + * @param transport_socket transport socket to use for the client connection + **/ + std::unique_ptr createConnectionDriver( + uint32_t port, RawConnectionDriver::DoWriteCallback write_request_cb, + std::function&& data_callback, + Network::TransportSocketPtr transport_socket = nullptr) { + return std::make_unique(port, write_request_cb, data_callback, version_, + *dispatcher_, std::move(transport_socket)); + } + + // Helper to create FakeUpstream. + // Creates a fake upstream bound to the specified unix domain socket path. + std::unique_ptr createFakeUpstream(const std::string& uds_path, + FakeHttpConnection::Type type) { + return std::make_unique(uds_path, type, timeSystem()); + } + // Creates a fake upstream bound to the specified |address|. + std::unique_ptr + createFakeUpstream(const Network::Address::InstanceConstSharedPtr& address, + FakeHttpConnection::Type type, bool enable_half_close = false, + bool udp_fake_upstream = false) { + return std::make_unique(address, type, timeSystem(), enable_half_close, + udp_fake_upstream); + } + // Creates a fake upstream bound to INADDR_ANY and there is no specified port. + std::unique_ptr createFakeUpstream(FakeHttpConnection::Type type, + bool enable_half_close = false) { + return std::make_unique(0, type, version_, timeSystem(), enable_half_close); + } + std::unique_ptr + createFakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket_factory, + FakeHttpConnection::Type type) { + return std::make_unique(std::move(transport_socket_factory), 0, type, version_, + timeSystem()); + } + // Helper to add FakeUpstream. + // Add a fake upstream bound to the specified unix domain socket path. + void addFakeUpstream(const std::string& uds_path, FakeHttpConnection::Type type) { + fake_upstreams_.emplace_back(createFakeUpstream(uds_path, type)); + } + // Add a fake upstream bound to the specified |address|. + void addFakeUpstream(const Network::Address::InstanceConstSharedPtr& address, + FakeHttpConnection::Type type, bool enable_half_close = false, + bool udp_fake_upstream = false) { + fake_upstreams_.emplace_back( + createFakeUpstream(address, type, enable_half_close, udp_fake_upstream)); + } + // Add a fake upstream bound to INADDR_ANY and there is no specified port. + void addFakeUpstream(FakeHttpConnection::Type type, bool enable_half_close = false) { + fake_upstreams_.emplace_back(createFakeUpstream(type, enable_half_close)); + } + void addFakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket_factory, + FakeHttpConnection::Type type) { + fake_upstreams_.emplace_back(createFakeUpstream(std::move(transport_socket_factory), type)); + } + +protected: + bool initialized() const { return initialized_; } + + std::unique_ptr upstream_stats_store_; + + // Make sure the test server will be torn down after any fake client. + // The test server owns the runtime, which is often accessed by client and + // fake upstream codecs and must outlast them. + IntegrationTestServerPtr test_server_; + + // The IpVersion (IPv4, IPv6) to use. + Network::Address::IpVersion version_; + // IP Address to use when binding sockets on upstreams. + InstanceConstSharedPtrFn upstream_address_fn_; + // The config for envoy start-up. + ConfigHelper config_helper_; + // The ProcessObject to use when constructing the envoy server. + ProcessObjectOptRef process_object_{absl::nullopt}; + + // Steps that should be done before the envoy server starting. + std::function on_server_ready_function_; + + // Steps that should be done in parallel with the envoy server starting. E.g., xDS + // pre-init, control plane synchronization needed for server start. + std::function on_server_init_function_; + + // A map of keys to port names. Generally the names are pulled from the v2 listener name + // but if a listener is created via ADS, it will be from whatever key is used with registerPort. + TestEnvironment::PortMap port_map_; + + // The DrainStrategy that dictates the behaviour of + // DrainManagerImpl::drainClose(). + Server::DrainStrategy drain_strategy_{Server::DrainStrategy::Gradual}; + + // Member variables for xDS testing. + FakeUpstream* xds_upstream_{}; + FakeHttpConnectionPtr xds_connection_; + FakeStreamPtr xds_stream_; + bool create_xds_upstream_{false}; + bool tls_xds_upstream_{false}; + bool use_lds_{true}; // Use the integration framework's LDS set up. + + testing::NiceMock factory_context_; + Extensions::TransportSockets::Tls::ContextManagerImpl context_manager_{timeSystem()}; + + // The fake upstreams_ are created using the context_manager, so make sure + // they are destroyed before it is. + std::vector> fake_upstreams_; + + Grpc::SotwOrDelta sotw_or_delta_{Grpc::SotwOrDelta::Sotw}; + + spdlog::level::level_enum default_log_level_; + + // Target number of upstreams. + uint32_t fake_upstreams_count_{1}; + + // The duration of the drain manager graceful drain period. + std::chrono::seconds drain_time_{1}; + + // The number of worker threads that the test server uses. + uint32_t concurrency_{1}; + + // If true, use AutonomousUpstream for fake upstreams. + bool autonomous_upstream_{false}; + + // If true, allow incomplete streams in AutonomousUpstream + // This does nothing if autonomous_upstream_ is false + bool autonomous_allow_incomplete_streams_{false}; + + bool enable_half_close_{false}; + + // Whether the default created fake upstreams are UDP listeners. + bool udp_fake_upstream_{false}; + + // True if test will use a fixed RNG value. + bool deterministic_{}; + + // Set true when your test will itself take care of ensuring listeners are up, and registering + // them in the port_map_. + bool defer_listener_finalization_{false}; + + // By default the test server will use custom stats to notify on increment. + // This override exists for tests measuring stats memory. + bool use_real_stats_{}; + +private: + // The type for the Envoy-to-backend connection + FakeHttpConnection::Type upstream_protocol_{FakeHttpConnection::Type::HTTP1}; + // True if initialized() has been called. + bool initialized_{}; +}; + +} // namespace Envoy diff --git a/test/integration/utility.cc b/test/integration/utility.cc index c969a5b8a2ef..ea1a810c036d 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -27,6 +27,21 @@ #include "absl/strings/match.h" namespace Envoy { +namespace { + +RawConnectionDriver::DoWriteCallback writeBufferCallback(Buffer::Instance& data) { + auto shared_data = std::make_shared(); + shared_data->move(data); + return [shared_data](Network::ClientConnection& client) { + if (shared_data->length() > 0) { + client.write(*shared_data, false); + shared_data->drain(shared_data->length()); + } + }; +} + +} // namespace + void BufferingStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) { ASSERT(!complete_); complete_ = end_stream; @@ -112,15 +127,24 @@ IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, con return makeSingleRequest(addr, method, url, body, type, host, content_type); } -RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, - ReadCallback data_callback, +RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, + ReadCallback response_data_callback, + Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, + Network::TransportSocketPtr transport_socket) + : RawConnectionDriver(port, writeBufferCallback(request_data), response_data_callback, version, + dispatcher, std::move(transport_socket)) {} + +RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, Network::Address::IpVersion version, Event::Dispatcher& dispatcher, Network::TransportSocketPtr transport_socket) : dispatcher_(dispatcher) { api_ = Api::createApiForTest(stats_store_); Event::GlobalTimeSystem time_system; - callbacks_ = std::make_unique(); + callbacks_ = std::make_unique( + [this, write_request_callback]() { write_request_callback(*client_); }); if (transport_socket == nullptr) { transport_socket = Network::Test::createRawBufferSocket(); @@ -130,9 +154,13 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)), Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr); + // ConnectionCallbacks will call write_request_callback from the connect and low-watermark + // callbacks. Set a small buffer limit so high-watermark is triggered after every write and + // low-watermark is triggered every time the buffer is drained. + client_->setBufferLimits(1); client_->addConnectionCallbacks(*callbacks_); - client_->addReadFilter(Network::ReadFilterSharedPtr{new ForwardingFilter(*this, data_callback)}); - client_->write(initial_data, false); + client_->addReadFilter( + Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)}); client_->connect(); } diff --git a/test/integration/utility.h b/test/integration/utility.h index 21235c2e2b42..d22feacd846b 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -61,10 +61,17 @@ using BufferingStreamDecoderPtr = std::unique_ptr; */ class RawConnectionDriver { public: + using DoWriteCallback = std::function; using ReadCallback = std::function; - RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, ReadCallback data_callback, - Network::Address::IpVersion version, Event::Dispatcher& dispatcher, + RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, + Network::TransportSocketPtr transport_socket = nullptr); + // Similar to the constructor above but accepts the request as a constructor argument. + RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, + ReadCallback response_data_callback, Network::Address::IpVersion version, + Event::Dispatcher& dispatcher, Network::TransportSocketPtr transport_socket = nullptr); ~RawConnectionDriver(); const Network::Connection& connection() { return *client_; } @@ -81,37 +88,44 @@ class RawConnectionDriver { private: struct ForwardingFilter : public Network::ReadFilterBaseImpl { ForwardingFilter(RawConnectionDriver& parent, ReadCallback cb) - : parent_(parent), data_callback_(cb) {} + : parent_(parent), response_data_callback_(cb) {} // Network::ReadFilter Network::FilterStatus onData(Buffer::Instance& data, bool) override { - data_callback_(*parent_.client_, data); + response_data_callback_(*parent_.client_, data); data.drain(data.length()); return Network::FilterStatus::StopIteration; } RawConnectionDriver& parent_; - ReadCallback data_callback_; + ReadCallback response_data_callback_; }; struct ConnectionCallbacks : public Network::ConnectionCallbacks { + using WriteCb = std::function; + ConnectionCallbacks(WriteCb write_cb) : write_cb_(write_cb) {} bool connected() const { return connected_; } bool closed() const { return closed_; } // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { + if (!connected_ && event == Network::ConnectionEvent::Connected) { + write_cb_(); + } + last_connection_event_ = event; closed_ |= (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose); connected_ |= (event == Network::ConnectionEvent::Connected); } void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} + void onBelowWriteBufferLowWatermark() override { write_cb_(); } Network::ConnectionEvent last_connection_event_; private: + WriteCb write_cb_; bool connected_{false}; bool closed_{false}; };