Skip to content

Commit

Permalink
ext_proc: Implement response path for headers only (envoyproxy#14713)
Browse files Browse the repository at this point in the history
Implement header processing on the response path by sending the
response_headers message to the processor and handling the result.

Also update the docs in the .proto file.

Signed-off-by: Gregory Brail <gregbrail@google.com>
Signed-off-by: Auni Ahsan <auni@google.com>
  • Loading branch information
gbrail authored and auni53 committed Jan 25, 2021
1 parent 87a1807 commit 7eebb43
Show file tree
Hide file tree
Showing 11 changed files with 944 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// The External Processing filter allows an external service to act on HTTP traffic in a flexible way.

// **Current Implementation Status:**
// At this time, the filter will send a "request_headers" message to the server when the
// filter is invoked from the downstream, and apply any header mutations returned by the
// server. No other part of the protocol is implemented yet.
// At this time, the filter will send the "request_headers" and "response_headers" messages
// to the server when the filter is invoked, and apply any header mutations returned by the
// server, and respond to "immediate_response" messages. No other parts of the protocol are implemented yet.

// As designed, the filter supports up to six different processing steps, which are in the
// process of being implemented:
// * Request headers: IMPLEMENTED
// * Request body: NOT IMPLEMENTED
// * Request trailers: NOT IMPLEMENTED
// * Response headers: NOT IMPLEMENTED
// * Response headers: IMPLEMENTED
// * Response body: NOT IMPLEMENTED
// * Response trailers: NOT IMPLEMENTED

Expand Down Expand Up @@ -78,7 +78,6 @@ message ExternalProcessor {
// The filter supports both the "Envoy" and "Google" gRPC clients.
config.core.v3.GrpcService grpc_service = 1;

// [#not-implemented-hide:]
// By default, if the gRPC stream cannot be established, or if it is closed
// prematurely with an error, the filter will fail. Specifically, if the
// response headers have not yet been delivered, then it will return a 500
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

134 changes: 83 additions & 51 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using envoy::service::ext_proc::v3alpha::HeadersResponse;
using envoy::service::ext_proc::v3alpha::ImmediateResponse;
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;
using Http::ResponseHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

Expand Down Expand Up @@ -48,66 +50,96 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_of_stream) {
if (stream_closed_) {
return FilterHeadersStatus::Continue;
}

response_headers_ = &headers;
ProcessingRequest req;
auto* headers_req = req.mutable_response_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
response_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
bool message_handled = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
} else if (response->has_immediate_response()) {
ENVOY_LOG(debug, "Returning immediate response from processor");
sendImmediateResponse(response->immediate_response());
message_valid = true;
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
if (response->has_request_headers()) {
message_handled = handleRequestHeadersResponse(response->request_headers());
} else if (response->has_response_headers()) {
message_handled = handleResponseHeadersResponse(response->response_headers());
} else if (response->has_immediate_response()) {
handleImmediateResponse(response->immediate_response());
message_handled = true;
}

if (message_valid) {
if (message_handled) {
stats_.stream_msgs_received_.inc();
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
cleanupState();
closeStream();
}
}

bool Filter::handleRequestHeadersResponse(const HeadersResponse& response) {
if (request_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying request_headers response");
MutationUtils::applyCommonHeaderResponse(response, *request_headers_);
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
return true;
}
return false;
}

bool Filter::handleResponseHeadersResponse(const HeadersResponse& response) {
if (response_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying response_headers response");
MutationUtils::applyCommonHeaderResponse(response, *response_headers_);
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
return true;
}
return false;
}

void Filter::handleImmediateResponse(const ImmediateResponse& response) {
// We don't want to process any more stream messages after this.
// Close the stream before sending because "sendLocalResponse" triggers
// additional calls to this filter.
request_state_ = FilterState::IDLE;
response_state_ = FilterState::IDLE;
closeStream();
sendImmediateResponse(response);
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();

if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();

} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
stream_closed_ = true;
ImmediateResponse errorResponse;
errorResponse.mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError);
errorResponse.set_details(absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
handleImmediateResponse(errorResponse);
}
}

