Skip to content

Commit

Permalink
Fix crash in tcp_proxy (envoyproxy#4323)
Browse files Browse the repository at this point in the history
* Fix crash in tcp_proxy.

Closing the upstream connection is not safe from the Filter destructor,
because it triggers events back into the downstream connection, which
is partially destructed.

Ensure that the upstream connection is closed before the destructor is called.

Fixes envoyproxy#4310

Signed-off-by: Greg Greenway <ggreenway@apple.com>
  • Loading branch information
ggreenway authored Sep 5, 2018
1 parent ae6a252 commit e34dcd6
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 14 deletions.
31 changes: 19 additions & 12 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,8 @@ Filter::~Filter() {
access_log->log(nullptr, nullptr, nullptr, getRequestInfo());
}

if (upstream_handle_) {
upstream_handle_->cancel();
}

if (upstream_conn_data_) {
upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
}
ASSERT(upstream_handle_ == nullptr);
ASSERT(upstream_conn_data_ == nullptr);
}

TcpProxyStats Config::SharedConfig::generateStats(Stats::Scope& scope) {
Expand Down Expand Up @@ -412,17 +407,29 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
if (event == Network::ConnectionEvent::RemoteClose) {
upstream_conn_data_->connection().close(Network::ConnectionCloseType::FlushWrite);

if (upstream_conn_data_ != nullptr &&
upstream_conn_data_->connection().state() != Network::Connection::State::Closed) {
config_->drainManager().add(config_->sharedConfig(), std::move(upstream_conn_data_),
std::move(upstream_callbacks_), std::move(idle_timer_),
read_callbacks_->upstreamHost());
// Events raised from the previous line may cause upstream_conn_data_ to be NULL if
// it was able to immediately flush all data.

if (upstream_conn_data_ != nullptr) {
if (upstream_conn_data_->connection().state() != Network::Connection::State::Closed) {
config_->drainManager().add(config_->sharedConfig(), std::move(upstream_conn_data_),
std::move(upstream_callbacks_), std::move(idle_timer_),
read_callbacks_->upstreamHost());
} else {
upstream_conn_data_.reset();
}
}
} else if (event == Network::ConnectionEvent::LocalClose) {
upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
upstream_conn_data_.reset();
disableIdleTimer();
}
} else if (upstream_handle_) {
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
upstream_handle_->cancel();
upstream_handle_ = nullptr;
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1705,6 +1705,10 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) {
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks = nullptr;
EXPECT_CALL(*conn_pool_.connection_data_, addUpstreamCallbacks(_))
.WillOnce(
Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) { upstream_callbacks = &cb; }));
conn_pool_.host_->hostname_ = "newhost";
conn_pool_.poolReady(upstream_conn_);

Expand All @@ -1714,6 +1718,7 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketPrefixAndAutoHostRewrite) {
EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_total_.value());
EXPECT_EQ(0U, stats_.named_.downstream_cx_http1_active_.value());

upstream_callbacks->onEvent(Network::ConnectionEvent::RemoteClose);
filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
conn_manager_.reset();
EXPECT_EQ(0U, stats_.named_.downstream_cx_websocket_active_.value());
Expand Down Expand Up @@ -1753,8 +1758,13 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyData) {
EXPECT_CALL(upstream_conn_, write(_, false));
EXPECT_CALL(upstream_conn_, write(BufferEqual(&early_data), false));
EXPECT_CALL(filter_callbacks_.connection_, readDisable(false));
Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks = nullptr;
EXPECT_CALL(*conn_pool_.connection_data_, addUpstreamCallbacks(_))
.WillOnce(
Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) { upstream_callbacks = &cb; }));
conn_pool_.poolReady(upstream_conn_);

upstream_callbacks->onEvent(Network::ConnectionEvent::RemoteClose);
filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
conn_manager_.reset();
}
Expand Down Expand Up @@ -1828,7 +1838,12 @@ TEST_F(HttpConnectionManagerImplTest, WebSocketEarlyEndStream) {

EXPECT_CALL(upstream_conn_, write(_, false));
EXPECT_CALL(upstream_conn_, write(_, true)).Times(0);
Tcp::ConnectionPool::UpstreamCallbacks* upstream_callbacks = nullptr;
EXPECT_CALL(*conn_pool_.connection_data_, addUpstreamCallbacks(_))
.WillOnce(
Invoke([&](Tcp::ConnectionPool::UpstreamCallbacks& cb) { upstream_callbacks = &cb; }));
conn_pool_.poolReady(upstream_conn_);
upstream_callbacks->onEvent(Network::ConnectionEvent::RemoteClose);
filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();
conn_manager_.reset();
}
Expand Down
2 changes: 2 additions & 0 deletions test/common/network/filter_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ TEST_F(NetworkFilterManagerTest, RateLimitAndTcpProxy) {
EXPECT_CALL(upstream_connection, write(BufferEqual(&buffer), _));
read_buffer_.add("hello");
manager.onRead();

connection.raiseEvent(ConnectionEvent::RemoteClose);
}

} // namespace Network
Expand Down
31 changes: 30 additions & 1 deletion test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ class TcpProxyTest : public testing::Test {
.WillByDefault(SaveArg<0>(&access_log_data_));
}

