diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc
index c491fa94c04e..eb7d4196d910 100644
--- a/source/common/tcp_proxy/tcp_proxy.cc
+++ b/source/common/tcp_proxy/tcp_proxy.cc
@@ -200,6 +200,12 @@ void Filter::readDisableUpstream(bool disable) {
 }
 
 void Filter::readDisableDownstream(bool disable) {
+  if (read_callbacks_->connection().state() != Network::Connection::State::Open) {
+    // During idle timeouts, we close both upstream and downstream with NoFlush.
+    // Envoy still does a best-effort flush which can cause readDisableDownstream to be called
+    // despite the downstream connection being closed.
+    return;
+  }
   read_callbacks_->connection().readDisable(disable);
 
   if (disable) {
diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc
index 3d810c8e9e56..f447f62b08e3 100644
--- a/test/common/tcp_proxy/tcp_proxy_test.cc
+++ b/test/common/tcp_proxy/tcp_proxy_test.cc
@@ -28,6 +28,7 @@
 
 using testing::_;
 using testing::Invoke;
+using testing::InvokeWithoutArgs;
 using testing::MatchesRegex;
 using testing::NiceMock;
 using testing::Return;
@@ -420,15 +421,18 @@ class TcpProxyTest : public testing::Test {
           .WillRepeatedly(Return(nullptr));
     }
 
-    filter_.reset(new Filter(config_, factory_context_.cluster_manager_));
-    EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
-    EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true));
-    filter_->initializeReadFilterCallbacks(filter_callbacks_);
-    EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
-
-    EXPECT_EQ(absl::optional<uint64_t>(), filter_->computeHashKey());
-    EXPECT_EQ(&filter_callbacks_.connection_, filter_->downstreamConnection());
-    EXPECT_EQ(nullptr, filter_->metadataMatchCriteria());
+    {
+      testing::InSequence sequence;
+      filter_.reset(new Filter(config_, factory_context_.cluster_manager_));
+      EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true));
+      EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
+      filter_->initializeReadFilterCallbacks(filter_callbacks_);
+      EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
+
+      EXPECT_EQ(absl::optional<uint64_t>(), filter_->computeHashKey());
+      EXPECT_EQ(&filter_callbacks_.connection_, filter_->downstreamConnection());
+      EXPECT_EQ(nullptr, filter_->metadataMatchCriteria());
+    }
   }
 
   void setup(uint32_t connections) { setup(connections, defaultConfig()); }
@@ -816,6 +820,55 @@ TEST_F(TcpProxyTest, IdleTimerDisabledUpstreamClose) {
   upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose);
 }
 
+// Tests that flushing data during an idle timeout doesn't cause problems.
+TEST_F(TcpProxyTest, IdleTimeoutWithOutstandingDataFlushed) {
+  envoy::config::filter::network::tcp_proxy::v2::TcpProxy config = defaultConfig();
+  config.mutable_idle_timeout()->set_seconds(1);
+  setup(1, config);
+
+  Event::MockTimer* idle_timer = new Event::MockTimer(&filter_callbacks_.connection_.dispatcher_);
+  EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000)));
+  raiseEventUpstreamConnected(0);
+
+  Buffer::OwnedImpl buffer("hello");
+  EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000)));
+  filter_->onData(buffer, false);
+
+  buffer.add("hello2");
+  EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000)));
+  upstream_callbacks_->onUpstreamData(buffer, false);
+
+  EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000)));
+  filter_callbacks_.connection_.raiseBytesSentCallbacks(1);
+
+  EXPECT_CALL(*idle_timer, enableTimer(std::chrono::milliseconds(1000)));
+  upstream_connections_.at(0)->raiseBytesSentCallbacks(2);
+
+  // Mark the upstream connection as blocked.
+  // This should read-disable the downstream connection.
+  EXPECT_CALL(filter_callbacks_.connection_, readDisable(_));
+  upstream_connections_.at(0)->runHighWatermarkCallbacks();
+
+  // When Envoy has an idle timeout, the following happens:
+  // Envoy closes the downstream connection.
+  // Envoy closes the upstream connection.
+  // When closing the upstream connection with ConnectionCloseType::NoFlush,
+  // if there is data in the buffer, Envoy does a best-effort flush.
+  // If the write succeeds, Envoy may go under the flow control limit and start
+  // the callbacks to read-enable the already-closed downstream connection.
+  //
+  // In this case we expect readDisable to not be called on the already closed
+  // connection.
+  EXPECT_CALL(filter_callbacks_.connection_, readDisable(true)).Times(0);
+  EXPECT_CALL(*upstream_connections_.at(0), close(Network::ConnectionCloseType::NoFlush))
+      .WillOnce(InvokeWithoutArgs(
+          [&]() -> void { upstream_connections_.at(0)->runLowWatermarkCallbacks(); }));
+
+  EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush));
+  EXPECT_CALL(*idle_timer, disableTimer());
+  idle_timer->callback_();
+}
+
 // Test that access log fields %UPSTREAM_HOST% and %UPSTREAM_CLUSTER% are correctly logged.
 TEST_F(TcpProxyTest, AccessLogUpstreamHost) {
   setup(1, accessLogConfig("%UPSTREAM_HOST% %UPSTREAM_CLUSTER%"));