Expand All @@ -117,16 +149,17 @@ void Filter::onGrpcClose() {
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
cleanupState();
}

void Filter::cleanupState() {
if (request_state_ != FilterState::IDLE) {
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
if (response_state_ != FilterState::IDLE) {
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
}
}

Expand All @@ -136,15 +169,14 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) {
response.has_grpc_status()
? absl::optional<Grpc::Status::GrpcStatus>(response.grpc_status().status())
: absl::nullopt;
const auto mutate_headers = [&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
};

decoder_callbacks_->sendLocalReply(
static_cast<Http::Code>(status_code), response.body(),
[&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
},
grpc_status, response.details());
encoder_callbacks_->sendLocalReply(static_cast<Http::Code>(status_code), response.body(),
mutate_headers, grpc_status, response.details());
}

} // namespace ExternalProcessing
Expand Down
20 changes: 14 additions & 6 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,10 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

void onDestroy() override;

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override;

// ExternalProcessorCallbacks

Expand All @@ -101,23 +99,33 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

private:
void closeStream();
void cleanupState();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);

bool
handleRequestHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
bool
handleResponseHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
void
handleImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
ExtProcFilterStats stats_;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;

// The state of the request-processing, or "decoding" side of the filter.
// We maintain separate states for encoding and decoding since they may
// be interleaved.
FilterState request_state_ = FilterState::IDLE;

// The state of the response-processing side
FilterState response_state_ = FilterState::IDLE;

ExternalProcessorStreamPtr stream_;
bool stream_closed_ = false;

Http::HeaderMap* request_headers_ = nullptr;
Http::HeaderMap* response_headers_ = nullptr;
};

} // namespace ExternalProcessing
Expand Down
16 changes: 14 additions & 2 deletions source/extensions/filters/http/ext_proc/mutation_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace ExternalProcessing {
using Http::Headers;
using Http::LowerCaseString;

using envoy::service::ext_proc::v3alpha::HeaderMutation;
using envoy::service::ext_proc::v3alpha::HeadersResponse;

void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap& headers_out) {
headers_in.iterate([&headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
Expand All @@ -24,8 +27,17 @@ void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in,
});
}

void MutationUtils::applyHeaderMutations(
const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, Http::HeaderMap& headers) {
void MutationUtils::applyCommonHeaderResponse(const HeadersResponse& response,
Http::HeaderMap& headers) {
if (response.has_response()) {
const auto& common_response = response.response();
if (common_response.has_header_mutation()) {
applyHeaderMutations(common_response.header_mutation(), headers);
}
}
}

void MutationUtils::applyHeaderMutations(const HeaderMutation& mutation, Http::HeaderMap& headers) {
for (const auto& remove_header : mutation.remove_headers()) {
if (Http::HeaderUtility::isRemovableHeader(remove_header)) {
headers.remove(LowerCaseString(remove_header));
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/mutation_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class MutationUtils {
static void buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap& headers_out);

// Apply mutations that are common to header responses.
static void
applyCommonHeaderResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response,
Http::HeaderMap& headers);

// Modify header map based on a set of mutations from a protobuf
static void
applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation,
Expand Down
14 changes: 14 additions & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ envoy_extension_cc_test(
],
)

envoy_extension_cc_test(
name = "ordering_test",
srcs = ["ordering_test.cc"],
extension_name = "envoy.filters.http.ext_proc",
deps = [
":mock_server_lib",
"//source/extensions/filters/http/ext_proc",
"//test/common/http:common_lib",
"//test/mocks/server:factory_context_mocks",
"//test/test_common:test_runtime_lib",
],
)

envoy_extension_cc_test(
name = "client_test",
srcs = ["client_test.cc"],
Expand Down Expand Up @@ -95,6 +108,7 @@ envoy_extension_cc_test_library(
deps = [
"//include/envoy/http:header_map_interface",
"//test/test_common:utility_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
Loading

0 comments on commit 7eebb43

Please sign in to comment.