~TcpProxyTest() {
if (filter_ != nullptr) {
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
}
}

void configure(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& config) {
config_.reset(new Config(config, factory_context_));
}
Expand Down Expand Up @@ -734,6 +740,22 @@ TEST_F(TcpProxyTest, DisconnectBeforeData) {
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
}

// Test that if the downstream connection is closed before the upstream connection
// is established, the upstream connection is cancelled.
TEST_F(TcpProxyTest, RemoteClosetBeforeUpstreamConnected) {
setup(1);
EXPECT_CALL(*conn_pool_handles_.at(0), cancel());
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
}

// Test that if the downstream connection is closed before the upstream connection
// is established, the upstream connection is cancelled.
TEST_F(TcpProxyTest, LocalClosetBeforeUpstreamConnected) {
setup(1);
EXPECT_CALL(*conn_pool_handles_.at(0), cancel());
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
}

TEST_F(TcpProxyTest, UpstreamConnectFailure) {
setup(1, accessLogConfig("%RESPONSE_FLAGS%"));

Expand Down Expand Up @@ -873,6 +895,7 @@ TEST_F(TcpProxyTest, IdleTimeoutWithOutstandingDataFlushed) {
TEST_F(TcpProxyTest, AccessLogUpstreamHost) {
setup(1, accessLogConfig("%UPSTREAM_HOST% %UPSTREAM_CLUSTER%"));
raiseEventUpstreamConnected(0);
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
filter_.reset();
EXPECT_EQ(access_log_data_, "127.0.0.1:80 fake_cluster");
}
Expand All @@ -881,6 +904,7 @@ TEST_F(TcpProxyTest, AccessLogUpstreamHost) {
TEST_F(TcpProxyTest, AccessLogUpstreamLocalAddress) {
setup(1, accessLogConfig("%UPSTREAM_LOCAL_ADDRESS%"));
raiseEventUpstreamConnected(0);
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
filter_.reset();
EXPECT_EQ(access_log_data_, "2.2.2.2:50000");
}
Expand All @@ -893,6 +917,7 @@ TEST_F(TcpProxyTest, AccessLogDownstreamAddress) {
filter_callbacks_.connection_.remote_address_ =
Network::Utility::resolveUrl("tcp://1.1.1.1:40000");
setup(1, accessLogConfig("%DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT% %DOWNSTREAM_LOCAL_ADDRESS%"));
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
filter_.reset();
EXPECT_EQ(access_log_data_, "1.1.1.1 1.1.1.2:20000");
}
Expand Down Expand Up @@ -1075,6 +1100,9 @@ TEST_F(TcpProxyRoutingTest, NonRoutableConnection) {

EXPECT_EQ(total_cx + 1, config_->stats().downstream_cx_total_.value());
EXPECT_EQ(non_routable_cx + 1, config_->stats().downstream_cx_no_route_.value());

// Cleanup
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
}

TEST_F(TcpProxyRoutingTest, RoutableConnection) {
Expand All @@ -1087,7 +1115,8 @@ TEST_F(TcpProxyRoutingTest, RoutableConnection) {
connection_.local_address_ = std::make_shared<Network::Address::Ipv4Instance>("1.2.3.4", 9999);

// Expect filter to try to open a connection to specified cluster.
EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _));
EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
.WillOnce(Return(nullptr));

filter_->onNewConnection();

Expand Down
2 changes: 1 addition & 1 deletion test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ AssertionResult FakeRawConnection::write(const std::string& data, bool end_strea
Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& data,
bool end_stream) {
Thread::LockGuard lock(parent_.lock_);
ENVOY_LOG(debug, "got {} bytes", data.length());
ENVOY_LOG(debug, "got {} bytes, end_stream {}", data.length(), end_stream);
parent_.data_.append(data.toString());
parent_.half_closed_ = end_stream;
data.drain(data.length());
Expand Down

0 comments on commit e34dcd6

Please sign in to comment.