Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement request header processing in ext_proc #14385

Merged
merged 13 commits into from
Jan 11, 2021
18 changes: 18 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ envoy_cc_library(
srcs = ["ext_proc.cc"],
hdrs = ["ext_proc.h"],
deps = [
":client_interface",
":mutation_utils_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/http:header_map_interface",
"//include/envoy/stats:stats_macros",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto",
],
)
Expand All @@ -27,6 +32,7 @@ envoy_cc_extension(
security_posture = "unknown",
status = "alpha",
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/common:factory_base_lib",
Expand All @@ -43,6 +49,18 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "mutation_utils_lib",
srcs = ["mutation_utils.cc"],
hdrs = ["mutation_utils.h"],
deps = [
"//include/envoy/http:header_map_interface",
"//source/common/http:headers_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_lib",
srcs = ["client_impl.cc"],
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>

#include "extensions/filters/http/ext_proc/client_impl.h"
#include "extensions/filters/http/ext_proc/ext_proc.h"

namespace Envoy {
Expand All @@ -11,11 +12,19 @@ namespace ExternalProcessing {

Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string&, Server::Configuration::FactoryContext&) {
const auto filter_config = std::make_shared<FilterConfig>(proto_config);
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(timeout_ms), context.scope(), stats_prefix);

return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) {
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config)});
return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope());

callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ExternalProcessingFilterConfig
ExternalProcessingFilterConfig() : FactoryBase(HttpFilterNames::get().ExternalProcessing) {}

private:
static constexpr uint64_t DefaultTimeout = 200;

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
Expand Down
120 changes: 119 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,129 @@
#include "extensions/filters/http/ext_proc/ext_proc.h"

#include "extensions/filters/http/ext_proc/mutation_utils.h"

#include "absl/strings/str_format.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

void Filter::onDestroy() {}
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

void Filter::closeStream() {
if (!stream_closed_) {
if (stream_) {
ENVOY_LOG(debug, "Closing gRPC stream to processing server");
stream_->close();
stats_.streams_closed_.inc();
}
stream_closed_ = true;
}
}

void Filter::onDestroy() { closeStream(); }

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of_stream) {
// We're at the start, so start the stream and send a headers message
request_headers_ = &headers;
stream_ = client_->start(*this, config_->grpcTimeout());
stats_.streams_started_.inc();
ProcessingRequest req;
auto* headers_req = req.mutable_request_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
request_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();

// Wait until we have a gRPC response before allowing any more callbacks
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
}

if (message_valid) {
stats_.stream_msgs_received_.inc();
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
closeStream();
}
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();
if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();
} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
}
}

void Filter::onGrpcClose() {
ENVOY_LOG(debug, "Received gRPC stream close");
stream_closed_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
90 changes: 85 additions & 5 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
@@ -1,41 +1,121 @@
#pragma once

#include <chrono>
#include <memory>

#include "envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.pb.h"
#include "envoy/grpc/async_client.h"
#include "envoy/http/filter.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "common/common/logger.h"

#include "extensions/filters/http/common/pass_through_filter.h"
#include "extensions/filters/http/ext_proc/client.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

#define ALL_EXT_PROC_FILTER_STATS(COUNTER) \
COUNTER(streams_started) \
COUNTER(stream_msgs_sent) \
COUNTER(stream_msgs_received) \
COUNTER(spurious_msgs_received) \
COUNTER(streams_closed) \
COUNTER(streams_failed) \
COUNTER(failure_mode_allowed)

struct ExtProcFilterStats {
ALL_EXT_PROC_FILTER_STATS(GENERATE_COUNTER_STRUCT)
};

class FilterConfig {
public:
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config)
: failure_mode_allow_(config.failure_mode_allow()) {}
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config,
const std::chrono::milliseconds grpc_timeout, Stats::Scope& scope,
const std::string& stats_prefix)
: failure_mode_allow_(config.failure_mode_allow()), grpc_timeout_(grpc_timeout),
stats_(generateStats(stats_prefix, config.stat_prefix(), scope)) {}

bool failureModeAllow() const { return failure_mode_allow_; }

const std::chrono::milliseconds& grpcTimeout() const { return grpc_timeout_; }

const ExtProcFilterStats& stats() const { return stats_; }

private:
ExtProcFilterStats generateStats(const std::string& prefix,
const std::string& filter_stats_prefix, Stats::Scope& scope) {
const std::string final_prefix = absl::StrCat(prefix, "ext_proc.", filter_stats_prefix);
return {ALL_EXT_PROC_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))};
}

const bool failure_mode_allow_;
const std::chrono::milliseconds grpc_timeout_;

ExtProcFilterStats stats_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;

class Filter : public Logger::Loggable<Logger::Id::filter>, public Http::PassThroughFilter {
class Filter : public Logger::Loggable<Logger::Id::filter>,
public Http::PassThroughFilter,
public ExternalProcessorCallbacks {
// The state of filter execution -- this is used to determine
// how to handle gRPC callbacks.
enum class FilterState {
// The filter is not waiting for anything, so any response on the
// gRPC stream is spurious and will result in the filter closing
// the stream.
IDLE,
// The filter is waiting for a "request_headers" or a "response_headers" message.
// Any other response on the gRPC stream will be treated as spurious.
HEADERS,
};

public:
Filter(const FilterConfigSharedPtr& config) : config_(config) {}
Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client)
: config_(config), client_(std::move(client)), stats_(config->stats()) {}

void onDestroy() override;

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;

// ExternalProcessorCallbacks

void onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& response) override;

void onGrpcError(Grpc::Status::GrpcStatus error) override;

void onGrpcClose() override;

private:
FilterConfigSharedPtr config_;
void closeStream();

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
ExtProcFilterStats stats_;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;

// The state of the request-processing, or "decoding" side of the filter.
// We maintain separate states for encoding and decoding since they may
// be interleaved.
FilterState request_state_ = FilterState::IDLE;

ExternalProcessorStreamPtr stream_;
bool stream_closed_ = false;

Http::HeaderMap* request_headers_ = nullptr;
};

} // namespace ExternalProcessing
Expand Down
57 changes: 57 additions & 0 deletions source/extensions/filters/http/ext_proc/mutation_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "extensions/filters/http/ext_proc/mutation_utils.h"

#include "common/http/headers.h"
#include "common/protobuf/utility.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using Http::LowerCaseString;

void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap& headers_out) {
headers_in.iterate([&headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
auto* new_header = headers_out.add_headers();
new_header->set_key(std::string(e.key().getStringView()));
new_header->set_value(std::string(e.value().getStringView()));
return Http::HeaderMap::Iterate::Continue;
});
}

void MutationUtils::applyHeaderMutations(
const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, Http::HeaderMap& headers) {
for (const auto& rh : mutation.remove_headers()) {
// The "router" component removes headers first when processing this protobuf
// Like that component and "ext_auth", don't allow removing any system headers
// (with ":") and don't allow removal of "host".
if (rh.empty() || rh[0] == ':') {
continue;
}
const LowerCaseString header(rh);
if (header != Http::Headers::get().HostLegacy) {
headers.remove(header);
}
}

for (const auto& sh : mutation.set_headers()) {
if (!sh.has_header()) {
continue;
}
// Make "false" the default. This is logical and matches the ext_authz
// filter. However, the router handles this same protobuf and uses "true"
// as the default instead.
const bool append = PROTOBUF_GET_WRAPPED_OR_DEFAULT(sh, append, false);
if (append) {
headers.addCopy(LowerCaseString(sh.header().key()), sh.header().value());
} else {
headers.setCopy(LowerCaseString(sh.header().key()), sh.header().value());
}
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading