Skip to content

Commit

Permalink
hcm: bugfix - request hangs where 1st filter adds body, and 2nd filte…
Browse files Browse the repository at this point in the history
…r waits for it (#11248)

Fix a bug where a request without a body hangs where a filter adds a body, and a subsequent filter stops the iteration until decodeData.

Signed-off-by: Yuval Kohavi <yuval.kohavi@gmail.com>
  • Loading branch information
yuval-k authored May 20, 2020
1 parent 6293138 commit 95c3b41
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 17 deletions.
50 changes: 33 additions & 17 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,19 @@ void ConnectionManagerImpl::ActiveStream::traceRequest() {
}
}

void ConnectionManagerImpl::ActiveStream::maybeContinueDecoding(
const std::list<ActiveStreamDecoderFilterPtr>::iterator& continue_data_entry) {
if (continue_data_entry != decoder_filters_.end()) {
// We use the continueDecoding() code since it will correctly handle not calling
// decodeHeaders() again. Fake setting StopSingleIteration since the continueDecoding() code
// expects it.
ASSERT(buffered_request_data_);
(*continue_data_entry)->iteration_state_ =
ActiveStreamFilterBase::IterationState::StopSingleIteration;
(*continue_data_entry)->continueDecoding();
}
}

void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilter* filter,
RequestHeaderMap& headers,
bool end_stream) {
Expand Down Expand Up @@ -1096,6 +1109,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
// processing since we need to handle the case where a terminal filter wants to buffer, but
// a previous filter has added body.
maybeContinueDecoding(continue_data_entry);
return;
}

Expand All @@ -1106,15 +1120,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte
}
}

if (continue_data_entry != decoder_filters_.end()) {
// We use the continueDecoding() code since it will correctly handle not calling
// decodeHeaders() again. Fake setting StopSingleIteration since the continueDecoding() code
// expects it.
ASSERT(buffered_request_data_);
(*continue_data_entry)->iteration_state_ =
ActiveStreamFilterBase::IterationState::StopSingleIteration;
(*continue_data_entry)->continueDecoding();
}
maybeContinueDecoding(continue_data_entry);

if (end_stream) {
disarmRequestTimeout();
Expand Down Expand Up @@ -1577,6 +1583,19 @@ void ConnectionManagerImpl::ActiveStream::encode100ContinueHeaders(
response_encoder_->encode100ContinueHeaders(headers);
}

void ConnectionManagerImpl::ActiveStream::maybeContinueEncoding(
const std::list<ActiveStreamEncoderFilterPtr>::iterator& continue_data_entry) {
if (continue_data_entry != encoder_filters_.end()) {
// We use the continueEncoding() code since it will correctly handle not calling
// encodeHeaders() again. Fake setting StopSingleIteration since the continueEncoding() code
// expects it.
ASSERT(buffered_response_data_);
(*continue_data_entry)->iteration_state_ =
ActiveStreamFilterBase::IterationState::StopSingleIteration;
(*continue_data_entry)->continueEncoding();
}
}

void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilter* filter,
ResponseHeaderMap& headers,
bool end_stream) {
Expand Down Expand Up @@ -1612,6 +1631,9 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
}

if (!continue_iteration) {
if (!(*entry)->end_stream_) {
maybeContinueEncoding(continue_data_entry);
}
return;
}

Expand All @@ -1626,14 +1648,8 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
(end_stream && continue_data_entry == encoder_filters_.end());
encodeHeadersInternal(headers, modified_end_stream);

if (continue_data_entry != encoder_filters_.end() && !modified_end_stream) {
// We use the continueEncoding() code since it will correctly handle not calling
// encodeHeaders() again. Fake setting StopSingleIteration since the continueEncoding() code
// expects it.
ASSERT(buffered_response_data_);
(*continue_data_entry)->iteration_state_ =
ActiveStreamFilterBase::IterationState::StopSingleIteration;
(*continue_data_entry)->continueEncoding();
if (!modified_end_stream) {
maybeContinueEncoding(continue_data_entry);
}
}

Expand Down
6 changes: 6 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming);
RequestTrailerMap& addDecodedTrailers();
MetadataMapVector& addDecodedMetadata();
// Helper function for the case where we have a header only request, but a filter adds a body
// to it.
void maybeContinueDecoding(
const std::list<ActiveStreamDecoderFilterPtr>::iterator& maybe_continue_data_entry);
void decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
bool end_stream);
// Sends data through decoding filter chains. filter_iteration_start_state indicates which
Expand All @@ -496,6 +500,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// As with most of the encode functions, this runs encodeHeaders on various
// filters before calling encodeHeadersInternal which does final header munging and passes the
// headers to the encoder.
void maybeContinueEncoding(
const std::list<ActiveStreamEncoderFilterPtr>::iterator& maybe_continue_data_entry);
void encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers,
bool end_stream);
// Sends data through encoding filter chains. filter_iteration_start_state indicates which
Expand Down
48 changes: 48 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3237,6 +3237,54 @@ TEST_F(HttpConnectionManagerImplTest, DrainCloseRaceWithClose) {
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
}

TEST_F(HttpConnectionManagerImplTest,
FilterThatWaitsForBodyCanBeCalledAfterFilterThatAddsBodyEvenIfItIsNotLast) {
InSequence s;
setup(false, "");

NiceMock<MockResponseEncoder> encoder;
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
RequestDecoder* decoder = &conn_manager_->newStream(encoder);
RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), true);
return Http::okStatus();
}));

