diff --git a/api/envoy/service/ext_proc/v3alpha/external_processor.proto b/api/envoy/service/ext_proc/v3alpha/external_processor.proto index cac1dde34d9e..5b0696bfc3b0 100644 --- a/api/envoy/service/ext_proc/v3alpha/external_processor.proto +++ b/api/envoy/service/ext_proc/v3alpha/external_processor.proto @@ -6,7 +6,6 @@ import "envoy/config/core/v3/base.proto"; import "envoy/extensions/filters/http/ext_proc/v3alpha/processing_mode.proto"; import "envoy/type/v3/http_status.proto"; -import "google/protobuf/duration.proto"; import "google/protobuf/struct.proto"; import "udpa/annotations/status.proto"; @@ -289,10 +288,13 @@ message GrpcStatus { // Change HTTP headers or trailers by appending, replacing, or removing // headers. message HeaderMutation { - // Add or replace HTTP headers. + // Add or replace HTTP headers. Attempts to set the value of + // any "x-envoy" header, and attempts to set the ":method", + // ":authority", ":scheme", or "host" headers will be ignored. repeated config.core.v3.HeaderValueOption set_headers = 1; - // Remove these HTTP headers. + // Remove these HTTP headers. Attempts to remove system headers -- + // any header starting with ":", plus "host" -- will be ignored. repeated string remove_headers = 2; } diff --git a/docs/root/configuration/http/http_filters/ext_proc_filter.rst b/docs/root/configuration/http/http_filters/ext_proc_filter.rst index 851dea86e1c4..f00eb27d1bf1 100644 --- a/docs/root/configuration/http/http_filters/ext_proc_filter.rst +++ b/docs/root/configuration/http/http_filters/ext_proc_filter.rst @@ -17,3 +17,24 @@ messages, and the server must reply with :ref:`ProcessingResponse `. This filter is a work in progress. In its current state, it actually does nothing. + +Statistics +---------- +This filter outputs statistics in the +*http..ext_proc.* namespace. The :ref:`stat prefix +` +comes from the owning HTTP connection manager. + +The following statistics are supported: + +.. csv-table:: + :header: Name, Type, Description + :widths: auto + + streams_started, Counter, The number of gRPC streams that have been started to send to the external processing service + streams_msgs_sent, Counter, The number of messages sent on those streams + streams_msgs_received, Counter, The number of messages received on those streams + spurious_msgs_received, Counter, The number of unexpected messages received that violated the protocol + streams_closed, Counter, The number of streams successfully closed on either end + streams_failed, Counter, The number of times a stream produced a gRPC error + failure_mode_allowed, Counter, The number of times an error was ignored due to configuration diff --git a/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto b/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto index cac1dde34d9e..5b0696bfc3b0 100644 --- a/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto +++ b/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto @@ -6,7 +6,6 @@ import "envoy/config/core/v3/base.proto"; import "envoy/extensions/filters/http/ext_proc/v3alpha/processing_mode.proto"; import "envoy/type/v3/http_status.proto"; -import "google/protobuf/duration.proto"; import "google/protobuf/struct.proto"; import "udpa/annotations/status.proto"; @@ -289,10 +288,13 @@ message GrpcStatus { // Change HTTP headers or trailers by appending, replacing, or removing // headers. message HeaderMutation { - // Add or replace HTTP headers. + // Add or replace HTTP headers. Attempts to set the value of + // any "x-envoy" header, and attempts to set the ":method", + // ":authority", ":scheme", or "host" headers will be ignored. repeated config.core.v3.HeaderValueOption set_headers = 1; - // Remove these HTTP headers. + // Remove these HTTP headers. Attempts to remove system headers -- + // any header starting with ":", plus "host" -- will be ignored. repeated string remove_headers = 2; } diff --git a/source/common/http/header_utility.cc b/source/common/http/header_utility.cc index 41c92645963d..4c2ff9904af2 100644 --- a/source/common/http/header_utility.cc +++ b/source/common/http/header_utility.cc @@ -309,5 +309,10 @@ Http::Status HeaderUtility::checkRequiredHeaders(const Http::RequestHeaderMap& h return Http::okStatus(); } +bool HeaderUtility::isRemovableHeader(absl::string_view header) { + return (header.empty() || header[0] != ':') && + !absl::EqualsIgnoreCase(header, Headers::get().HostLegacy.get()); +} + } // namespace Http } // namespace Envoy diff --git a/source/common/http/header_utility.h b/source/common/http/header_utility.h index 291200134552..6ccda5423b3f 100644 --- a/source/common/http/header_utility.h +++ b/source/common/http/header_utility.h @@ -186,6 +186,13 @@ class HeaderUtility { * missing. */ static Http::Status checkRequiredHeaders(const Http::RequestHeaderMap& headers); + + /** + * Returns true if a header may be safely removed without causing additional + * problems. Effectively, header names beginning with ":" and the "host" header + * may not be removed. + */ + static bool isRemovableHeader(absl::string_view header); }; } // namespace Http } // namespace Envoy diff --git a/source/common/router/BUILD b/source/common/router/BUILD index f4873e98aa27..2920805d7c8b 100644 --- a/source/common/router/BUILD +++ b/source/common/router/BUILD @@ -368,6 +368,7 @@ envoy_cc_library( deps = [ ":header_formatter_lib", "//include/envoy/http:header_map_interface", + "//source/common/http:header_utility_lib", "//source/common/http:headers_lib", "//source/common/protobuf:utility_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/source/common/router/header_parser.cc b/source/common/router/header_parser.cc index 1e4a9f4e3098..557a888c53fe 100644 --- a/source/common/router/header_parser.cc +++ b/source/common/router/header_parser.cc @@ -7,6 +7,7 @@ #include "envoy/config/core/v3/base.pb.h" #include "common/common/assert.h" +#include "common/http/header_utility.h" #include "common/http/headers.h" #include "common/protobuf/utility.h" @@ -258,7 +259,7 @@ HeaderParserPtr HeaderParser::configure( // We reject :-prefix (e.g. :path) removal here. This is dangerous, since other aspects of // request finalization assume their existence and they are needed for well-formedness in most // cases. - if (header[0] == ':' || Http::LowerCaseString(header).get() == "host") { + if (!Http::HeaderUtility::isRemovableHeader(header)) { throw EnvoyException(":-prefixed or host headers may not be removed"); } header_parser->headers_to_remove_.emplace_back(header); diff --git a/source/extensions/filters/http/ext_authz/ext_authz.cc b/source/extensions/filters/http/ext_authz/ext_authz.cc index 18a5e1e0c885..37247272d879 100644 --- a/source/extensions/filters/http/ext_authz/ext_authz.cc +++ b/source/extensions/filters/http/ext_authz/ext_authz.cc @@ -212,8 +212,7 @@ void Filter::onComplete(Filters::Common::ExtAuthz::ResponsePtr&& response) { for (const auto& header : response->headers_to_remove) { // We don't allow removing any :-prefixed headers, nor Host, as removing // them would make the request malformed. - if (absl::StartsWithIgnoreCase(absl::string_view(header.get()), ":") || - header == Http::Headers::get().HostLegacy) { + if (!Http::HeaderUtility::isRemovableHeader(header.get())) { continue; } ENVOY_STREAM_LOG(trace, "'{}'", *callbacks_, header.get()); diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index 93b55a5586f8..e9b25c691e39 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -14,8 +14,13 @@ envoy_cc_library( srcs = ["ext_proc.cc"], hdrs = ["ext_proc.h"], deps = [ + ":client_interface", + ":mutation_utils_lib", "//include/envoy/http:filter_interface", + "//include/envoy/http:header_map_interface", + "//include/envoy/stats:stats_macros", "//source/extensions/filters/http/common:pass_through_filter_lib", + "@com_google_absl//absl/strings:str_format", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto", ], ) @@ -27,6 +32,7 @@ envoy_cc_extension( security_posture = "unknown", status = "alpha", deps = [ + ":client_lib", ":ext_proc", "//source/extensions/filters/http:well_known_names", "//source/extensions/filters/http/common:factory_base_lib", @@ -43,6 +49,18 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "mutation_utils_lib", + srcs = ["mutation_utils.cc"], + hdrs = ["mutation_utils.h"], + deps = [ + "//include/envoy/http:header_map_interface", + "//source/common/http:header_utility_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto", + ], +) + envoy_cc_library( name = "client_lib", srcs = ["client_impl.cc"], diff --git a/source/extensions/filters/http/ext_proc/config.cc b/source/extensions/filters/http/ext_proc/config.cc index 83d6ac7de14c..7db245641a54 100644 --- a/source/extensions/filters/http/ext_proc/config.cc +++ b/source/extensions/filters/http/ext_proc/config.cc @@ -2,6 +2,7 @@ #include +#include "extensions/filters/http/ext_proc/client_impl.h" #include "extensions/filters/http/ext_proc/ext_proc.h" namespace Envoy { @@ -11,11 +12,19 @@ namespace ExternalProcessing { Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped( const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config, - const std::string&, Server::Configuration::FactoryContext&) { - const auto filter_config = std::make_shared(proto_config); + const std::string& stats_prefix, Server::Configuration::FactoryContext& context) { + const uint32_t timeout_ms = + PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout); + const auto filter_config = std::make_shared( + proto_config, std::chrono::milliseconds(timeout_ms), context.scope(), stats_prefix); - return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) { - callbacks.addStreamFilter(Http::StreamFilterSharedPtr{std::make_shared(filter_config)}); + return [filter_config, grpc_service = proto_config.grpc_service(), + &context](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique( + context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope()); + + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); }; } diff --git a/source/extensions/filters/http/ext_proc/config.h b/source/extensions/filters/http/ext_proc/config.h index c6e961ffa9e3..d6b0df8ded2c 100644 --- a/source/extensions/filters/http/ext_proc/config.h +++ b/source/extensions/filters/http/ext_proc/config.h @@ -21,6 +21,8 @@ class ExternalProcessingFilterConfig ExternalProcessingFilterConfig() : FactoryBase(HttpFilterNames::get().ExternalProcessing) {} private: + static constexpr uint64_t DefaultTimeout = 200; + Http::FilterFactoryCb createFilterFactoryFromProtoTyped( const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config, const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override; diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 7beb30461201..807a84de4ac3 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -1,11 +1,133 @@ #include "extensions/filters/http/ext_proc/ext_proc.h" +#include "extensions/filters/http/ext_proc/mutation_utils.h" + +#include "absl/strings/str_format.h" + namespace Envoy { namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -void Filter::onDestroy() {} +using envoy::service::ext_proc::v3alpha::ProcessingRequest; +using envoy::service::ext_proc::v3alpha::ProcessingResponse; + +using Http::FilterHeadersStatus; +using Http::RequestHeaderMap; + +static const std::string kErrorPrefix = "ext_proc error"; + +void Filter::closeStream() { + if (!stream_closed_) { + if (stream_) { + ENVOY_LOG(debug, "Closing gRPC stream to processing server"); + stream_->close(); + stats_.streams_closed_.inc(); + } + stream_closed_ = true; + } +} + +void Filter::onDestroy() { closeStream(); } + +FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of_stream) { + // We're at the start, so start the stream and send a headers message + request_headers_ = &headers; + stream_ = client_->start(*this, config_->grpcTimeout()); + stats_.streams_started_.inc(); + ProcessingRequest req; + auto* headers_req = req.mutable_request_headers(); + MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers()); + headers_req->set_end_of_stream(end_of_stream); + request_state_ = FilterState::HEADERS; + stream_->send(std::move(req), false); + stats_.stream_msgs_sent_.inc(); + + // Wait until we have a gRPC response before allowing any more callbacks + return FilterHeadersStatus::StopAllIterationAndWatermark; +} + +void Filter::onReceiveMessage( + std::unique_ptr&& r) { + auto response = std::move(r); + bool message_valid = false; + ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_); + + // This next section will grow as we support the rest of the protocol + if (request_state_ == FilterState::HEADERS) { + if (response->has_request_headers()) { + ENVOY_LOG(debug, "applying request_headers response"); + message_valid = true; + const auto& headers_response = response->request_headers(); + if (headers_response.has_response()) { + const auto& common_response = headers_response.response(); + if (common_response.has_header_mutation()) { + MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_); + } + } + } else if (response->has_immediate_response()) { + // To be implemented later. Leave stream open to allow people to implement + // correct servers that don't break us. + message_valid = true; + } + request_state_ = FilterState::IDLE; + decoder_callbacks_->continueDecoding(); + } + + if (message_valid) { + stats_.stream_msgs_received_.inc(); + } else { + stats_.spurious_msgs_received_.inc(); + // Ignore messages received out of order. However, close the stream to + // protect ourselves since the server is not following the protocol. + ENVOY_LOG(warn, "Spurious response message received on gRPC stream"); + closeStream(); + } +} + +void Filter::onGrpcError(Grpc::Status::GrpcStatus status) { + ENVOY_LOG(debug, "Received gRPC error on stream: {}", status); + stream_closed_ = true; + stats_.streams_failed_.inc(); + if (config_->failureModeAllow()) { + // Ignore this and treat as a successful close + onGrpcClose(); + stats_.failure_mode_allowed_.inc(); + } else { + // Use a switch here now because there will be more than two + // cases very soon. + switch (request_state_) { + case FilterState::HEADERS: + request_state_ = FilterState::IDLE; + decoder_callbacks_->sendLocalReply( + Http::Code::InternalServerError, "", nullptr, absl::nullopt, + absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status)); + break; + default: + // Nothing else to do + break; + } + } +} + +void Filter::onGrpcClose() { + ENVOY_LOG(debug, "Received gRPC stream close"); + stream_closed_ = true; + stats_.streams_closed_.inc(); + // Successful close. We can ignore the stream for the rest of our request + // and response processing. + // Use a switch here now because there will be more than two + // cases very soon. + switch (request_state_) { + case FilterState::HEADERS: + request_state_ = FilterState::IDLE; + decoder_callbacks_->continueDecoding(); + break; + default: + // Nothing to do otherwise + break; + } +} } // namespace ExternalProcessing } // namespace HttpFilters diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 069cc3008c17..5ccfb49453df 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -1,41 +1,121 @@ #pragma once +#include #include #include "envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.pb.h" #include "envoy/grpc/async_client.h" #include "envoy/http/filter.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/stats_macros.h" #include "common/common/logger.h" #include "extensions/filters/http/common/pass_through_filter.h" +#include "extensions/filters/http/ext_proc/client.h" namespace Envoy { namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { +#define ALL_EXT_PROC_FILTER_STATS(COUNTER) \ + COUNTER(streams_started) \ + COUNTER(stream_msgs_sent) \ + COUNTER(stream_msgs_received) \ + COUNTER(spurious_msgs_received) \ + COUNTER(streams_closed) \ + COUNTER(streams_failed) \ + COUNTER(failure_mode_allowed) + +struct ExtProcFilterStats { + ALL_EXT_PROC_FILTER_STATS(GENERATE_COUNTER_STRUCT) +}; + class FilterConfig { public: - FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config) - : failure_mode_allow_(config.failure_mode_allow()) {} + FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config, + const std::chrono::milliseconds grpc_timeout, Stats::Scope& scope, + const std::string& stats_prefix) + : failure_mode_allow_(config.failure_mode_allow()), grpc_timeout_(grpc_timeout), + stats_(generateStats(stats_prefix, config.stat_prefix(), scope)) {} bool failureModeAllow() const { return failure_mode_allow_; } + const std::chrono::milliseconds& grpcTimeout() const { return grpc_timeout_; } + + const ExtProcFilterStats& stats() const { return stats_; } + private: + ExtProcFilterStats generateStats(const std::string& prefix, + const std::string& filter_stats_prefix, Stats::Scope& scope) { + const std::string final_prefix = absl::StrCat(prefix, "ext_proc.", filter_stats_prefix); + return {ALL_EXT_PROC_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; + } + const bool failure_mode_allow_; + const std::chrono::milliseconds grpc_timeout_; + + ExtProcFilterStats stats_; }; using FilterConfigSharedPtr = std::shared_ptr; -class Filter : public Logger::Loggable, public Http::PassThroughFilter { +class Filter : public Logger::Loggable, + public Http::PassThroughFilter, + public ExternalProcessorCallbacks { + // The state of filter execution -- this is used to determine + // how to handle gRPC callbacks. + enum class FilterState { + // The filter is not waiting for anything, so any response on the + // gRPC stream is spurious and will result in the filter closing + // the stream. + IDLE, + // The filter is waiting for a "request_headers" or a "response_headers" message. + // Any other response on the gRPC stream will be treated as spurious. + HEADERS, + }; + public: - Filter(const FilterConfigSharedPtr& config) : config_(config) {} + Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client) + : config_(config), client_(std::move(client)), stats_(config->stats()) {} void onDestroy() override; + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override { + decoder_callbacks_ = &callbacks; + } + + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, + bool end_stream) override; + + // ExternalProcessorCallbacks + + void onReceiveMessage( + std::unique_ptr&& response) override; + + void onGrpcError(Grpc::Status::GrpcStatus error) override; + + void onGrpcClose() override; + private: - FilterConfigSharedPtr config_; + void closeStream(); + + const FilterConfigSharedPtr config_; + const ExternalProcessorClientPtr client_; + ExtProcFilterStats stats_; + + Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr; + + // The state of the request-processing, or "decoding" side of the filter. + // We maintain separate states for encoding and decoding since they may + // be interleaved. + FilterState request_state_ = FilterState::IDLE; + + ExternalProcessorStreamPtr stream_; + bool stream_closed_ = false; + + Http::HeaderMap* request_headers_ = nullptr; }; } // namespace ExternalProcessing diff --git a/source/extensions/filters/http/ext_proc/mutation_utils.cc b/source/extensions/filters/http/ext_proc/mutation_utils.cc new file mode 100644 index 000000000000..4f31b45a4751 --- /dev/null +++ b/source/extensions/filters/http/ext_proc/mutation_utils.cc @@ -0,0 +1,68 @@ +#include "extensions/filters/http/ext_proc/mutation_utils.h" + +#include "envoy/http/header_map.h" + +#include "common/http/header_utility.h" +#include "common/http/headers.h" +#include "common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +using Http::Headers; +using Http::LowerCaseString; + +void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in, + envoy::config::core::v3::HeaderMap& headers_out) { + headers_in.iterate([&headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate { + auto* new_header = headers_out.add_headers(); + new_header->set_key(std::string(e.key().getStringView())); + new_header->set_value(std::string(e.value().getStringView())); + return Http::HeaderMap::Iterate::Continue; + }); +} + +void MutationUtils::applyHeaderMutations( + const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, Http::HeaderMap& headers) { + for (const auto& remove_header : mutation.remove_headers()) { + if (Http::HeaderUtility::isRemovableHeader(remove_header)) { + headers.remove(LowerCaseString(remove_header)); + } + } + + for (const auto& sh : mutation.set_headers()) { + if (!sh.has_header()) { + continue; + } + if (isSettableHeader(sh.header().key())) { + // Make "false" the default. This is logical and matches the ext_authz + // filter. However, the router handles this same protobuf and uses "true" + // as the default instead. + const bool append = PROTOBUF_GET_WRAPPED_OR_DEFAULT(sh, append, false); + if (append) { + headers.addCopy(LowerCaseString(sh.header().key()), sh.header().value()); + } else { + headers.setCopy(LowerCaseString(sh.header().key()), sh.header().value()); + } + } + } +} + +// Ignore attempts to set certain sensitive headers that can break later processing. +// We may re-enable some of these after further testing. This logic is specific +// to the ext_proc filter so it is not shared with HeaderUtils. +bool MutationUtils::isSettableHeader(absl::string_view key) { + const auto& headers = Headers::get(); + return !absl::EqualsIgnoreCase(key, headers.HostLegacy.get()) && + !absl::EqualsIgnoreCase(key, headers.Host.get()) && + !absl::EqualsIgnoreCase(key, headers.Method.get()) && + !absl::EqualsIgnoreCase(key, headers.Scheme.get()) && + !absl::StartsWithIgnoreCase(key, headers.prefix()); +} + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/source/extensions/filters/http/ext_proc/mutation_utils.h b/source/extensions/filters/http/ext_proc/mutation_utils.h new file mode 100644 index 000000000000..cad4929a3d61 --- /dev/null +++ b/source/extensions/filters/http/ext_proc/mutation_utils.h @@ -0,0 +1,29 @@ +#pragma once + +#include "envoy/http/header_map.h" +#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +class MutationUtils { +public: + // Convert a header map until a protobuf + static void buildHttpHeaders(const Http::HeaderMap& headers_in, + envoy::config::core::v3::HeaderMap& headers_out); + + // Modify header map based on a set of mutations from a protobuf + static void + applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, + Http::HeaderMap& headers); + +private: + static bool isSettableHeader(absl::string_view key); +}; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/test/common/http/header_utility_test.cc b/test/common/http/header_utility_test.cc index 1d6548a29211..4e60542f0be0 100644 --- a/test/common/http/header_utility_test.cc +++ b/test/common/http/header_utility_test.cc @@ -692,5 +692,14 @@ TEST(PercentEncoding, ShouldCloseConnection) { Protocol::Http11, TestRequestHeaderMapImpl{{"proxy-connection", "foo,close"}})); } +TEST(RequiredHeaders, IsRemovableHeader) { + EXPECT_FALSE(HeaderUtility::isRemovableHeader(":path")); + EXPECT_FALSE(HeaderUtility::isRemovableHeader("host")); + EXPECT_FALSE(HeaderUtility::isRemovableHeader("Host")); + EXPECT_TRUE(HeaderUtility::isRemovableHeader("")); + EXPECT_TRUE(HeaderUtility::isRemovableHeader("hostname")); + EXPECT_TRUE(HeaderUtility::isRemovableHeader("Content-Type")); +} + } // namespace Http } // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index ff8393e41984..8838adb851cf 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -5,6 +5,7 @@ load( load( "//test/extensions:extensions_build_system.bzl", "envoy_extension_cc_test", + "envoy_extension_cc_test_library", ) licenses(["notice"]) # Apache 2 @@ -27,7 +28,10 @@ envoy_extension_cc_test( srcs = ["filter_test.cc"], extension_name = "envoy.filters.http.ext_proc", deps = [ + ":mock_server_lib", + ":utils_lib", "//source/extensions/filters/http/ext_proc", + "//test/common/http:common_lib", "//test/mocks/server:factory_context_mocks", "//test/test_common:test_runtime_lib", ], @@ -46,3 +50,51 @@ envoy_extension_cc_test( "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], ) + +envoy_extension_cc_test( + name = "mutation_utils_test", + srcs = ["mutation_utils_test.cc"], + extension_name = "envoy.filters.http.ext_proc", + deps = [ + ":utils_lib", + "//source/extensions/filters/http/ext_proc:mutation_utils_lib", + "//test/test_common:utility_lib", + ], +) + +envoy_extension_cc_test( + name = "ext_proc_integration_test", + srcs = ["ext_proc_integration_test.cc"], + extension_name = "envoy.filters.http.ext_proc", + deps = [ + ":utils_lib", + "//source/extensions/filters/http/ext_proc:config", + "//test/common/http:common_lib", + "//test/integration:http_integration_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto", + "@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto", + ], +) + +envoy_extension_cc_test_library( + name = "mock_server_lib", + srcs = ["mock_server.cc"], + hdrs = ["mock_server.h"], + extension_name = "envoy.filters.http.ext_proc", + deps = [ + "//source/extensions/filters/http/ext_proc:client_interface", + ], +) + +envoy_extension_cc_test_library( + name = "utils_lib", + srcs = ["utils.cc"], + hdrs = ["utils.h"], + extension_name = "envoy.filters.http.ext_proc", + deps = [ + "//include/envoy/http:header_map_interface", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) diff --git a/test/extensions/filters/http/ext_proc/config_test.cc b/test/extensions/filters/http/ext_proc/config_test.cc index 59d3757457ab..34e38c8df433 100644 --- a/test/extensions/filters/http/ext_proc/config_test.cc +++ b/test/extensions/filters/http/ext_proc/config_test.cc @@ -36,7 +36,7 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { ProtobufTypes::MessagePtr proto_config = factory.createEmptyConfigProto(); TestUtility::loadFromYaml(yaml, *proto_config); - testing::StrictMock context; + testing::NiceMock context; EXPECT_CALL(context, messageValidationVisitor()); Http::FilterFactoryCb cb = factory.createFilterFactoryFromProto(*proto_config, "stats", context); Http::MockFilterChainFactoryCallbacks filter_callback; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc new file mode 100644 index 000000000000..0e921e71bb09 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -0,0 +1,191 @@ +#include "envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.pb.h" +#include "envoy/network/address.h" +#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h" + +#include "extensions/filters/http/ext_proc/config.h" + +#include "test/common/http/common.h" +#include "test/extensions/filters/http/ext_proc/utils.h" +#include "test/integration/http_integration.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { + +using envoy::service::ext_proc::v3alpha::ProcessingRequest; +using envoy::service::ext_proc::v3alpha::ProcessingResponse; +using Extensions::HttpFilters::ExternalProcessing::ExtProcTestUtility; + +using Http::LowerCaseString; + +class ExtProcIntegrationTest : public HttpIntegrationTest, + public Grpc::GrpcClientIntegrationParamTest { +protected: + ExtProcIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP1, ipVersion()) {} + + void createUpstreams() override { + // Need to create a separate "upstream" for the gRPC server + HttpIntegrationTest::createUpstreams(); + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + } + + void TearDown() override { + if (processor_connection_) { + ASSERT_TRUE(processor_connection_->close()); + ASSERT_TRUE(processor_connection_->waitForDisconnect()); + } + cleanupUpstreamAndDownstream(); + } + + void initializeConfig() { + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // This is the cluster for our gRPC server, starting by copying an existing cluster + auto* server_cluster = bootstrap.mutable_static_resources()->add_clusters(); + server_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + server_cluster->set_name("ext_proc_server"); + server_cluster->mutable_load_assignment()->set_cluster_name("ext_proc_server"); + ConfigHelper::setHttp2(*server_cluster); + + // Load configuration of the server from YAML and use a helper to add a grpc_service + // stanza pointing to the cluster that we just made + setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", + fake_upstreams_.back()->localAddress()); + + // Construct a configuration proto for our filter and then re-write it + // to JSON so that we can add it to the overall config + envoy::config::listener::v3::Filter ext_proc_filter; + ext_proc_filter.set_name("envoy.filters.http.ext_proc"); + ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_); + config_helper_.addFilter(MessageUtil::getJsonStringFromMessage(ext_proc_filter)); + }); + } + + void waitForFirstMessage(ProcessingRequest& request) { + ASSERT_TRUE(fake_upstreams_.back()->waitForHttpConnection(*dispatcher_, processor_connection_)); + ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); + } + + envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor proto_config_{}; + FakeHttpConnectionPtr processor_connection_; + FakeStreamPtr processor_stream_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, ExtProcIntegrationTest, + GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { + initializeConfig(); + setDownstreamProtocol(Http::CodecClient::Type::HTTP2); + HttpIntegrationTest::initialize(); + + auto conn = makeClientConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(std::move(conn)); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + auto response = codec_client_->makeHeaderOnlyRequest(headers); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + // Just close the stream without doing anything + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + processor_stream_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"grpc-status", "0"}}); + + // Now expect a message to the real upstream + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + // Respond from the upstream with a simple 200 + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Now expect a response to the original request + response->waitForEndStream(); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); +} + +TEST_P(ExtProcIntegrationTest, GetAndFailStream) { + initializeConfig(); + setDownstreamProtocol(Http::CodecClient::Type::HTTP2); + HttpIntegrationTest::initialize(); + + auto conn = makeClientConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(std::move(conn)); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + auto response = codec_client_->makeHeaderOnlyRequest(headers); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + // Fail the stream immediately + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "500"}}, true); + + response->waitForEndStream(); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("500", response->headers().getStatusValue()); +} + +TEST_P(ExtProcIntegrationTest, GetAndSetHeaders) { + initializeConfig(); + setDownstreamProtocol(Http::CodecClient::Type::HTTP2); + HttpIntegrationTest::initialize(); + + auto conn = makeClientConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(std::move(conn)); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + headers.addCopy(LowerCaseString("x-remove-this"), "yes"); + auto response = codec_client_->makeHeaderOnlyRequest(headers); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + + EXPECT_TRUE(request_headers_msg.has_request_headers()); + const auto request_headers = request_headers_msg.request_headers(); + Http::TestRequestHeaderMapImpl expected_request_headers{{":scheme", "http"}, + {":method", "GET"}, + {"host", "host"}, + {":path", "/"}, + {"x-remove-this", "yes"}}; + EXPECT_TRUE(ExtProcTestUtility::headerProtosEqualIgnoreOrder(expected_request_headers, + request_headers.headers())); + + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + + // Ask to change the headers + ProcessingResponse response_msg; + auto response_headers_msg = response_msg.mutable_request_headers(); + auto response_header_mutation = + response_headers_msg->mutable_response()->mutable_header_mutation(); + auto mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_value("new"); + response_header_mutation->add_remove_headers("x-remove-this"); + processor_stream_->sendGrpcMessage(response_msg); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + auto has_hdr1 = upstream_request_->headers().get(LowerCaseString("x-remove-this")); + EXPECT_TRUE(has_hdr1.empty()); + auto has_hdr2 = upstream_request_->headers().get(LowerCaseString("x-new-header")); + EXPECT_EQ(has_hdr2.size(), 1); + EXPECT_EQ(has_hdr2[0]->value(), "new"); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + response->waitForEndStream(); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); +} + +} // namespace Envoy \ No newline at end of file diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 7f821c6b130e..69968a23c886 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -1,5 +1,8 @@ #include "extensions/filters/http/ext_proc/ext_proc.h" +#include "test/common/http/common.h" +#include "test/extensions/filters/http/ext_proc/mock_server.h" +#include "test/extensions/filters/http/ext_proc/utils.h" #include "test/mocks/http/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/router/mocks.h" @@ -18,21 +21,63 @@ namespace HttpFilters { namespace ExternalProcessing { namespace { -template class HttpFilterTestBase : public T { -public: - HttpFilterTestBase() = default; +using envoy::service::ext_proc::v3alpha::ProcessingRequest; +using envoy::service::ext_proc::v3alpha::ProcessingResponse; +using Http::FilterDataStatus; +using Http::FilterHeadersStatus; +using Http::FilterTrailersStatus; +using Http::LowerCaseString; + +using testing::Invoke; + +using namespace std::chrono_literals; + +class HttpFilterTest : public testing::Test { +protected: void initialize(std::string&& yaml) { + client_ = std::make_unique(); + EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); + envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor proto_config{}; if (!yaml.empty()) { TestUtility::loadFromYaml(yaml, proto_config); } - config_.reset(new FilterConfig(proto_config)); - filter_ = std::make_unique(config_); + config_.reset(new FilterConfig(proto_config, 200ms, stats_store_, "")); + filter_ = std::make_unique(config_, std::move(client_)); filter_->setEncoderFilterCallbacks(encoder_callbacks_); filter_->setDecoderFilterCallbacks(decoder_callbacks_); } + ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, + const std::chrono::milliseconds& timeout) { + stream_callbacks_ = &callbacks; + stream_timeout_ = timeout; + + auto stream = std::make_unique(); + EXPECT_CALL(*stream, send(_, _)).WillRepeatedly(Invoke(this, &HttpFilterTest::doSend)); + EXPECT_CALL(*stream, close()).WillRepeatedly(Invoke(this, &HttpFilterTest::doSendClose)); + return stream; + } + + void doSend(ProcessingRequest&& request, bool end_stream) { + ASSERT_FALSE(stream_close_sent_); + last_request_ = std::move(request); + if (end_stream) { + stream_close_sent_ = true; + } + } + + void doSendClose() { + ASSERT_FALSE(stream_close_sent_); + stream_close_sent_ = true; + } + + std::unique_ptr client_; + ExternalProcessorCallbacks* stream_callbacks_ = nullptr; + ProcessingRequest last_request_; + bool stream_close_sent_ = false; + std::chrono::milliseconds stream_timeout_; NiceMock stats_store_; FilterConfigSharedPtr config_; std::unique_ptr filter_; @@ -45,34 +90,342 @@ template class HttpFilterTestBase : public T { Buffer::OwnedImpl data_; }; -class HttpFilterTest : public HttpFilterTestBase { -public: - HttpFilterTest() = default; -}; - TEST_F(HttpFilterTest, SimplestPost) { initialize(R"EOF( grpc_service: envoy_grpc: - cluster_name: "ext_authz_server" + cluster_name: "ext_proc_server" + failure_mode_allow: true + )EOF"); + + EXPECT_TRUE(config_->failureModeAllow()); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + request_headers_.addCopy(LowerCaseString("content-length"), 10); + request_headers_.addCopy(LowerCaseString("x-some-other-header"), "yes"); + + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + // Verify that call was received by mock gRPC server + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_request_headers()); + const auto request_headers = last_request_.request_headers(); + EXPECT_FALSE(request_headers.end_of_stream()); + + Http::TestRequestHeaderMapImpl expected{{":path", "/"}, + {":method", "POST"}, + {":scheme", "http"}, + {"host", "host"}, + {"content-type", "text/plain"}, + {"content-length", "10"}, + {"x-some-other-header", "yes"}}; + EXPECT_TRUE( + ExtProcTestUtility::headerProtosEqualIgnoreOrder(expected, request_headers.headers())); + + // Send back a response + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + std::unique_ptr resp1 = std::make_unique(); + resp1->mutable_request_headers(); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + EXPECT_TRUE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +TEST_F(HttpFilterTest, PostAndChangeHeaders) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + request_headers_.addCopy(LowerCaseString("x-some-other-header"), "yes"); + request_headers_.addCopy(LowerCaseString("x-do-we-want-this"), "no"); + + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_request_headers()); + + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + std::unique_ptr resp1 = std::make_unique(); + auto req_headers_response = resp1->mutable_request_headers(); + auto headers_mut = req_headers_response->mutable_response()->mutable_header_mutation(); + auto add1 = headers_mut->add_set_headers(); + add1->mutable_header()->set_key("x-new-header"); + add1->mutable_header()->set_value("new"); + add1->mutable_append()->set_value(false); + auto add2 = headers_mut->add_set_headers(); + add2->mutable_header()->set_key("x-some-other-header"); + add2->mutable_header()->set_value("no"); + add2->mutable_append()->set_value(true); + *headers_mut->add_remove_headers() = "x-do-we-want-this"; + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + // We should now have changed the original header a bit + request_headers_.iterate([](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate { + std::cerr << e.key().getStringView() << ": " << e.value().getStringView() << '\n'; + return Http::HeaderMap::Iterate::Continue; + }); + auto get1 = request_headers_.get(LowerCaseString("x-new-header")); + EXPECT_EQ(get1.size(), 1); + EXPECT_EQ(get1[0]->key(), "x-new-header"); + EXPECT_EQ(get1[0]->value(), "new"); + auto get2 = request_headers_.get(LowerCaseString("x-some-other-header")); + EXPECT_EQ(get2.size(), 2); + EXPECT_EQ(get2[0]->key(), "x-some-other-header"); + EXPECT_EQ(get2[0]->value(), "yes"); + EXPECT_EQ(get2[1]->key(), "x-some-other-header"); + EXPECT_EQ(get2[1]->value(), "no"); + auto get3 = request_headers_.get(LowerCaseString("x-do-we-want-this")); + EXPECT_TRUE(get3.empty()); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + EXPECT_TRUE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +TEST_F(HttpFilterTest, PostAndRespondImmediately) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + // Immediate response processing not yet implemented -- all we can expect + // at this point is that continueDecoding is called and that the + // stream is not yet closed. + EXPECT_FALSE(stream_close_sent_); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + EXPECT_TRUE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +TEST_F(HttpFilterTest, PostAndFail) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + EXPECT_FALSE(config_->failureModeAllow()); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + EXPECT_FALSE(stream_close_sent_); + + // Oh no! The remote server had a failure! + EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _)); + stream_callbacks_->onGrpcError(Grpc::Status::Internal); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + // The other side closed the stream + EXPECT_FALSE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().streams_failed_.value()); +} + +TEST_F(HttpFilterTest, PostAndIgnoreFailure) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" failure_mode_allow: true )EOF"); EXPECT_TRUE(config_->failureModeAllow()); - EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + EXPECT_FALSE(stream_close_sent_); + + // Oh no! The remote server had a failure which we will ignore + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + stream_callbacks_->onGrpcError(Grpc::Status::Internal); + data_.add("foo"); - EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(data_, true)); - EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); - EXPECT_EQ(Http::FilterHeadersStatus::Continue, - filter_->encode100ContinueHeaders(response_headers_)); - EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); data_.add("bar"); - EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->encodeData(data_, false)); - EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->encodeData(data_, true)); - EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); filter_->onDestroy(); + // The other side closed the stream + EXPECT_FALSE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + EXPECT_EQ(1, config_->stats().failure_mode_allowed_.value()); +} + +TEST_F(HttpFilterTest, PostAndClose) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + EXPECT_FALSE(config_->failureModeAllow()); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_request_headers()); + + // Close the stream, which should tell the filter to keep on going + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + stream_callbacks_->onGrpcClose(); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + + // The other side closed the stream + EXPECT_FALSE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +TEST_F(HttpFilterTest, OutOfOrder) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + EXPECT_FALSE(last_request_.async_mode()); + EXPECT_FALSE(stream_close_sent_); + ASSERT_TRUE(last_request_.has_request_headers()); + + // Return an out-of-order message. The server should close the stream + // and continue as if nothing happened. + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + std::unique_ptr resp1 = std::make_unique(); + resp1->mutable_request_body(); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + + // We closed the stream + EXPECT_TRUE(stream_close_sent_); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().spurious_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); } } // namespace diff --git a/test/extensions/filters/http/ext_proc/mock_server.cc b/test/extensions/filters/http/ext_proc/mock_server.cc new file mode 100644 index 000000000000..888144e38518 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/mock_server.cc @@ -0,0 +1,17 @@ +#include "test/extensions/filters/http/ext_proc/mock_server.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +MockClient::MockClient() = default; +MockClient::~MockClient() = default; + +MockStream::MockStream() = default; +MockStream::~MockStream() = default; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h new file mode 100644 index 000000000000..6e3654d5f9c8 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -0,0 +1,31 @@ +#pragma once + +#include "extensions/filters/http/ext_proc/client.h" + +#include "gmock/gmock.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +class MockClient : public ExternalProcessorClient { +public: + MockClient(); + ~MockClient() override; + MOCK_METHOD(ExternalProcessorStreamPtr, start, + (ExternalProcessorCallbacks&, const std::chrono::milliseconds&)); +}; + +class MockStream : public ExternalProcessorStream { +public: + MockStream(); + ~MockStream() override; + MOCK_METHOD(void, send, (envoy::service::ext_proc::v3alpha::ProcessingRequest&&, bool)); + MOCK_METHOD(void, close, ()); +}; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/test/extensions/filters/http/ext_proc/mutation_utils_test.cc b/test/extensions/filters/http/ext_proc/mutation_utils_test.cc new file mode 100644 index 000000000000..fd31976332e3 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/mutation_utils_test.cc @@ -0,0 +1,125 @@ +#include "extensions/filters/http/ext_proc/mutation_utils.h" + +#include "test/extensions/filters/http/ext_proc/utils.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { +namespace { + +using Http::LowerCaseString; + +TEST(MutationUtils, TestBuildHeaders) { + Http::TestRequestHeaderMapImpl headers{ + {":method", "GET"}, + {":path", "/foo/the/bar?size=123"}, + {"content-type", "text/plain; encoding=UTF8"}, + {"x-something-else", "yes"}, + }; + LowerCaseString reference_key("x-reference"); + std::string reference_value("Foo"); + headers.addReference(reference_key, reference_value); + headers.addCopy(LowerCaseString("x-number"), 9999); + + envoy::config::core::v3::HeaderMap proto_headers; + MutationUtils::buildHttpHeaders(headers, proto_headers); + + Http::TestRequestHeaderMapImpl expected{{":method", "GET"}, + {":path", "/foo/the/bar?size=123"}, + {"content-type", "text/plain; encoding=UTF8"}, + {"x-something-else", "yes"}, + {"x-reference", "Foo"}, + {"x-number", "9999"}}; + EXPECT_TRUE(ExtProcTestUtility::headerProtosEqualIgnoreOrder(expected, proto_headers)); +} + +TEST(MutationUtils, TestApplyMutations) { + Http::TestRequestHeaderMapImpl headers{ + {":scheme", "https"}, + {":method", "GET"}, + {":path", "/foo/the/bar?size=123"}, + {"host", "localhost:1000"}, + {":authority", "localhost:1000"}, + {"content-type", "text/plain; encoding=UTF8"}, + {"x-append-this", "1"}, + {"x-replace-this", "Yes"}, + {"x-remove-this", "Yes"}, + {"x-envoy-strange-thing", "No"}, + }; + + envoy::service::ext_proc::v3alpha::HeaderMutation mutation; + auto* s = mutation.add_set_headers(); + s->mutable_append()->set_value(true); + s->mutable_header()->set_key("x-append-this"); + s->mutable_header()->set_value("2"); + s = mutation.add_set_headers(); + s->mutable_append()->set_value(true); + s->mutable_header()->set_key("x-append-this"); + s->mutable_header()->set_value("3"); + s = mutation.add_set_headers(); + s->mutable_append()->set_value(false); + s->mutable_header()->set_key("x-replace-this"); + s->mutable_header()->set_value("no"); + // Default of "append" is "false" and mutations + // are applied in order. + s = mutation.add_set_headers(); + s->mutable_header()->set_key("x-replace-this"); + s->mutable_header()->set_value("nope"); + // Incomplete structures should be ignored + mutation.add_set_headers(); + + mutation.add_remove_headers("x-remove-this"); + // Attempts to remove ":" and "host" headers should be ignored + mutation.add_remove_headers("host"); + mutation.add_remove_headers(":method"); + mutation.add_remove_headers(""); + + // Attempts to set method, host, authority, and x-envoy headers + // should be ignored until we explicitly allow them. + s = mutation.add_set_headers(); + s->mutable_header()->set_key("host"); + s->mutable_header()->set_value("invalid:123"); + s = mutation.add_set_headers(); + s->mutable_header()->set_key("Host"); + s->mutable_header()->set_value("invalid:456"); + s = mutation.add_set_headers(); + s->mutable_header()->set_key(":authority"); + s->mutable_header()->set_value("invalid:789"); + s = mutation.add_set_headers(); + s->mutable_header()->set_key(":method"); + s->mutable_header()->set_value("PATCH"); + s = mutation.add_set_headers(); + s->mutable_header()->set_key(":scheme"); + s->mutable_header()->set_value("http"); + s = mutation.add_set_headers(); + s->mutable_header()->set_key("X-Envoy-StrangeThing"); + s->mutable_header()->set_value("Yes"); + + MutationUtils::applyHeaderMutations(mutation, headers); + + Http::TestRequestHeaderMapImpl expected_headers{ + {":scheme", "https"}, + {":method", "GET"}, + {":path", "/foo/the/bar?size=123"}, + {"host", "localhost:1000"}, + {":authority", "localhost:1000"}, + {"content-type", "text/plain; encoding=UTF8"}, + {"x-append-this", "1"}, + {"x-append-this", "2"}, + {"x-append-this", "3"}, + {"x-replace-this", "nope"}, + {"x-envoy-strange-thing", "No"}, + }; + + EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(headers, expected_headers)); +} + +} // namespace +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/test/extensions/filters/http/ext_proc/utils.cc b/test/extensions/filters/http/ext_proc/utils.cc new file mode 100644 index 000000000000..1bc4464b1de8 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/utils.cc @@ -0,0 +1,24 @@ +#include "test/extensions/filters/http/ext_proc/utils.h" + +#include "test/test_common/utility.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +bool ExtProcTestUtility::headerProtosEqualIgnoreOrder( + const Http::HeaderMap& expected, const envoy::config::core::v3::HeaderMap& actual) { + // Comparing header maps is hard because they have duplicates in them. + // So we're going to turn them into a HeaderMap and let Envoy do the work. + Http::TestRequestHeaderMapImpl actual_headers; + for (const auto& header : actual.headers()) { + actual_headers.addCopy(header.key(), header.value()); + } + return TestUtility::headerMapEqualIgnoreOrder(expected, actual_headers); +} + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/test/extensions/filters/http/ext_proc/utils.h b/test/extensions/filters/http/ext_proc/utils.h new file mode 100644 index 000000000000..93ec895d0a04 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/utils.h @@ -0,0 +1,21 @@ +#pragma once + +#include "envoy/config/core/v3/base.pb.h" +#include "envoy/http/header_map.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +class ExtProcTestUtility { +public: + // Compare a reference header map to a proto + static bool headerProtosEqualIgnoreOrder(const Http::HeaderMap& expected, + const envoy::config::core::v3::HeaderMap& actual); +}; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy