Skip to content

Commit

Permalink
http2: Proactively disconnect connections flooded when resetting stre…
Browse files Browse the repository at this point in the history
…am (#13482)

Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov authored Oct 13, 2020
1 parent 876a6bb commit 549acee
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 3 deletions.
1 change: 1 addition & 0 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) {
auto status = parent_.sendPendingFrames();
// See comment in the `encodeHeadersBase()` method about this RELEASE_ASSERT.
RELEASE_ASSERT(status.ok(), "sendPendingFrames() failure in non dispatching context");
parent_.checkProtocolConstraintViolation();
}

void ConnectionImpl::StreamImpl::resetStreamWorker(StreamResetReason reason) {
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/codec_impl_legacy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) {
// the cleanup logic to run which will reset the stream in all cases if all data frames could not
// be sent.
parent_.sendPendingFrames();
parent_.checkProtocolConstraintViolation();
}

void ConnectionImpl::StreamImpl::resetStreamWorker(StreamResetReason reason) {
Expand Down
41 changes: 41 additions & 0 deletions test/common/http/http2/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2603,6 +2603,47 @@ TEST_P(Http2CodecImplTest, KeepAliveCausesOutboundFlood) {
EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_flood").value());
}

// Verify that codec detects flood of RST_STREAM frame caused by resetStream() method
TEST_P(Http2CodecImplTest, ResetStreamCausesOutboundFlood) {
initialize();

TestRequestHeaderMapImpl request_headers;
HttpTestUtility::addDefaultHeaders(request_headers);
EXPECT_CALL(request_decoder_, decodeHeaders_(_, false));
request_encoder_->encodeHeaders(request_headers, false);

int frame_count = 0;
Buffer::OwnedImpl buffer;
ON_CALL(server_connection_, write(_, _))
.WillByDefault(Invoke([&buffer, &frame_count](Buffer::Instance& frame, bool) {
++frame_count;
buffer.move(frame);
}));

auto* violation_callback =
new NiceMock<Event::MockSchedulableCallback>(&server_connection_.dispatcher_);

TestResponseHeaderMapImpl response_headers{{":status", "200"}};
response_encoder_->encodeHeaders(response_headers, false);
// Account for the single HEADERS frame above
for (uint32_t i = 0; i < CommonUtility::OptionsLimits::DEFAULT_MAX_OUTBOUND_FRAMES - 1; ++i) {
Buffer::OwnedImpl data("0");
EXPECT_NO_THROW(response_encoder_->encodeData(data, false));
}

EXPECT_FALSE(violation_callback->enabled_);
EXPECT_CALL(server_stream_callbacks_, onResetStream(StreamResetReason::RemoteReset, _));

server_->getStream(1)->resetStream(StreamResetReason::RemoteReset);

EXPECT_TRUE(violation_callback->enabled_);
EXPECT_CALL(server_connection_, close(Envoy::Network::ConnectionCloseType::NoFlush));
violation_callback->invokeCallback();

EXPECT_EQ(frame_count, CommonUtility::OptionsLimits::DEFAULT_MAX_OUTBOUND_FRAMES + 1);
EXPECT_EQ(1, server_stats_store_.counter("http2.outbound_flood").value());
}

// CONNECT without upgrade type gets tagged with "bytestream"
TEST_P(Http2CodecImplTest, ConnectTest) {
client_http2_options_.set_allow_connect(true);
Expand Down
19 changes: 16 additions & 3 deletions test/integration/autonomous_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const char AutonomousStream::RESPONSE_DATA_BLOCKS[] = "response_data_blocks";
const char AutonomousStream::EXPECT_REQUEST_SIZE_BYTES[] = "expect_request_size_bytes";
const char AutonomousStream::RESET_AFTER_REQUEST[] = "reset_after_request";
const char AutonomousStream::NO_TRAILERS[] = "no_trailers";
const char AutonomousStream::NO_END_STREAM[] = "no_end_stream";

AutonomousStream::AutonomousStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
AutonomousUpstream& upstream, bool allow_incomplete_streams)
Expand Down Expand Up @@ -64,8 +65,9 @@ void AutonomousStream::sendResponse() {
int32_t response_data_blocks = 1;
HeaderToInt(RESPONSE_DATA_BLOCKS, response_data_blocks, headers);

const bool send_trailers = headers.get_(NO_TRAILERS).empty();
const bool headers_only_response = !send_trailers && response_data_blocks == 0;
const bool end_stream = headers.get_(NO_END_STREAM).empty();
const bool send_trailers = end_stream && headers.get_(NO_TRAILERS).empty();
const bool headers_only_response = !send_trailers && response_data_blocks == 0 && end_stream;

pre_response_headers_metadata_ = upstream_.preResponseHeadersMetadata();
if (pre_response_headers_metadata_) {
Expand All @@ -75,7 +77,8 @@ void AutonomousStream::sendResponse() {
encodeHeaders(upstream_.responseHeaders(), headers_only_response);
if (!headers_only_response) {
for (int32_t i = 0; i < response_data_blocks; ++i) {
encodeData(response_body_length, i == (response_data_blocks - 1) && !send_trailers);
encodeData(response_body_length,
i == (response_data_blocks - 1) && !send_trailers && end_stream);
}
if (send_trailers) {
encodeTrailers(upstream_.responseTrailers());
Expand Down Expand Up @@ -166,4 +169,14 @@ std::unique_ptr<Http::MetadataMapVector> AutonomousUpstream::preResponseHeadersM
return std::move(pre_response_headers_metadata_);
}

AssertionResult AutonomousUpstream::closeConnection(uint32_t index,
std::chrono::milliseconds timeout) {
return shared_connections_[index]->executeOnDispatcher(
[](Network::Connection& connection) {
ASSERT(connection.state() == Network::Connection::State::Open);
connection.close(Network::ConnectionCloseType::FlushWrite);
},
timeout);
}

} // namespace Envoy
4 changes: 4 additions & 0 deletions test/integration/autonomous_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class AutonomousStream : public FakeStream {
static const char RESET_AFTER_REQUEST[];
// Prevents upstream from sending trailers.
static const char NO_TRAILERS[];
// Prevents upstream from finishing response.
static const char NO_END_STREAM[];

AutonomousStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
AutonomousUpstream& upstream, bool allow_incomplete_streams);
Expand Down Expand Up @@ -81,6 +83,8 @@ class AutonomousUpstream : public FakeUpstream {
bool createListenerFilterChain(Network::ListenerFilterManager& listener) override;
void createUdpListenerFilterChain(Network::UdpListenerFilterManager& listener,
Network::UdpReadFilterCallbacks& callbacks) override;
AssertionResult closeConnection(uint32_t index,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

void setLastRequestHeaders(const Http::HeaderMap& headers);
std::unique_ptr<Http::TestRequestHeaderMapImpl> lastRequestHeaders();
Expand Down
56 changes: 56 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2301,6 +2301,62 @@ name: send_local_reply_filter
EXPECT_EQ(1, test_server_->counter("http.config_test.downstream_cx_drain_close")->value());
}

// Verify that the server can detect flooding by the RST_STREAM on when upstream disconnects
// before sending response headers.
TEST_P(Http2FloodMitigationTest, RstStreamOnUpstreamRemoteCloseBeforeResponseHeaders) {
// pre-fill 3 away from overflow
prefillOutboundDownstreamQueue(AllFrameFloodLimit - 3);

// Start second request.
auto request2 =
Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(1), "host", "/test/long/url");
sendFrame(request2);

// Wait for it to be proxied
test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_total", 2);

// Disconnect upstream connection. Since there no response headers were sent yet the router
// filter will send 503 with body and then RST_STREAM. With these 3 frames the downstream outbound
// frame queue should overflow.
ASSERT_TRUE(static_cast<AutonomousUpstream*>(fake_upstreams_.front().get())->closeConnection(0));

// Wait for connection to be flooded with outbound RST_STREAM frame and disconnected.
tcp_client_->waitForDisconnect();

ASSERT_EQ(1, test_server_->counter("cluster.cluster_0.upstream_cx_destroy")->value());
// Verify that the flood check was triggered
EXPECT_EQ(1, test_server_->counter("http2.outbound_flood")->value());
}

// Verify that the server can detect flooding by the RST_STREAM on stream idle timeout
// after sending response headers.
TEST_P(Http2FloodMitigationTest, RstStreamOnStreamIdleTimeoutAfterResponseHeaders) {
config_helper_.addConfigModifier(
[](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
hcm) {
auto* stream_idle_timeout = hcm.mutable_stream_idle_timeout();
std::chrono::milliseconds timeout(1000);
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(timeout);
stream_idle_timeout->set_seconds(seconds.count());
});
// pre-fill 2 away from overflow
prefillOutboundDownstreamQueue(AllFrameFloodLimit - 2);

// Start second request, which should result in response headers to be sent but the stream kept
// open.
auto request2 = Http2Frame::makeRequest(
Http2Frame::makeClientStreamId(1), "host", "/test/long/url",
{Http2Frame::Header("response_data_blocks", "0"), Http2Frame::Header("no_end_stream", "0")});
sendFrame(request2);

// Wait for stream idle timeout to send RST_STREAM. With the response headers frame from the
// second response the downstream outbound frame queue should overflow.
tcp_client_->waitForDisconnect();

EXPECT_EQ(1, test_server_->counter("http2.outbound_flood")->value());
EXPECT_EQ(1, test_server_->counter("http.config_test.downstream_rq_idle_timeout")->value());
}

// Verify detection of overflowing outbound frame queue with the PING frames sent by the keep alive
// timer. The test verifies protocol constraint violation handling in the
// Http2::ConnectionImpl::sendKeepalive() method.
Expand Down

0 comments on commit 549acee

Please sign in to comment.