Skip to content

Commit

Permalink
ext_proc: Implement response path for headers only
Browse files Browse the repository at this point in the history
Implement header processing on the response path by sending the
response_headers message to the processor and handling the result.
Streamline integration tests.

Signed-off-by: Gregory Brail <gregbrail@google.com>
  • Loading branch information
gbrail committed Jan 15, 2021
1 parent 77d7cec commit de1bd48
Show file tree
Hide file tree
Showing 8 changed files with 551 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// The External Processing filter allows an external service to act on HTTP traffic in a flexible way.

// **Current Implementation Status:**
// At this time, the filter will send a "request_headers" message to the server when the
// filter is invoked from the downstream, and apply any header mutations returned by the
// server. No other part of the protocol is implemented yet.
// At this time, the filter will send the "request_headers" and "response_headers" messages
// to the server when the filter is invoked, and apply any header mutations returned by the
// server, and respond to "immediate_response" messages. No other parts of the protocol are implemented yet.

// As designed, the filter supports up to six different processing steps, which are in the
// process of being implemented:
// * Request headers: IMPLEMENTED
// * Request body: NOT IMPLEMENTED
// * Request trailers: NOT IMPLEMENTED
// * Response headers: NOT IMPLEMENTED
// * Response headers: IMPLEMENTED
// * Response body: NOT IMPLEMENTED
// * Response trailers: NOT IMPLEMENTED

Expand Down Expand Up @@ -78,7 +78,6 @@ message ExternalProcessor {
// The filter supports both the "Envoy" and "Google" gRPC clients.
config.core.v3.GrpcService grpc_service = 1;

// [#not-implemented-hide:]
// By default, if the gRPC stream cannot be established, or if it is closed
// prematurely with an error, the filter will fail. Specifically, if the
// response headers have not yet been delivered, then it will return a 500
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

166 changes: 114 additions & 52 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using envoy::service::ext_proc::v3alpha::HeadersResponse;
using envoy::service::ext_proc::v3alpha::ImmediateResponse;
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;
using Http::ResponseHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

Expand Down Expand Up @@ -48,66 +50,121 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_of_stream) {
if (stream_closed_) {
return FilterHeadersStatus::Continue;
}

response_headers_ = &headers;
ProcessingRequest req;
auto* headers_req = req.mutable_response_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
response_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
bool message_handled = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
} else if (response->has_immediate_response()) {
ENVOY_LOG(debug, "Returning immediate response from processor");
sendImmediateResponse(response->immediate_response());
message_valid = true;
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
if (response->has_request_headers()) {
message_handled = handleRequestHeadersResponse(response->request_headers());
} else if (response->has_response_headers()) {
message_handled = handleResponseHeadersResponse(response->response_headers());
} else if (response->has_immediate_response()) {
message_handled = handleImmediateResponse(response->immediate_response());
}

if (message_valid) {
if (message_handled) {
stats_.stream_msgs_received_.inc();
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
cleanupState();
closeStream();
}
}

bool Filter::handleRequestHeadersResponse(const HeadersResponse& response) {
if (request_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying request_headers response");
if (response.has_response()) {
const auto& common_response = response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
return true;
}
return false;
}

bool Filter::handleResponseHeadersResponse(const HeadersResponse& response) {
if (response_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying response_headers response");
if (response.has_response()) {
const auto& common_response = response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *response_headers_);
}
}
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
return true;
}
return false;
}

bool Filter::handleImmediateResponse(const ImmediateResponse& response) {
if (response_state_ == FilterState::HEADERS) {
// Waiting for a response headers response, so return immediately now.
// Do this first in case both are in progress.
// We don't want to process any more stream messages after this.
// Close the stream before sending because "sendLocalResponse" triggers
// additional calls to this filter.
response_state_ = FilterState::IDLE;
closeStream();

ENVOY_LOG(debug, "Returning immediate response from processor on encoding path");
sendImmediateResponse(response, false);

return true;

} else if (request_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "Returning immediate response from processor on decoding path");
request_state_ = FilterState::IDLE;
closeStream();
sendImmediateResponse(response, true);
return true;
}

return false;
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();

if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();

} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
stream_closed_ = true;
ImmediateResponse error_response;
error_response.mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError);
error_response.set_details(absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
handleImmediateResponse(error_response);
}
}

Expand All @@ -117,34 +174,39 @@ void Filter::onGrpcClose() {
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
cleanupState();
}

void Filter::cleanupState() {
if (request_state_ != FilterState::IDLE) {
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
if (response_state_ != FilterState::IDLE) {
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
}
}

void Filter::sendImmediateResponse(const ImmediateResponse& response) {
void Filter::sendImmediateResponse(const ImmediateResponse& response, bool on_decoding) {
const auto status_code = response.has_status() ? response.status().code() : 200;
const auto grpc_status =
response.has_grpc_status()
? absl::optional<Grpc::Status::GrpcStatus>(response.grpc_status().status())
: absl::nullopt;
const auto mutate_headers = [&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
};

decoder_callbacks_->sendLocalReply(
static_cast<Http::Code>(status_code), response.body(),
[&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
},
grpc_status, response.details());
if (on_decoding) {
decoder_callbacks_->sendLocalReply(static_cast<Http::Code>(status_code), response.body(),
mutate_headers, grpc_status, response.details());
} else {
encoder_callbacks_->sendLocalReply(static_cast<Http::Code>(status_code), response.body(),
mutate_headers, grpc_status, response.details());
}
}

} // namespace ExternalProcessing
Expand Down
23 changes: 16 additions & 7 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,10 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

void onDestroy() override;

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override;

// ExternalProcessorCallbacks

Expand All @@ -101,23 +99,34 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

private:
void closeStream();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);
void cleanupState();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response,
bool on_decoding);

bool
handleRequestHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
bool
handleResponseHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
bool
handleImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
ExtProcFilterStats stats_;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;

// The state of the request-processing, or "decoding" side of the filter.
// We maintain separate states for encoding and decoding since they may
// be interleaved.
FilterState request_state_ = FilterState::IDLE;

// The state of the response-processing side
FilterState response_state_ = FilterState::IDLE;

ExternalProcessorStreamPtr stream_;
bool stream_closed_ = false;

Http::HeaderMap* request_headers_ = nullptr;
Http::HeaderMap* response_headers_ = nullptr;
};

} // namespace ExternalProcessing
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ envoy_extension_cc_test_library(
deps = [
"//include/envoy/http:header_map_interface",
"//test/test_common:utility_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
Loading

0 comments on commit de1bd48

Please sign in to comment.