diff --git a/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto index 9f5c952a57ea..ad5ab5d507bb 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto @@ -23,16 +23,16 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // The External Processing filter allows an external service to act on HTTP traffic in a flexible way. // **Current Implementation Status:** -// At this time, the filter will send a "request_headers" message to the server when the -// filter is invoked from the downstream, and apply any header mutations returned by the -// server. No other part of the protocol is implemented yet. +// At this time, the filter will send the "request_headers" and "response_headers" messages +// to the server when the filter is invoked, and apply any header mutations returned by the +// server, and respond to "immediate_response" messages. No other parts of the protocol are implemented yet. // As designed, the filter supports up to six different processing steps, which are in the // process of being implemented: // * Request headers: IMPLEMENTED // * Request body: NOT IMPLEMENTED // * Request trailers: NOT IMPLEMENTED -// * Response headers: NOT IMPLEMENTED +// * Response headers: IMPLEMENTED // * Response body: NOT IMPLEMENTED // * Response trailers: NOT IMPLEMENTED @@ -78,7 +78,6 @@ message ExternalProcessor { // The filter supports both the "Envoy" and "Google" gRPC clients. config.core.v3.GrpcService grpc_service = 1; - // [#not-implemented-hide:] // By default, if the gRPC stream cannot be established, or if it is closed // prematurely with an error, the filter will fail. Specifically, if the // response headers have not yet been delivered, then it will return a 500 diff --git a/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto b/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto index 9f5c952a57ea..ad5ab5d507bb 100644 --- a/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto +++ b/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto @@ -23,16 +23,16 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // The External Processing filter allows an external service to act on HTTP traffic in a flexible way. // **Current Implementation Status:** -// At this time, the filter will send a "request_headers" message to the server when the -// filter is invoked from the downstream, and apply any header mutations returned by the -// server. No other part of the protocol is implemented yet. +// At this time, the filter will send the "request_headers" and "response_headers" messages +// to the server when the filter is invoked, and apply any header mutations returned by the +// server, and respond to "immediate_response" messages. No other parts of the protocol are implemented yet. // As designed, the filter supports up to six different processing steps, which are in the // process of being implemented: // * Request headers: IMPLEMENTED // * Request body: NOT IMPLEMENTED // * Request trailers: NOT IMPLEMENTED -// * Response headers: NOT IMPLEMENTED +// * Response headers: IMPLEMENTED // * Response body: NOT IMPLEMENTED // * Response trailers: NOT IMPLEMENTED @@ -78,7 +78,6 @@ message ExternalProcessor { // The filter supports both the "Envoy" and "Google" gRPC clients. config.core.v3.GrpcService grpc_service = 1; - // [#not-implemented-hide:] // By default, if the gRPC stream cannot be established, or if it is closed // prematurely with an error, the filter will fail. Specifically, if the // response headers have not yet been delivered, then it will return a 500 diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 874b35cf0ef4..9b0ce3da4144 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -9,12 +9,14 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { +using envoy::service::ext_proc::v3alpha::HeadersResponse; using envoy::service::ext_proc::v3alpha::ImmediateResponse; using envoy::service::ext_proc::v3alpha::ProcessingRequest; using envoy::service::ext_proc::v3alpha::ProcessingResponse; using Http::FilterHeadersStatus; using Http::RequestHeaderMap; +using Http::ResponseHeaderMap; static const std::string kErrorPrefix = "ext_proc error"; @@ -48,66 +50,96 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of return FilterHeadersStatus::StopAllIterationAndWatermark; } +FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_of_stream) { + if (stream_closed_) { + return FilterHeadersStatus::Continue; + } + + response_headers_ = &headers; + ProcessingRequest req; + auto* headers_req = req.mutable_response_headers(); + MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers()); + headers_req->set_end_of_stream(end_of_stream); + response_state_ = FilterState::HEADERS; + stream_->send(std::move(req), false); + stats_.stream_msgs_sent_.inc(); + return FilterHeadersStatus::StopAllIterationAndWatermark; +} + void Filter::onReceiveMessage( std::unique_ptr&& r) { auto response = std::move(r); - bool message_valid = false; + bool message_handled = false; ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_); - // This next section will grow as we support the rest of the protocol - if (request_state_ == FilterState::HEADERS) { - if (response->has_request_headers()) { - ENVOY_LOG(debug, "applying request_headers response"); - message_valid = true; - const auto& headers_response = response->request_headers(); - if (headers_response.has_response()) { - const auto& common_response = headers_response.response(); - if (common_response.has_header_mutation()) { - MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_); - } - } - } else if (response->has_immediate_response()) { - ENVOY_LOG(debug, "Returning immediate response from processor"); - sendImmediateResponse(response->immediate_response()); - message_valid = true; - } - request_state_ = FilterState::IDLE; - decoder_callbacks_->continueDecoding(); + if (response->has_request_headers()) { + message_handled = handleRequestHeadersResponse(response->request_headers()); + } else if (response->has_response_headers()) { + message_handled = handleResponseHeadersResponse(response->response_headers()); + } else if (response->has_immediate_response()) { + handleImmediateResponse(response->immediate_response()); + message_handled = true; } - if (message_valid) { + if (message_handled) { stats_.stream_msgs_received_.inc(); } else { stats_.spurious_msgs_received_.inc(); // Ignore messages received out of order. However, close the stream to // protect ourselves since the server is not following the protocol. ENVOY_LOG(warn, "Spurious response message received on gRPC stream"); + cleanupState(); closeStream(); } } +bool Filter::handleRequestHeadersResponse(const HeadersResponse& response) { + if (request_state_ == FilterState::HEADERS) { + ENVOY_LOG(debug, "applying request_headers response"); + MutationUtils::applyCommonHeaderResponse(response, *request_headers_); + request_state_ = FilterState::IDLE; + decoder_callbacks_->continueDecoding(); + return true; + } + return false; +} + +bool Filter::handleResponseHeadersResponse(const HeadersResponse& response) { + if (response_state_ == FilterState::HEADERS) { + ENVOY_LOG(debug, "applying response_headers response"); + MutationUtils::applyCommonHeaderResponse(response, *response_headers_); + response_state_ = FilterState::IDLE; + encoder_callbacks_->continueEncoding(); + return true; + } + return false; +} + +void Filter::handleImmediateResponse(const ImmediateResponse& response) { + // We don't want to process any more stream messages after this. + // Close the stream before sending because "sendLocalResponse" triggers + // additional calls to this filter. + request_state_ = FilterState::IDLE; + response_state_ = FilterState::IDLE; + closeStream(); + sendImmediateResponse(response); +} + void Filter::onGrpcError(Grpc::Status::GrpcStatus status) { ENVOY_LOG(debug, "Received gRPC error on stream: {}", status); - stream_closed_ = true; stats_.streams_failed_.inc(); + if (config_->failureModeAllow()) { // Ignore this and treat as a successful close onGrpcClose(); stats_.failure_mode_allowed_.inc(); + } else { - // Use a switch here now because there will be more than two - // cases very soon. - switch (request_state_) { - case FilterState::HEADERS: - request_state_ = FilterState::IDLE; - decoder_callbacks_->sendLocalReply( - Http::Code::InternalServerError, "", nullptr, absl::nullopt, - absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status)); - break; - default: - // Nothing else to do - break; - } + stream_closed_ = true; + ImmediateResponse errorResponse; + errorResponse.mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError); + errorResponse.set_details(absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status)); + handleImmediateResponse(errorResponse); } } @@ -117,16 +149,17 @@ void Filter::onGrpcClose() { stats_.streams_closed_.inc(); // Successful close. We can ignore the stream for the rest of our request // and response processing. - // Use a switch here now because there will be more than two - // cases very soon. - switch (request_state_) { - case FilterState::HEADERS: + cleanupState(); +} + +void Filter::cleanupState() { + if (request_state_ != FilterState::IDLE) { request_state_ = FilterState::IDLE; decoder_callbacks_->continueDecoding(); - break; - default: - // Nothing to do otherwise - break; + } + if (response_state_ != FilterState::IDLE) { + response_state_ = FilterState::IDLE; + encoder_callbacks_->continueEncoding(); } } @@ -136,15 +169,14 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) { response.has_grpc_status() ? absl::optional(response.grpc_status().status()) : absl::nullopt; + const auto mutate_headers = [&response](Http::ResponseHeaderMap& headers) { + if (response.has_headers()) { + MutationUtils::applyHeaderMutations(response.headers(), headers); + } + }; - decoder_callbacks_->sendLocalReply( - static_cast(status_code), response.body(), - [&response](Http::ResponseHeaderMap& headers) { - if (response.has_headers()) { - MutationUtils::applyHeaderMutations(response.headers(), headers); - } - }, - grpc_status, response.details()); + encoder_callbacks_->sendLocalReply(static_cast(status_code), response.body(), + mutate_headers, grpc_status, response.details()); } } // namespace ExternalProcessing diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 299780f3e05a..b8c8860d00a8 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -83,12 +83,10 @@ class Filter : public Logger::Loggable, void onDestroy() override; - void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override { - decoder_callbacks_ = &callbacks; - } - Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) override; + Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers, + bool end_stream) override; // ExternalProcessorCallbacks @@ -101,23 +99,33 @@ class Filter : public Logger::Loggable, private: void closeStream(); + void cleanupState(); void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response); + bool + handleRequestHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response); + bool + handleResponseHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response); + void + handleImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response); + const FilterConfigSharedPtr config_; const ExternalProcessorClientPtr client_; ExtProcFilterStats stats_; - Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr; - // The state of the request-processing, or "decoding" side of the filter. // We maintain separate states for encoding and decoding since they may // be interleaved. FilterState request_state_ = FilterState::IDLE; + // The state of the response-processing side + FilterState response_state_ = FilterState::IDLE; + ExternalProcessorStreamPtr stream_; bool stream_closed_ = false; Http::HeaderMap* request_headers_ = nullptr; + Http::HeaderMap* response_headers_ = nullptr; }; } // namespace ExternalProcessing diff --git a/source/extensions/filters/http/ext_proc/mutation_utils.cc b/source/extensions/filters/http/ext_proc/mutation_utils.cc index 4f31b45a4751..84df1e6a40d7 100644 --- a/source/extensions/filters/http/ext_proc/mutation_utils.cc +++ b/source/extensions/filters/http/ext_proc/mutation_utils.cc @@ -14,6 +14,9 @@ namespace ExternalProcessing { using Http::Headers; using Http::LowerCaseString; +using envoy::service::ext_proc::v3alpha::HeaderMutation; +using envoy::service::ext_proc::v3alpha::HeadersResponse; + void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in, envoy::config::core::v3::HeaderMap& headers_out) { headers_in.iterate([&headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate { @@ -24,8 +27,17 @@ void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in, }); } -void MutationUtils::applyHeaderMutations( - const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, Http::HeaderMap& headers) { +void MutationUtils::applyCommonHeaderResponse(const HeadersResponse& response, + Http::HeaderMap& headers) { + if (response.has_response()) { + const auto& common_response = response.response(); + if (common_response.has_header_mutation()) { + applyHeaderMutations(common_response.header_mutation(), headers); + } + } +} + +void MutationUtils::applyHeaderMutations(const HeaderMutation& mutation, Http::HeaderMap& headers) { for (const auto& remove_header : mutation.remove_headers()) { if (Http::HeaderUtility::isRemovableHeader(remove_header)) { headers.remove(LowerCaseString(remove_header)); diff --git a/source/extensions/filters/http/ext_proc/mutation_utils.h b/source/extensions/filters/http/ext_proc/mutation_utils.h index cad4929a3d61..1fc888e77d8c 100644 --- a/source/extensions/filters/http/ext_proc/mutation_utils.h +++ b/source/extensions/filters/http/ext_proc/mutation_utils.h @@ -14,6 +14,11 @@ class MutationUtils { static void buildHttpHeaders(const Http::HeaderMap& headers_in, envoy::config::core::v3::HeaderMap& headers_out); + // Apply mutations that are common to header responses. + static void + applyCommonHeaderResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response, + Http::HeaderMap& headers); + // Modify header map based on a set of mutations from a protobuf static void applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index 8838adb851cf..395d0e6020f9 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -37,6 +37,19 @@ envoy_extension_cc_test( ], ) +envoy_extension_cc_test( + name = "ordering_test", + srcs = ["ordering_test.cc"], + extension_name = "envoy.filters.http.ext_proc", + deps = [ + ":mock_server_lib", + "//source/extensions/filters/http/ext_proc", + "//test/common/http:common_lib", + "//test/mocks/server:factory_context_mocks", + "//test/test_common:test_runtime_lib", + ], +) + envoy_extension_cc_test( name = "client_test", srcs = ["client_test.cc"], @@ -95,6 +108,7 @@ envoy_extension_cc_test_library( deps = [ "//include/envoy/http:header_map_interface", "//test/test_common:utility_lib", + "@com_google_absl//absl/strings:str_format", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], ) diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index f0d8604645bf..1e91e1762d1b 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -15,7 +15,9 @@ namespace Envoy { using envoy::service::ext_proc::v3alpha::ProcessingRequest; using envoy::service::ext_proc::v3alpha::ProcessingResponse; -using Extensions::HttpFilters::ExternalProcessing::ExtProcTestUtility; +using Extensions::HttpFilters::ExternalProcessing::HasNoHeader; +using Extensions::HttpFilters::ExternalProcessing::HeaderProtosEqual; +using Extensions::HttpFilters::ExternalProcessing::SingleHeaderValueIs; using Http::LowerCaseString; @@ -66,6 +68,32 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, setDownstreamProtocol(Http::CodecClient::Type::HTTP2); } + IntegrationStreamDecoderPtr + sendDownstreamRequest(std::function modify_headers) { + auto conn = makeClientConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(std::move(conn)); + Http::TestRequestHeaderMapImpl headers; + if (modify_headers != nullptr) { + modify_headers(headers); + } + HttpTestUtility::addDefaultHeaders(headers); + return codec_client_->makeHeaderOnlyRequest(headers); + } + + void verifyDownstreamResponse(IntegrationStreamDecoder& response, int status_code) { + response.waitForEndStream(); + EXPECT_TRUE(response.complete()); + EXPECT_EQ(std::to_string(status_code), response.headers().getStatusValue()); + } + + void handleUpstreamRequest() { + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + } + void waitForFirstMessage(ProcessingRequest& request) { ASSERT_TRUE(fake_upstreams_.back()->waitForHttpConnection(*dispatcher_, processor_connection_)); ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); @@ -86,12 +114,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, ExtProcIntegrationTest, TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { initializeConfig(); HttpIntegrationTest::initialize(); - - auto conn = makeClientConnection(lookupPort("http")); - codec_client_ = makeHttpConnection(std::move(conn)); - Http::TestRequestHeaderMapImpl headers; - HttpTestUtility::addDefaultHeaders(headers); - auto response = codec_client_->makeHeaderOnlyRequest(headers); + auto response = sendDownstreamRequest(nullptr); ProcessingRequest request_headers_msg; waitForFirstMessage(request_headers_msg); @@ -99,21 +122,8 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); processor_stream_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"grpc-status", "0"}}); - // Now expect a message to the real upstream - ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); - ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); - ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - - // Respond from the upstream with a simple 200 - upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); - upstream_request_->encodeData(100, true); - - // Now expect a response to the original request - response->waitForEndStream(); - - EXPECT_TRUE(upstream_request_->complete()); - EXPECT_TRUE(response->complete()); - EXPECT_EQ("200", response->headers().getStatusValue()); + handleUpstreamRequest(); + verifyDownstreamResponse(*response, 200); } // Test the filter using the default configuration by connecting to @@ -122,21 +132,103 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { TEST_P(ExtProcIntegrationTest, GetAndFailStream) { initializeConfig(); HttpIntegrationTest::initialize(); - - auto conn = makeClientConnection(lookupPort("http")); - codec_client_ = makeHttpConnection(std::move(conn)); - Http::TestRequestHeaderMapImpl headers; - HttpTestUtility::addDefaultHeaders(headers); - auto response = codec_client_->makeHeaderOnlyRequest(headers); + auto response = sendDownstreamRequest(nullptr); ProcessingRequest request_headers_msg; waitForFirstMessage(request_headers_msg); // Fail the stream immediately processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "500"}}, true); + verifyDownstreamResponse(*response, 500); +} + +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// successfully, but then sends a gRPC error. +TEST_P(ExtProcIntegrationTest, GetAndFailStreamOutOfLine) { + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(nullptr); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + ProcessingResponse resp1; + resp1.mutable_request_headers(); + processor_stream_->sendGrpcMessage(resp1); + + // Fail the stream in between messages + processor_stream_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"grpc-status", "13"}}); + + verifyDownstreamResponse(*response, 500); +} + +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// successfully, but then sends a gRPC error. +TEST_P(ExtProcIntegrationTest, GetAndFailStreamOutOfLineLater) { + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(nullptr); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + ProcessingResponse resp1; + resp1.mutable_request_headers(); + processor_stream_->sendGrpcMessage(resp1); - response->waitForEndStream(); - EXPECT_TRUE(response->complete()); - EXPECT_EQ("500", response->headers().getStatusValue()); + // Fail the stream in between messages + processor_stream_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"grpc-status", "13"}}); + + verifyDownstreamResponse(*response, 500); +} + +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// successfully but closes the stream after response_headers. +TEST_P(ExtProcIntegrationTest, GetAndCloseStreamOnResponse) { + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(nullptr); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + ProcessingResponse resp1; + resp1.mutable_request_headers(); + processor_stream_->sendGrpcMessage(resp1); + + handleUpstreamRequest(); + + ProcessingRequest response_headers_msg; + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, response_headers_msg)); + processor_stream_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"grpc-status", "0"}}); + + verifyDownstreamResponse(*response, 200); +} + +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// successfully but then fails on the response_headers message. +TEST_P(ExtProcIntegrationTest, GetAndFailStreamOnResponse) { + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(nullptr); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + ProcessingResponse resp1; + resp1.mutable_request_headers(); + processor_stream_->sendGrpcMessage(resp1); + + handleUpstreamRequest(); + + ProcessingRequest response_headers_msg; + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, response_headers_msg)); + processor_stream_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"grpc-status", "13"}}); + + verifyDownstreamResponse(*response, 500); } // Test the filter using the default configuration by connecting to @@ -145,34 +237,27 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStream) { TEST_P(ExtProcIntegrationTest, GetAndSetHeaders) { initializeConfig(); HttpIntegrationTest::initialize(); - - auto conn = makeClientConnection(lookupPort("http")); - codec_client_ = makeHttpConnection(std::move(conn)); - Http::TestRequestHeaderMapImpl headers; - HttpTestUtility::addDefaultHeaders(headers); - headers.addCopy(LowerCaseString("x-remove-this"), "yes"); - auto response = codec_client_->makeHeaderOnlyRequest(headers); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("x-remove-this"), "yes"); }); ProcessingRequest request_headers_msg; waitForFirstMessage(request_headers_msg); - EXPECT_TRUE(request_headers_msg.has_request_headers()); + ASSERT_TRUE(request_headers_msg.has_request_headers()); const auto request_headers = request_headers_msg.request_headers(); Http::TestRequestHeaderMapImpl expected_request_headers{{":scheme", "http"}, {":method", "GET"}, {"host", "host"}, {":path", "/"}, {"x-remove-this", "yes"}}; - EXPECT_TRUE(ExtProcTestUtility::headerProtosEqualIgnoreOrder(expected_request_headers, - request_headers.headers())); + EXPECT_THAT(request_headers.headers(), HeaderProtosEqual(expected_request_headers)); processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); // Ask to change the headers ProcessingResponse response_msg; - auto response_headers_msg = response_msg.mutable_request_headers(); auto response_header_mutation = - response_headers_msg->mutable_response()->mutable_header_mutation(); + response_msg.mutable_request_headers()->mutable_response()->mutable_header_mutation(); auto mut1 = response_header_mutation->add_set_headers(); mut1->mutable_header()->set_key("x-new-header"); mut1->mutable_header()->set_value("new"); @@ -183,20 +268,60 @@ TEST_P(ExtProcIntegrationTest, GetAndSetHeaders) { ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - auto has_hdr1 = upstream_request_->headers().get(LowerCaseString("x-remove-this")); - EXPECT_TRUE(has_hdr1.empty()); - auto has_hdr2 = upstream_request_->headers().get(LowerCaseString("x-new-header")); - EXPECT_EQ(has_hdr2.size(), 1); - EXPECT_EQ(has_hdr2[0]->value(), "new"); + EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-remove-this")); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); upstream_request_->encodeData(100, true); - response->waitForEndStream(); + // Now expect a message for the response path + ProcessingRequest response_headers_msg; + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, response_headers_msg)); + ASSERT_TRUE(response_headers_msg.has_response_headers()); + const auto response_headers = response_headers_msg.response_headers(); + Http::TestRequestHeaderMapImpl expected_response_headers{{":status", "200"}}; + EXPECT_THAT(response_headers.headers(), HeaderProtosEqual(expected_response_headers)); - EXPECT_TRUE(upstream_request_->complete()); - EXPECT_TRUE(response->complete()); - EXPECT_EQ("200", response->headers().getStatusValue()); + // Send back a response but don't do anything + ProcessingResponse response_2; + response_2.mutable_response_headers(); + processor_stream_->sendGrpcMessage(response_2); + + verifyDownstreamResponse(*response, 200); +} + +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the response_headers message +// by requesting to modify the request headers. +TEST_P(ExtProcIntegrationTest, GetAndSetHeadersOnResponse) { + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(nullptr); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + + ASSERT_TRUE(request_headers_msg.has_request_headers()); + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + ProcessingResponse resp1; + resp1.mutable_request_headers(); + processor_stream_->sendGrpcMessage(resp1); + + handleUpstreamRequest(); + + ProcessingRequest response_headers_msg; + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, response_headers_msg)); + ASSERT_TRUE(response_headers_msg.has_response_headers()); + ProcessingResponse resp2; + auto* headers2 = resp2.mutable_response_headers(); + auto* response_mutation = headers2->mutable_response()->mutable_header_mutation(); + auto* add1 = response_mutation->add_set_headers(); + add1->mutable_header()->set_key("x-response-processed"); + add1->mutable_header()->set_value("1"); + processor_stream_->sendGrpcMessage(resp2); + + verifyDownstreamResponse(*response, 200); + EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-response-processed", "1")); } // Test the filter using the default configuration by connecting to @@ -204,14 +329,11 @@ TEST_P(ExtProcIntegrationTest, GetAndSetHeaders) { // by sending back an immediate_response message, which should be // returned directly to the downstream. TEST_P(ExtProcIntegrationTest, GetAndRespondImmediately) { + // Logger::Registry::getLog(Logger::Id::filter).set_level(spdlog::level::trace); + initializeConfig(); HttpIntegrationTest::initialize(); - - auto conn = makeClientConnection(lookupPort("http")); - codec_client_ = makeHttpConnection(std::move(conn)); - Http::TestRequestHeaderMapImpl headers; - HttpTestUtility::addDefaultHeaders(headers); - auto response = codec_client_->makeHeaderOnlyRequest(headers); + auto response = sendDownstreamRequest(nullptr); ProcessingRequest request_headers_msg; waitForFirstMessage(request_headers_msg); @@ -233,13 +355,48 @@ TEST_P(ExtProcIntegrationTest, GetAndRespondImmediately) { hdr2->mutable_header()->set_value("application/json"); processor_stream_->sendGrpcMessage(response_msg); - response->waitForEndStream(); - EXPECT_TRUE(response->complete()); - EXPECT_EQ("401", response->headers().getStatusValue()); - EXPECT_EQ( - "testing", - response->headers().get(LowerCaseString("x-failure-reason"))[0]->value().getStringView()); - EXPECT_EQ("application/json", response->headers().ContentType()->value().getStringView()); + verifyDownstreamResponse(*response, 401); + EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-failure-reason", "testing")); + EXPECT_THAT(response->headers(), SingleHeaderValueIs("content-type", "application/json")); + EXPECT_EQ("{\"reason\": \"Not authorized\"}", response->body()); +} + +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// by sending back an immediate_response message after the +// request_headers message +TEST_P(ExtProcIntegrationTest, GetAndRespondImmediatelyOnResponse) { + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(nullptr); + + // request_headers message to processor + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + EXPECT_TRUE(request_headers_msg.has_request_headers()); + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + + // Response to request_headers + ProcessingResponse resp1; + resp1.mutable_request_headers(); + processor_stream_->sendGrpcMessage(resp1); + + handleUpstreamRequest(); + + // response_headers message to processor + ProcessingRequest response_headers_msg; + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, response_headers_msg)); + ASSERT_TRUE(response_headers_msg.has_response_headers()); + + // Response to response_headers + ProcessingResponse resp2; + auto* immediate_response = resp2.mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::Unauthorized); + immediate_response->set_body("{\"reason\": \"Not authorized\"}"); + immediate_response->set_details("Failed because you are not authorized"); + processor_stream_->sendGrpcMessage(resp2); + + verifyDownstreamResponse(*response, 401); EXPECT_EQ("{\"reason\": \"Not authorized\"}", response->body()); } diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index bbd43d1f240d..0a08ff30e7ec 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -67,7 +67,9 @@ class HttpFilterTest : public testing::Test { void doSend(ProcessingRequest&& request, bool end_stream) { ASSERT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_processed_); last_request_ = std::move(request); + last_request_processed_ = false; if (end_stream) { stream_close_sent_ = true; } @@ -81,6 +83,7 @@ class HttpFilterTest : public testing::Test { std::unique_ptr client_; ExternalProcessorCallbacks* stream_callbacks_ = nullptr; ProcessingRequest last_request_; + bool last_request_processed_ = true; bool stream_close_sent_ = false; std::chrono::milliseconds stream_timeout_; NiceMock stats_store_; @@ -130,8 +133,8 @@ TEST_F(HttpFilterTest, SimplestPost) { {"content-type", "text/plain"}, {"content-length", "10"}, {"x-some-other-header", "yes"}}; - EXPECT_TRUE( - ExtProcTestUtility::headerProtosEqualIgnoreOrder(expected, request_headers.headers())); + EXPECT_THAT(request_headers.headers(), HeaderProtosEqual(expected)); + last_request_processed_ = true; // Send back a response EXPECT_CALL(decoder_callbacks_, continueDecoding()); @@ -143,8 +146,32 @@ TEST_F(HttpFilterTest, SimplestPost) { EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); - EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->encodeHeaders(response_headers_, false)); + + // Expect another stream message + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_response_headers()); + const auto response_headers = last_request_.response_headers(); + EXPECT_FALSE(response_headers.end_of_stream()); + + Http::TestRequestHeaderMapImpl expected_response{ + {":status", "200"}, {"content-type", "text/plain"}, {"content-length", "3"}}; + EXPECT_THAT(response_headers.headers(), HeaderProtosEqual(expected_response)); + last_request_processed_ = true; + + // Send back a response + EXPECT_CALL(encoder_callbacks_, continueEncoding()); + std::unique_ptr resp2 = std::make_unique(); + resp2->mutable_response_headers(); + stream_callbacks_->onReceiveMessage(std::move(resp2)); + data_.add("bar"); EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); @@ -153,8 +180,8 @@ TEST_F(HttpFilterTest, SimplestPost) { EXPECT_TRUE(stream_close_sent_); EXPECT_EQ(1, config_->stats().streams_started_.value()); - EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); - EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); EXPECT_EQ(1, config_->stats().streams_closed_.value()); } @@ -202,14 +229,51 @@ TEST_F(HttpFilterTest, PostAndChangeHeaders) { {"x-new-header", "new"}, {"x-some-other-header", "yes"}, {"x-some-other-header", "no"}}; - EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected, request_headers_)); + EXPECT_THAT(&request_headers_, HeaderMapEqualIgnoreOrder(&expected)); + last_request_processed_ = true; data_.add("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); - EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->encodeHeaders(response_headers_, false)); + + // Expect another stream message + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_response_headers()); + const auto response_headers = last_request_.response_headers(); + EXPECT_FALSE(response_headers.end_of_stream()); + + Http::TestRequestHeaderMapImpl expected_response{ + {":status", "200"}, {"content-type", "text/plain"}, {"content-length", "3"}}; + EXPECT_TRUE(ExtProcTestUtility::headerProtosEqualIgnoreOrder(expected_response, + response_headers.headers())); + last_request_processed_ = true; + + // Send back a response + EXPECT_CALL(encoder_callbacks_, continueEncoding()); + std::unique_ptr resp2 = std::make_unique(); + auto resp_headers = resp2->mutable_response_headers(); + auto resp_headers_mut = resp_headers->mutable_response()->mutable_header_mutation(); + auto resp_add1 = resp_headers_mut->add_set_headers(); + resp_add1->mutable_header()->set_key("x-new-header"); + resp_add1->mutable_header()->set_value("new"); + stream_callbacks_->onReceiveMessage(std::move(resp2)); + + // We should now have changed the original header a bit + Http::TestRequestHeaderMapImpl final_expected_response{{":status", "200"}, + {"content-type", "text/plain"}, + {"content-length", "3"}, + {"x-new-header", "new"}}; + EXPECT_THAT(&response_headers_, HeaderMapEqualIgnoreOrder(&final_expected_response)); + data_.add("bar"); EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); @@ -218,8 +282,8 @@ TEST_F(HttpFilterTest, PostAndChangeHeaders) { EXPECT_TRUE(stream_close_sent_); EXPECT_EQ(1, config_->stats().streams_started_.value()); - EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); - EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); EXPECT_EQ(1, config_->stats().streams_closed_.value()); } @@ -240,8 +304,7 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { filter_->decodeHeaders(request_headers_, false)); Http::TestResponseHeaderMapImpl immediate_response_headers; - EXPECT_CALL(decoder_callbacks_, continueDecoding()); - EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::BadRequest, "Bad request", _, + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::BadRequest, "Bad request", _, Eq(absl::nullopt), "Got a bad request")) .WillOnce(Invoke([&immediate_response_headers]( Unused, Unused, @@ -265,12 +328,10 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { hdr3->mutable_header()->set_key("x-another-thing"); hdr3->mutable_header()->set_value("2"); stream_callbacks_->onReceiveMessage(std::move(resp1)); - EXPECT_FALSE(stream_close_sent_); Http::TestResponseHeaderMapImpl expected_response_headers{ {"content-type", "text/plain"}, {"x-another-thing", "1"}, {"x-another-thing", "2"}}; - EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected_response_headers, - immediate_response_headers)); + EXPECT_THAT(&immediate_response_headers, HeaderMapEqualIgnoreOrder(&expected_response_headers)); data_.add("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); @@ -291,6 +352,73 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message with an "immediate response" message +// during response headers processing that should result in a response being +// directly sent downstream with custom headers. +TEST_F(HttpFilterTest, PostAndRespondImmediatelyOnResponse) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_request_headers()); + last_request_processed_ = true; + + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + std::unique_ptr resp1 = std::make_unique(); + resp1->mutable_request_headers(); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->encodeHeaders(response_headers_, false)); + + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_response_headers()); + last_request_processed_ = true; + + Http::TestResponseHeaderMapImpl immediate_response_headers; + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::BadRequest, "Bad request", _, + Eq(absl::nullopt), "Got a bad request")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, + Unused, Unused) { modify_headers(immediate_response_headers); })); + std::unique_ptr resp2 = std::make_unique(); + auto* immediate_response = resp2->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + stream_callbacks_->onReceiveMessage(std::move(resp2)); + EXPECT_TRUE(immediate_response_headers.empty()); + + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + EXPECT_TRUE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + // Using the default configuration, test the filter with a processor that // replies to the request_headers message with an empty immediate_response message TEST_F(HttpFilterTest, RespondImmediatelyDefault) { @@ -306,20 +434,15 @@ TEST_F(HttpFilterTest, RespondImmediatelyDefault) { filter_->decodeHeaders(request_headers_, false)); Http::TestResponseHeaderMapImpl immediate_response_headers; - EXPECT_CALL(decoder_callbacks_, continueDecoding()); - EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::OK, "", _, Eq(absl::nullopt), "")) + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::OK, "", _, Eq(absl::nullopt), "")) .WillOnce(Invoke([&immediate_response_headers]( Unused, Unused, std::function modify_headers, Unused, Unused) { modify_headers(immediate_response_headers); })); std::unique_ptr resp1 = std::make_unique(); - /*auto* immediate_response = */ resp1->mutable_immediate_response(); + resp1->mutable_immediate_response(); stream_callbacks_->onReceiveMessage(std::move(resp1)); - EXPECT_FALSE(stream_close_sent_); - - Http::TestResponseHeaderMapImpl expected_response_headers{}; - EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected_response_headers, - immediate_response_headers)); + EXPECT_TRUE(immediate_response_headers.empty()); data_.add("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); @@ -351,8 +474,7 @@ TEST_F(HttpFilterTest, RespondImmediatelyGrpcError) { filter_->decodeHeaders(request_headers_, false)); Http::TestResponseHeaderMapImpl immediate_response_headers; - EXPECT_CALL(decoder_callbacks_, continueDecoding()); - EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::Forbidden, "", _, Eq(999), "")) + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::Forbidden, "", _, Eq(999), "")) .WillOnce(Invoke([&immediate_response_headers]( Unused, Unused, std::function modify_headers, @@ -362,11 +484,7 @@ TEST_F(HttpFilterTest, RespondImmediatelyGrpcError) { immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::Forbidden); immediate_response->mutable_grpc_status()->set_status(999); stream_callbacks_->onReceiveMessage(std::move(resp1)); - EXPECT_FALSE(stream_close_sent_); - - Http::TestResponseHeaderMapImpl expected_response_headers{}; - EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected_response_headers, - immediate_response_headers)); + EXPECT_TRUE(immediate_response_headers.empty()); data_.add("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); @@ -400,11 +518,14 @@ TEST_F(HttpFilterTest, PostAndFail) { EXPECT_FALSE(stream_close_sent_); // Oh no! The remote server had a failure! - EXPECT_CALL(decoder_callbacks_, - sendLocalReply(Http::Code::InternalServerError, "", Eq(nullptr), Eq(absl::nullopt), - "ext_proc error: gRPC error 13")); - // In this case, this call includes header encoding - EXPECT_CALL(decoder_callbacks_, encodeHeaders_(_, _)); + Http::TestResponseHeaderMapImpl immediate_response_headers; + EXPECT_CALL(encoder_callbacks_, + sendLocalReply(Http::Code::InternalServerError, "", _, Eq(absl::nullopt), + "ext_proc error: gRPC error 13")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, + Unused, Unused) { modify_headers(immediate_response_headers); })); stream_callbacks_->onGrpcError(Grpc::Status::Internal); data_.add("foo"); @@ -420,12 +541,73 @@ TEST_F(HttpFilterTest, PostAndFail) { filter_->onDestroy(); // The other side closed the stream EXPECT_FALSE(stream_close_sent_); + EXPECT_TRUE(immediate_response_headers.empty()); EXPECT_EQ(1, config_->stats().streams_started_.value()); EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); EXPECT_EQ(1, config_->stats().streams_failed_.value()); } +// Using the default configuration, test the filter with a processor that +// returns an error from from the gRPC stream during response header processing. +TEST_F(HttpFilterTest, PostAndFailOnResponse) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + EXPECT_FALSE(config_->failureModeAllow()); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_request_headers()); + last_request_processed_ = true; + + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + std::unique_ptr resp1 = std::make_unique(); + resp1->mutable_request_headers(); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->encodeHeaders(response_headers_, false)); + + // Oh no! The remote server had a failure! + Http::TestResponseHeaderMapImpl immediate_response_headers; + EXPECT_CALL(encoder_callbacks_, + sendLocalReply(Http::Code::InternalServerError, "", _, Eq(absl::nullopt), + "ext_proc error: gRPC error 13")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, + Unused, Unused) { modify_headers(immediate_response_headers); })); + stream_callbacks_->onGrpcError(Grpc::Status::Internal); + + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + // The other side closed the stream + EXPECT_FALSE(stream_close_sent_); + EXPECT_TRUE(immediate_response_headers.empty()); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_failed_.value()); +} + // Using the default configuration, test the filter with a processor that // returns an error from the gRPC stream that is ignored because the // failure_mode_allow parameter is set. diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc new file mode 100644 index 000000000000..8849905ab20b --- /dev/null +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -0,0 +1,348 @@ +#include "extensions/filters/http/ext_proc/ext_proc.h" + +#include "test/common/http/common.h" +#include "test/extensions/filters/http/ext_proc/mock_server.h" +#include "test/mocks/http/mocks.h" +#include "test/mocks/network/mocks.h" +#include "test/mocks/router/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/mocks/tracing/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { +namespace { + +using envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor; +using envoy::service::ext_proc::v3alpha::ProcessingRequest; +using envoy::service::ext_proc::v3alpha::ProcessingResponse; + +using Http::FilterDataStatus; +using Http::FilterHeadersStatus; +using Http::FilterTrailersStatus; + +using testing::Invoke; +using testing::Unused; + +using namespace std::chrono_literals; + +// These tests directly drive the filter. They concentrate on testing out all the different +// ordering options for the protocol, which means that unlike other tests they do not verify +// every parameter sent to or from the filter. + +class OrderingTest : public testing::Test { +protected: + void initialize(std::function cb) { + client_ = std::make_unique(); + EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &OrderingTest::doStart)); + + ExternalProcessor proto_config; + proto_config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("ext_proc_server"); + if (cb != nullptr) { + cb(proto_config); + } + config_.reset(new FilterConfig(proto_config, 200ms, stats_store_, "")); + filter_ = std::make_unique(config_, std::move(client_)); + filter_->setEncoderFilterCallbacks(encoder_callbacks_); + filter_->setDecoderFilterCallbacks(decoder_callbacks_); + } + + void TearDown() override { filter_->onDestroy(); } + + // Called by the "start" method on the stream by the filter + ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, + const std::chrono::milliseconds&) { + stream_callbacks_ = &callbacks; + auto stream = std::make_unique(); + EXPECT_CALL(*stream, send(_, _)).WillRepeatedly(Invoke(this, &OrderingTest::doSend)); + EXPECT_CALL(*stream, close()).WillRepeatedly(Invoke(this, &OrderingTest::doSendClose)); + return stream; + } + + // Called on the stream after it's been created. These delegate + // to "stream_delegate_" so that we can have expectations there. + + void doSend(ProcessingRequest&& request, bool end_stream) { + stream_delegate_.send(std::move(request), end_stream); + } + + void doSendClose() { stream_delegate_.close(); } + + // Send data through the filter as if we are the proxy + + void sendRequestHeadersGet(bool expect_callback) { + HttpTestUtility::addDefaultHeaders(request_headers_, "GET"); + EXPECT_EQ(expect_callback ? FilterHeadersStatus::StopAllIterationAndWatermark + : FilterHeadersStatus::Continue, + filter_->decodeHeaders(request_headers_, true)); + } + + void sendResponseHeaders(bool expect_callback) { + response_headers_.setStatus(200); + EXPECT_EQ(expect_callback ? FilterHeadersStatus::StopAllIterationAndWatermark + : FilterHeadersStatus::Continue, + filter_->encodeHeaders(response_headers_, false)); + } + + void sendRequestBody() { + Buffer::OwnedImpl data; + data.add("Dummy data"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data, true)); + } + + void sendResponseBody() { + Buffer::OwnedImpl data; + data.add("Dummy data"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data, true)); + } + + void sendRequestTrailers() { + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + } + + void sendResponseTrailers() { + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + } + + // Make it easier to send responses from the external processor + + void sendRequestHeadersReply() { + auto reply = std::make_unique(); + reply->mutable_request_headers(); + stream_callbacks_->onReceiveMessage(std::move(reply)); + } + + void sendResponseHeadersReply() { + auto reply = std::make_unique(); + reply->mutable_response_headers(); + stream_callbacks_->onReceiveMessage(std::move(reply)); + } + + void sendImmediateResponse500() { + auto reply = std::make_unique(); + auto* ir = reply->mutable_immediate_response(); + ir->mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError); + stream_callbacks_->onReceiveMessage(std::move(reply)); + } + + void sendGrpcError() { stream_callbacks_->onGrpcError(Grpc::Status::Internal); } + + void closeGrpcStream() { stream_callbacks_->onGrpcClose(); } + + std::unique_ptr client_; + MockStream stream_delegate_; + ExternalProcessorCallbacks* stream_callbacks_ = nullptr; + NiceMock stats_store_; + FilterConfigSharedPtr config_; + std::unique_ptr filter_; + Http::MockStreamDecoderFilterCallbacks decoder_callbacks_; + Http::MockStreamEncoderFilterCallbacks encoder_callbacks_; + Http::TestRequestHeaderMapImpl request_headers_; + Http::TestResponseHeaderMapImpl response_headers_; + Http::TestRequestTrailerMapImpl request_trailers_; + Http::TestResponseTrailerMapImpl response_trailers_; +}; + +// *** Tests for the normal processing path *** + +// A normal call with the default configuration +TEST_F(OrderingTest, DefaultOrderingGet) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + sendRequestHeadersReply(); + sendRequestTrailers(); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendResponseHeaders(true); + EXPECT_CALL(encoder_callbacks_, continueEncoding()); + sendResponseHeadersReply(); + sendResponseBody(); + sendResponseTrailers(); + + EXPECT_CALL(stream_delegate_, close()); +} + +// An immediate response on the request path +TEST_F(OrderingTest, ImmediateResponseOnRequest) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _)); + EXPECT_CALL(stream_delegate_, close()); + sendImmediateResponse500(); + // The rest of the filter isn't necessarily called after this. +} + +// An immediate response on the response path +TEST_F(OrderingTest, ImmediateResponseOnResponse) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + sendRequestHeadersReply(); + sendRequestTrailers(); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendResponseHeaders(true); + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _)); + EXPECT_CALL(stream_delegate_, close()); + sendImmediateResponse500(); + sendResponseBody(); + sendResponseTrailers(); +} + +// *** Tests of out-of-order messages *** +// In general, for these the server closes the stream and ignores the +// processor for the rest of the filter lifetime. + +// Receive a response headers reply in response to the request +// headers message -- should close stream and stop sending, but otherwise +// continue without error. +TEST_F(OrderingTest, IncorrectRequestHeadersReply) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(stream_delegate_, close()); + sendResponseHeadersReply(); + sendRequestTrailers(); + + // Expect us to go on from here normally but send no more stream messages + sendResponseHeaders(false); + sendResponseBody(); + sendResponseTrailers(); +} + +// Receive a request headers reply in response to the response +// headers message -- should continue without error. +TEST_F(OrderingTest, IncorrectResponseHeadersReply) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + sendRequestHeadersReply(); + sendRequestTrailers(); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendResponseHeaders(true); + EXPECT_CALL(encoder_callbacks_, continueEncoding()); + EXPECT_CALL(stream_delegate_, close()); + sendRequestHeadersReply(); + // Still should ignore the message and go on but send no more stream messages + sendResponseBody(); + sendResponseTrailers(); +} + +// Receive an extra message -- we should ignore it +// and not send anything else to the server +TEST_F(OrderingTest, ExtraReply) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + sendRequestHeadersReply(); + + // Extra call + EXPECT_CALL(stream_delegate_, close()); + sendRequestHeadersReply(); + + // After this we are ignoring the processor + sendRequestTrailers(); + sendResponseHeaders(false); + sendResponseBody(); + sendResponseTrailers(); +} + +// Receive an extra message after the immediate response -- it should +// be ignored. +TEST_F(OrderingTest, ExtraAfterImmediateResponse) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _)); + EXPECT_CALL(stream_delegate_, close()); + sendImmediateResponse500(); + // Extra messages sent after immediate response shouldn't affect anything + sendRequestHeadersReply(); +} + +// *** Tests of gRPC stream state *** + +// gRPC error in response to message calls results in an error +TEST_F(OrderingTest, GrpcErrorInline) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _)); + sendGrpcError(); + // The rest of the filter isn't called after this. +} + +// gRPC error in response to message results in connection being dropped +// if failures are ignored +TEST_F(OrderingTest, GrpcErrorInlineIgnored) { + initialize([](ExternalProcessor& cfg) { cfg.set_failure_mode_allow(true); }); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + sendGrpcError(); + + // After that we ignore the processor + sendRequestTrailers(); + sendResponseHeaders(false); + sendResponseBody(); + sendResponseTrailers(); +} + +// gRPC error in between calls should still be delivered +TEST_F(OrderingTest, GrpcErrorOutOfLine) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + sendRequestHeadersReply(); + sendRequestTrailers(); + + EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _)); + sendGrpcError(); +} + +// gRPC close after a proper message means rest of stream is ignored +TEST_F(OrderingTest, GrpcCloseAfter) { + initialize(nullptr); + + EXPECT_CALL(stream_delegate_, send(_, false)); + sendRequestHeadersGet(true); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + sendRequestHeadersReply(); + closeGrpcStream(); + + // After that we ignore the processor + sendRequestTrailers(); + sendResponseHeaders(false); + sendResponseBody(); + sendResponseTrailers(); +} + +} // namespace +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/utils.h b/test/extensions/filters/http/ext_proc/utils.h index 93ec895d0a04..e8b931dacc7a 100644 --- a/test/extensions/filters/http/ext_proc/utils.h +++ b/test/extensions/filters/http/ext_proc/utils.h @@ -3,6 +3,9 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/http/header_map.h" +#include "absl/strings/str_format.h" +#include "gmock/gmock.h" + namespace Envoy { namespace Extensions { namespace HttpFilters { @@ -15,6 +18,23 @@ class ExtProcTestUtility { const envoy::config::core::v3::HeaderMap& actual); }; +MATCHER_P(HeaderProtosEqual, expected, "HTTP header protos match") { + return ExtProcTestUtility::headerProtosEqualIgnoreOrder(expected, arg); +} + +MATCHER_P(HasNoHeader, key, absl::StrFormat("Headers have no value for \"%s\"", key)) { + return arg.get(Http::LowerCaseString(std::string(key))).empty(); +} + +MATCHER_P2(SingleHeaderValueIs, key, value, + absl::StrFormat("Header \"%s\" equals \"%s\"", key, value)) { + const auto hdr = arg.get(Http::LowerCaseString(std::string(key))); + if (hdr.size() != 1) { + return false; + } + return hdr[0]->value() == value; +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions