Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc fuzzer test trigger ENVOY_BUG when clear route cache #27657

Merged
43 changes: 21 additions & 22 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
}
}

if (common_response.clear_route_cache()) {
clearRouteCache(common_response);
}

clearRouteCache(common_response);
onFinishProcessorCall(Grpc::Status::Ok);

if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) {
Expand Down Expand Up @@ -324,10 +321,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) {
onFinishProcessorCall(Grpc::Status::FailedPrecondition);
}

if (common_response.clear_route_cache()) {
clearRouteCache(common_response);
}

clearRouteCache(common_response);
headers_ = nullptr;

if (send_trailers_ && trailers_available_) {
Expand Down Expand Up @@ -365,20 +359,6 @@ absl::Status ProcessorState::handleTrailersResponse(const TrailersResponse& resp
return absl::FailedPreconditionError("spurious message");
}

void ProcessorState::clearRouteCache(const CommonResponse& common_response) {
// Only clear the route cache if there is a mutation to the header and clearing is allowed.
if (filter_.config().disableClearRouteCache()) {
filter_.stats().clear_route_cache_disabled_.inc();
ENVOY_LOG(debug, "NOT clearing route cache, it is disabled in the config");
} else if (common_response.has_header_mutation()) {
ENVOY_LOG(debug, "clearing route cache");
filter_callbacks_->downstreamCallbacks()->clearRouteCache();
} else {
filter_.stats().clear_route_cache_ignored_.inc();
ENVOY_LOG(debug, "NOT clearing route cache, no header mutations detected");
}
}

void ProcessorState::enqueueStreamingChunk(Buffer::Instance& data, bool end_stream,
bool delivered) {
chunk_queue_.push(data, end_stream, delivered);
Expand Down Expand Up @@ -440,6 +420,25 @@ void DecodingProcessorState::clearWatermark() {
}
}

void DecodingProcessorState::clearRouteCache(const CommonResponse& common_response) {
if (!common_response.clear_route_cache()) {
return;
}
// Only clear the route cache if there is a mutation to the header and clearing is allowed.
if (filter_.config().disableClearRouteCache()) {
filter_.stats().clear_route_cache_disabled_.inc();
yanjunxiang-google marked this conversation as resolved.
Show resolved Hide resolved
ENVOY_LOG(debug, "NOT clearing route cache, it is disabled in the config");
return;
}
if (common_response.has_header_mutation()) {
ENVOY_LOG(debug, "clearing route cache");
decoder_callbacks_->downstreamCallbacks()->clearRouteCache();
return;
}
filter_.stats().clear_route_cache_ignored_.inc();
yanjunxiang-google marked this conversation as resolved.
Show resolved Hide resolved
ENVOY_LOG(debug, "NOT clearing route cache, no header mutations detected");
}

void EncodingProcessorState::setProcessingModeInternal(const ProcessingMode& mode) {
// Account for the different default behaviors of headers and trailers --
// headers are sent by default and trailers are not.
Expand Down
5 changes: 4 additions & 1 deletion source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
const envoy::config::core::v3::TrafficDirection traffic_direction_;

private:
void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse& common_response);
virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {}
};

class DecodingProcessorState : public ProcessorState {
Expand Down Expand Up @@ -280,6 +280,9 @@ class DecodingProcessorState : public ProcessorState {
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);

void
clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse& common_response) override;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
};

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2086,4 +2086,55 @@ TEST_P(ExtProcIntegrationTest, GetAndSetHeadersAndTrailersWithHeaderScrubbing) {
verifyDownstreamResponse(*response, 200);
}

// Test clear route cache in both upstream and downstream header and body processing.
TEST_P(ExtProcIntegrationTest, GetAndSetBodyOnBothWithClearRouteCache) {
yanjunxiang-google marked this conversation as resolved.
Show resolved Hide resolved
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED);
proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED);
initializeConfig();
HttpIntegrationTest::initialize();

auto response = sendDownstreamRequestWithBody("Replace this!", absl::nullopt);
processRequestHeadersMessage(
*grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) {
auto* content_length =
headers_resp.mutable_response()->mutable_header_mutation()->add_set_headers();
content_length->mutable_header()->set_key("content-length");
content_length->mutable_header()->set_value("13");
headers_resp.mutable_response()->set_clear_route_cache(true);
return true;
});
processRequestBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
auto* body_mut = body_resp.mutable_response()->mutable_body_mutation();
body_mut->set_body("Hello, World!");
body_resp.mutable_response()->set_clear_route_cache(true);
return true;
});

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));
upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);

processResponseHeadersMessage(
*grpc_upstreams_[0], false, [](const HttpHeaders&, HeadersResponse& headers_resp) {
headers_resp.mutable_response()->mutable_header_mutation()->add_remove_headers(
"content-length");
headers_resp.mutable_response()->set_clear_route_cache(true);
return true;
});
upstream_request_->encodeData(100, true);
processResponseBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody&, BodyResponse& body_resp) {
auto* header_mut = body_resp.mutable_response()->mutable_header_mutation();
auto* header_add = header_mut->add_set_headers();
header_add->mutable_header()->set_key("x-testing-response-header");
header_add->mutable_header()->set_value("Yes");
body_resp.mutable_response()->set_clear_route_cache(true);
yanjunxiang-google marked this conversation as resolved.
Show resolved Hide resolved
return true;
});
verifyDownstreamResponse(*response, 200);
}

} // namespace Envoy
22 changes: 14 additions & 8 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,8 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead
}

// Using the default configuration, verify that the "clear_route_cache" flag makes the appropriate
// callback on the filter when header modifications are also present.
// callback on the filter for inbound traffic when header modifications are also present.
// Also verify it does not make the callback for outbound traffic.
TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) {
initialize(R"EOF(
grpc_service:
Expand All @@ -2075,7 +2076,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) {
)EOF");

EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true));

// Call ClearRouteCache() for inbound traffic with header mutation.
EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache());
processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& resp) {
auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation();
Expand All @@ -2092,9 +2093,8 @@ TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) {
Buffer::OwnedImpl buffered_response_data;
setUpEncodingBuffering(buffered_response_data);

// There is no ClearRouteCache() call for outbound traffic.
EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_data, true));

EXPECT_CALL(encoder_callbacks_.downstream_callbacks_, clearRouteCache());
processResponseBody([](const HttpBody&, ProcessingResponse&, BodyResponse& resp) {
auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation();
auto* resp_add = resp_headers_mut->add_set_headers();
Expand All @@ -2105,14 +2105,16 @@ TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) {

filter_->onDestroy();

EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 0);
EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 0);
EXPECT_EQ(config_->stats().streams_started_.value(), 1);
EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 3);
EXPECT_EQ(config_->stats().stream_msgs_received_.value(), 3);
EXPECT_EQ(config_->stats().streams_closed_.value(), 1);
}

// Verify that the "disable_route_cache_clearing" setting prevents the "clear_route_cache" flag
// from performing route clearing callbacks when enabled.
// from performing route clearing callbacks for inbound traffic when enabled.
TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) {
initialize(R"EOF(
grpc_service:
Expand All @@ -2123,6 +2125,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) {
disable_clear_route_cache: true
)EOF");

// The ClearRouteCache() call is disabled for inbound traffic.
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true));
processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& resp) {
auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation();
Expand All @@ -2139,6 +2142,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) {
Buffer::OwnedImpl buffered_response_data;
setUpEncodingBuffering(buffered_response_data);

// There is no ClearRouteCache() call for outbound traffic regardless.
EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_data, true));
processResponseBody([](const HttpBody&, ProcessingResponse&, BodyResponse& resp) {
auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation();
Expand All @@ -2150,16 +2154,16 @@ TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) {

filter_->onDestroy();

EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 1);
EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 0);
EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 2);
EXPECT_EQ(config_->stats().streams_started_.value(), 1);
EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 3);
EXPECT_EQ(config_->stats().stream_msgs_received_.value(), 3);
EXPECT_EQ(config_->stats().streams_closed_.value(), 1);
}

// Using the default configuration, verify that the "clear_route_cache" flag does not preform
// route clearing callbacks on the filter when no header changes are present.
// route clearing callbacks for inbound traffic when no header changes are present.
TEST_F(HttpFilterTest, ClearRouteCacheUnchanged) {
initialize(R"EOF(
grpc_service:
Expand All @@ -2169,6 +2173,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheUnchanged) {
response_body_mode: "BUFFERED"
)EOF");

// Do not call ClearRouteCache() for inbound traffic without header mutation.
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true));
processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& resp) {
resp.mutable_response()->set_clear_route_cache(true);
Expand All @@ -2181,14 +2186,15 @@ TEST_F(HttpFilterTest, ClearRouteCacheUnchanged) {
Buffer::OwnedImpl buffered_response_data;
setUpEncodingBuffering(buffered_response_data);

// There is no ClearRouteCache() call for outbound traffic regardless.
EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_data, true));
processResponseBody([](const HttpBody&, ProcessingResponse&, BodyResponse& resp) {
resp.mutable_response()->set_clear_route_cache(true);
});

filter_->onDestroy();

EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 2);
EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 1);
EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 0);
EXPECT_EQ(config_->stats().streams_started_.value(), 1);
EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 3);
Expand Down