Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement request header processing in ext_proc #14385

Merged
merged 13 commits into from
Jan 11, 2021
8 changes: 5 additions & 3 deletions api/envoy/service/ext_proc/v3alpha/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import "envoy/config/core/v3/base.proto";
import "envoy/extensions/filters/http/ext_proc/v3alpha/processing_mode.proto";
import "envoy/type/v3/http_status.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

import "udpa/annotations/status.proto";
Expand Down Expand Up @@ -289,10 +288,13 @@ message GrpcStatus {
// Change HTTP headers or trailers by appending, replacing, or removing
// headers.
message HeaderMutation {
// Add or replace HTTP headers.
// Add or replace HTTP headers. Attempts to set the value of
// any "x-envoy" header, and attempts to set the ":method",
// ":authority", or "host" headers will be ignored.
repeated config.core.v3.HeaderValueOption set_headers = 1;

// Remove these HTTP headers.
// Remove these HTTP headers. Attempts to remove system headers --
// any header starting with ":", plus "host" -- will be ignored.
repeated string remove_headers = 2;
}

Expand Down
21 changes: 21 additions & 0 deletions docs/root/configuration/http/http_filters/ext_proc_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,24 @@ messages, and the server must reply with
:ref:`ProcessingResponse <envoy_v3_api_msg_service.ext_proc.v3alpha.ProcessingResponse>`.

This filter is a work in progress. In its current state, it actually does nothing.

Statistics
----------
This filter outputs statistics in the
*http.<stat_prefix>.ext_proc.* namespace. The :ref:`stat prefix
<envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.stat_prefix>`
comes from the owning HTTP connection manager.

The following statistics are supported:

.. csv-table::
:header: Name, Type, Description
:widths: auto

streams_started, Counter, The number of gRPC streams that have been started to send to the external processing service
streams_msgs_sent, Counter, The number of messages sent on those streams
streams_msgs_received, Counter, The number of messages received on those streams
spurious_msgs_received, Counter, The number of unexpected messages received that violated the protocol
streams_closed, Counter, The number of streams successfully closed on either end
streams_failed, Counter, The number of times a stream produced a gRPC error
failure_mode_allowed, Counter, The number of times an error was ignored due to configuration

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions source/common/http/header_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,10 @@ Http::Status HeaderUtility::checkRequiredHeaders(const Http::RequestHeaderMap& h
return Http::okStatus();
}

bool HeaderUtility::isRemovableHeader(absl::string_view header) {
return (header.empty() || header[0] != ':') &&
!absl::EqualsIgnoreCase(header, Headers::get().HostLegacy.get());
}

} // namespace Http
} // namespace Envoy
7 changes: 7 additions & 0 deletions source/common/http/header_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ class HeaderUtility {
* missing.
*/
static Http::Status checkRequiredHeaders(const Http::RequestHeaderMap& headers);

/**
* Returns true if a header may be safely removed without causing additional
* problems. Effectively, header names beginning with ":" and the "host" header
* may not be removed.
*/
static bool isRemovableHeader(absl::string_view header);
};
} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ envoy_cc_library(
deps = [
":header_formatter_lib",
"//include/envoy/http:header_map_interface",
"//source/common/http:header_utility_lib",
"//source/common/http:headers_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/header_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/config/core/v3/base.pb.h"

#include "common/common/assert.h"
#include "common/http/header_utility.h"
#include "common/http/headers.h"
#include "common/protobuf/utility.h"

Expand Down Expand Up @@ -258,7 +259,7 @@ HeaderParserPtr HeaderParser::configure(
// We reject :-prefix (e.g. :path) removal here. This is dangerous, since other aspects of
// request finalization assume their existence and they are needed for well-formedness in most
// cases.
if (header[0] == ':' || Http::LowerCaseString(header).get() == "host") {
if (!Http::HeaderUtility::isRemovableHeader(header)) {
throw EnvoyException(":-prefixed or host headers may not be removed");
}
header_parser->headers_to_remove_.emplace_back(header);
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/filters/http/ext_authz/ext_authz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ void Filter::onComplete(Filters::Common::ExtAuthz::ResponsePtr&& response) {
for (const auto& header : response->headers_to_remove) {
// We don't allow removing any :-prefixed headers, nor Host, as removing
// them would make the request malformed.
if (absl::StartsWithIgnoreCase(absl::string_view(header.get()), ":") ||
header == Http::Headers::get().HostLegacy) {
if (!Http::HeaderUtility::isRemovableHeader(header.get())) {
continue;
}
ENVOY_STREAM_LOG(trace, "'{}'", *callbacks_, header.get());
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ envoy_cc_library(
srcs = ["ext_proc.cc"],
hdrs = ["ext_proc.h"],
deps = [
":client_interface",
":mutation_utils_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/http:header_map_interface",
"//include/envoy/stats:stats_macros",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto",
],
)
Expand All @@ -27,6 +32,7 @@ envoy_cc_extension(
security_posture = "unknown",
status = "alpha",
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/common:factory_base_lib",
Expand All @@ -43,6 +49,18 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "mutation_utils_lib",
srcs = ["mutation_utils.cc"],
hdrs = ["mutation_utils.h"],
deps = [
"//include/envoy/http:header_map_interface",
"//source/common/http:header_utility_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_lib",
srcs = ["client_impl.cc"],
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>

#include "extensions/filters/http/ext_proc/client_impl.h"
#include "extensions/filters/http/ext_proc/ext_proc.h"

namespace Envoy {
Expand All @@ -11,11 +12,19 @@ namespace ExternalProcessing {

Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string&, Server::Configuration::FactoryContext&) {
const auto filter_config = std::make_shared<FilterConfig>(proto_config);
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
gbrail marked this conversation as resolved.
Show resolved Hide resolved
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(timeout_ms), context.scope(), stats_prefix);

return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) {
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config)});
return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope());

callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ExternalProcessingFilterConfig
ExternalProcessingFilterConfig() : FactoryBase(HttpFilterNames::get().ExternalProcessing) {}

private:
static constexpr uint64_t DefaultTimeout = 200;

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
Expand Down
124 changes: 123 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,133 @@
#include "extensions/filters/http/ext_proc/ext_proc.h"

#include "extensions/filters/http/ext_proc/mutation_utils.h"

#include "absl/strings/str_format.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

void Filter::onDestroy() {}
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

void Filter::closeStream() {
if (!stream_closed_) {
if (stream_) {
ENVOY_LOG(debug, "Closing gRPC stream to processing server");
stream_->close();
stats_.streams_closed_.inc();
}
stream_closed_ = true;
}
}

void Filter::onDestroy() { closeStream(); }

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of_stream) {
// We're at the start, so start the stream and send a headers message
request_headers_ = &headers;
stream_ = client_->start(*this, config_->grpcTimeout());
stats_.streams_started_.inc();
ProcessingRequest req;
auto* headers_req = req.mutable_request_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
request_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
gbrail marked this conversation as resolved.
Show resolved Hide resolved
stats_.stream_msgs_sent_.inc();

// Wait until we have a gRPC response before allowing any more callbacks
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
gbrail marked this conversation as resolved.
Show resolved Hide resolved
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
} else if (response->has_immediate_response()) {
// To be implemented later. Leave stream open to allow people to implement
// correct servers that don't break us.
message_valid = true;
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
}

if (message_valid) {
stats_.stream_msgs_received_.inc();
gbrail marked this conversation as resolved.
Show resolved Hide resolved
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
gbrail marked this conversation as resolved.
Show resolved Hide resolved
closeStream();
}
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();
if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();
} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
}
}

void Filter::onGrpcClose() {
ENVOY_LOG(debug, "Received gRPC stream close");
stream_closed_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
Loading