Skip to content

Commit

Permalink
Implement request header processing in ext_proc (#14385)
Browse files Browse the repository at this point in the history
Send request headers to the server and apply header mutations based
on the response. The rest of the protocol is still ignored.

Signed-off-by: Gregory Brail <gregbrail@google.com>
  • Loading branch information
gbrail authored Jan 11, 2021
1 parent 12c42b5 commit 845f92a
Show file tree
Hide file tree
Showing 25 changed files with 1,229 additions and 40 deletions.
8 changes: 5 additions & 3 deletions api/envoy/service/ext_proc/v3alpha/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import "envoy/config/core/v3/base.proto";
import "envoy/extensions/filters/http/ext_proc/v3alpha/processing_mode.proto";
import "envoy/type/v3/http_status.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

import "udpa/annotations/status.proto";
Expand Down Expand Up @@ -289,10 +288,13 @@ message GrpcStatus {
// Change HTTP headers or trailers by appending, replacing, or removing
// headers.
message HeaderMutation {
// Add or replace HTTP headers.
// Add or replace HTTP headers. Attempts to set the value of
// any "x-envoy" header, and attempts to set the ":method",
// ":authority", ":scheme", or "host" headers will be ignored.
repeated config.core.v3.HeaderValueOption set_headers = 1;

// Remove these HTTP headers.
// Remove these HTTP headers. Attempts to remove system headers --
// any header starting with ":", plus "host" -- will be ignored.
repeated string remove_headers = 2;
}

Expand Down
21 changes: 21 additions & 0 deletions docs/root/configuration/http/http_filters/ext_proc_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,24 @@ messages, and the server must reply with
:ref:`ProcessingResponse <envoy_v3_api_msg_service.ext_proc.v3alpha.ProcessingResponse>`.

This filter is a work in progress. In its current state, it actually does nothing.

Statistics
----------
This filter outputs statistics in the
*http.<stat_prefix>.ext_proc.* namespace. The :ref:`stat prefix
<envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.stat_prefix>`
comes from the owning HTTP connection manager.

The following statistics are supported:

.. csv-table::
:header: Name, Type, Description
:widths: auto

streams_started, Counter, The number of gRPC streams that have been started to send to the external processing service
streams_msgs_sent, Counter, The number of messages sent on those streams
streams_msgs_received, Counter, The number of messages received on those streams
spurious_msgs_received, Counter, The number of unexpected messages received that violated the protocol
streams_closed, Counter, The number of streams successfully closed on either end
streams_failed, Counter, The number of times a stream produced a gRPC error
failure_mode_allowed, Counter, The number of times an error was ignored due to configuration

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions source/common/http/header_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,10 @@ Http::Status HeaderUtility::checkRequiredHeaders(const Http::RequestHeaderMap& h
return Http::okStatus();
}

bool HeaderUtility::isRemovableHeader(absl::string_view header) {
return (header.empty() || header[0] != ':') &&
!absl::EqualsIgnoreCase(header, Headers::get().HostLegacy.get());
}

} // namespace Http
} // namespace Envoy
7 changes: 7 additions & 0 deletions source/common/http/header_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ class HeaderUtility {
* missing.
*/
static Http::Status checkRequiredHeaders(const Http::RequestHeaderMap& headers);

/**
* Returns true if a header may be safely removed without causing additional
* problems. Effectively, header names beginning with ":" and the "host" header
* may not be removed.
*/
static bool isRemovableHeader(absl::string_view header);
};
} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ envoy_cc_library(
deps = [
":header_formatter_lib",
"//include/envoy/http:header_map_interface",
"//source/common/http:header_utility_lib",
"//source/common/http:headers_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/header_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/config/core/v3/base.pb.h"

#include "common/common/assert.h"
#include "common/http/header_utility.h"
#include "common/http/headers.h"
#include "common/protobuf/utility.h"

Expand Down Expand Up @@ -258,7 +259,7 @@ HeaderParserPtr HeaderParser::configure(
// We reject :-prefix (e.g. :path) removal here. This is dangerous, since other aspects of
// request finalization assume their existence and they are needed for well-formedness in most
// cases.
if (header[0] == ':' || Http::LowerCaseString(header).get() == "host") {
if (!Http::HeaderUtility::isRemovableHeader(header)) {
throw EnvoyException(":-prefixed or host headers may not be removed");
}
header_parser->headers_to_remove_.emplace_back(header);
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/filters/http/ext_authz/ext_authz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ void Filter::onComplete(Filters::Common::ExtAuthz::ResponsePtr&& response) {
for (const auto& header : response->headers_to_remove) {
// We don't allow removing any :-prefixed headers, nor Host, as removing
// them would make the request malformed.
if (absl::StartsWithIgnoreCase(absl::string_view(header.get()), ":") ||
header == Http::Headers::get().HostLegacy) {
if (!Http::HeaderUtility::isRemovableHeader(header.get())) {
continue;
}
ENVOY_STREAM_LOG(trace, "'{}'", *callbacks_, header.get());
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ envoy_cc_library(
srcs = ["ext_proc.cc"],
hdrs = ["ext_proc.h"],
deps = [
":client_interface",
":mutation_utils_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/http:header_map_interface",
"//include/envoy/stats:stats_macros",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto",
],
)
Expand All @@ -27,6 +32,7 @@ envoy_cc_extension(
security_posture = "unknown",
status = "alpha",
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/common:factory_base_lib",
Expand All @@ -43,6 +49,18 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "mutation_utils_lib",
srcs = ["mutation_utils.cc"],
hdrs = ["mutation_utils.h"],
deps = [
"//include/envoy/http:header_map_interface",
"//source/common/http:header_utility_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_lib",
srcs = ["client_impl.cc"],
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>

#include "extensions/filters/http/ext_proc/client_impl.h"
#include "extensions/filters/http/ext_proc/ext_proc.h"

namespace Envoy {
Expand All @@ -11,11 +12,19 @@ namespace ExternalProcessing {

Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string&, Server::Configuration::FactoryContext&) {
const auto filter_config = std::make_shared<FilterConfig>(proto_config);
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(timeout_ms), context.scope(), stats_prefix);

return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) {
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config)});
return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope());

callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ExternalProcessingFilterConfig
ExternalProcessingFilterConfig() : FactoryBase(HttpFilterNames::get().ExternalProcessing) {}

private:
static constexpr uint64_t DefaultTimeout = 200;

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
Expand Down
124 changes: 123 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,133 @@
#include "extensions/filters/http/ext_proc/ext_proc.h"

#include "extensions/filters/http/ext_proc/mutation_utils.h"

#include "absl/strings/str_format.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

void Filter::onDestroy() {}
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

void Filter::closeStream() {
if (!stream_closed_) {
if (stream_) {
ENVOY_LOG(debug, "Closing gRPC stream to processing server");
stream_->close();
stats_.streams_closed_.inc();
}
stream_closed_ = true;
}
}

void Filter::onDestroy() { closeStream(); }

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of_stream) {
// We're at the start, so start the stream and send a headers message
request_headers_ = &headers;
stream_ = client_->start(*this, config_->grpcTimeout());
stats_.streams_started_.inc();
ProcessingRequest req;
auto* headers_req = req.mutable_request_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
request_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();

// Wait until we have a gRPC response before allowing any more callbacks
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
} else if (response->has_immediate_response()) {
// To be implemented later. Leave stream open to allow people to implement
// correct servers that don't break us.
message_valid = true;
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
}

if (message_valid) {
stats_.stream_msgs_received_.inc();
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
closeStream();
}
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();
if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();
} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
}
}

void Filter::onGrpcClose() {
ENVOY_LOG(debug, "Received gRPC stream close");
stream_closed_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
Loading

0 comments on commit 845f92a

Please sign in to comment.