// 3 filters:
// 1st filter adds a body
// 2nd filter waits for the body
// 3rd filter simulates router filter.
setupFilterChain(3, 0);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true))
.WillOnce(Invoke([&](RequestHeaderMap&, bool) -> FilterHeadersStatus {
Buffer::OwnedImpl body("body");
decoder_filters_[0]->callbacks_->addDecodedData(body, false);
return FilterHeadersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Invoke([](RequestHeaderMap&, bool) -> FilterHeadersStatus {
return FilterHeadersStatus::StopIteration;
}));
EXPECT_CALL(*decoder_filters_[1], decodeData(_, true))
.WillOnce(Invoke(
[](Buffer::Instance&, bool) -> FilterDataStatus { return FilterDataStatus::Continue; }));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
EXPECT_CALL(*decoder_filters_[2], decodeHeaders(_, false))
.WillOnce(Invoke([](RequestHeaderMap&, bool) -> FilterHeadersStatus {
return FilterHeadersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[2], decodeData(_, true))
.WillOnce(Invoke(
[](Buffer::Instance&, bool) -> FilterDataStatus { return FilterDataStatus::Continue; }));
EXPECT_CALL(*decoder_filters_[2], decodeComplete());

Buffer::OwnedImpl fake_input;
conn_manager_->onData(fake_input, false);
}

TEST_F(HttpConnectionManagerImplTest, DrainClose) {
setup(true, "");

Expand Down
2 changes: 2 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ envoy_cc_test_library(
"//source/extensions/filters/http/router:config",
"//source/extensions/filters/network/http_connection_manager:config",
"//test/common/upstream:utility_lib",
"//test/integration/filters:add_body_filter_config_lib",
"//test/integration/filters:add_trailers_filter_config_lib",
"//test/integration/filters:call_decodedata_once_filter_config_lib",
"//test/integration/filters:decode_headers_return_stop_all_filter_config_lib",
Expand All @@ -466,6 +467,7 @@ envoy_cc_test_library(
"//test/integration/filters:modify_buffer_filter_config_lib",
"//test/integration/filters:passthrough_filter_config_lib",
"//test/integration/filters:pause_filter_lib",
"//test/integration/filters:wait_for_whole_request_and_response_config_lib",
"//test/test_common:registry_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto",
Expand Down
30 changes: 30 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,36 @@ load(

envoy_package()

envoy_cc_test_library(
name = "add_body_filter_config_lib",
srcs = [
"add_body_filter.cc",
],
deps = [
":common_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "wait_for_whole_request_and_response_config_lib",
srcs = [
"wait_for_whole_request_and_response.cc",
],
deps = [
":common_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "add_trailers_filter_config_lib",
srcs = [
Expand Down
49 changes: 49 additions & 0 deletions test/integration/filters/add_body_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "common/buffer/buffer_impl.h"

#include "extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"
#include "test/integration/filters/common.h"

namespace Envoy {

// A test filter that inserts body to a header only request/response.
class AddBodyStreamFilter : public Http::PassThroughFilter {
public:
constexpr static char name[] = "add-body-filter";

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override {
if (end_stream) {
Buffer::OwnedImpl body("body");
headers.setContentLength(body.length());
decoder_callbacks_->addDecodedData(body, false);
}

return Http::FilterHeadersStatus::Continue;
}

Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override {
if (end_stream) {
Buffer::OwnedImpl body("body");
headers.setContentLength(body.length());
encoder_callbacks_->addEncodedData(body, false);
}

return Http::FilterHeadersStatus::Continue;
}
};

constexpr char AddBodyStreamFilter::name[];

static Registry::RegisterFactory<SimpleFilterConfig<AddBodyStreamFilter>,
Server::Configuration::NamedHttpFilterConfigFactory>
encoder_register_;
} // namespace Envoy
52 changes: 52 additions & 0 deletions test/integration/filters/wait_for_whole_request_and_response.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"
#include "test/integration/filters/common.h"

namespace Envoy {

// A test filter that waits for the request/response to finish before continuing.
class WaitForWholeRequestAndResponseStreamFilter : public Http::PassThroughFilter {
public:
constexpr static char name[] = "wait-for-whole-request-and-response-filter";

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool end_stream) override {
if (end_stream) {
return Http::FilterHeadersStatus::Continue;
}
return Http::FilterHeadersStatus::StopIteration;
}
Http::FilterDataStatus decodeData(Buffer::Instance&, bool end_stream) override {
if (end_stream) {
return Http::FilterDataStatus::Continue;
}
return Http::FilterDataStatus::StopIterationAndBuffer;
}

Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap&, bool end_stream) override {
if (end_stream) {
return Http::FilterHeadersStatus::Continue;
}
return Http::FilterHeadersStatus::StopIteration;
}

Http::FilterDataStatus encodeData(Buffer::Instance&, bool end_stream) override {
if (end_stream) {
return Http::FilterDataStatus::Continue;
}
return Http::FilterDataStatus::StopIterationAndBuffer;
}
};

constexpr char WaitForWholeRequestAndResponseStreamFilter::name[];

static Registry::RegisterFactory<SimpleFilterConfig<WaitForWholeRequestAndResponseStreamFilter>,
Server::Configuration::NamedHttpFilterConfigFactory>
encoder_register_;
} // namespace Envoy
47 changes: 47 additions & 0 deletions test/integration/protocol_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,53 @@ name: health_check
EXPECT_EQ("503", response->headers().Status()->value().getStringView());
}

// Verifies behavior for https://github.com/envoyproxy/envoy/pull/11248
TEST_P(ProtocolIntegrationTest, AddBodyToRequestAndWaitForIt) {
// filters are prepended, so add them in reverse order
config_helper_.addFilter(R"EOF(
name: wait-for-whole-request-and-response-filter
)EOF");
config_helper_.addFilter(R"EOF(
name: add-body-filter
)EOF");
initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));

auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
waitForNextUpstreamRequest();
EXPECT_EQ("body", upstream_request_->body().toString());
upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
// encode data, as we have a separate test for the transforming header only response.
upstream_request_->encodeData(128, true);
response->waitForEndStream();

EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().Status()->value().getStringView());
}

TEST_P(ProtocolIntegrationTest, AddBodyToResponseAndWaitForIt) {
// filters are prepended, so add them in reverse order
config_helper_.addFilter(R"EOF(
name: add-body-filter
)EOF");
config_helper_.addFilter(R"EOF(
name: wait-for-whole-request-and-response-filter
)EOF");
initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));

auto response = codec_client_->makeRequestWithBody(default_request_headers_, 128);
waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true);
response->waitForEndStream();

EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().Status()->value().getStringView());
EXPECT_EQ("body", response->body());
}

TEST_P(ProtocolIntegrationTest, AddEncodedTrailers) {
config_helper_.addFilter(R"EOF(
name: add-trailers-filter
Expand Down

0 comments on commit 95c3b41

Please sign in to comment.