Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: attributes in first message to server #1

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ message ProcessingRequest {
// Dynamic metadata associated with the request.
config.core.v3.Metadata metadata_context = 8;

// [#not-implemented-hide:]
// The values of properties selected by the ``request_attributes``
// or ``response_attributes`` list in the configuration. Each entry
// in the list is populated from the standard
Expand Down Expand Up @@ -211,12 +210,9 @@ message HttpHeaders {
config.core.v3.HeaderMap headers = 1;

// [#not-implemented-hide:]
// TODO(jbohanon) reserve field as part of rework detailed in https://github.com/envoyproxy/envoy/issues/32125
// The values of properties selected by the ``request_attributes``
// or ``response_attributes`` list in the configuration. Each entry
// in the list is populated
// from the standard :ref:`attributes <arch_overview_attributes>`
// supported across Envoy.
// This field is deprecated and not implemented. Attributes will be sent in
// the top-level :ref:`attributes <envoy_v3_api_field_service.ext_proc.v3.ProcessingRequest.attributes`
// field.
map<string, google.protobuf.Struct> attributes = 2;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the deprecated annotation to this field [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]


// If true, then there is no message body associated with this
Expand Down
49 changes: 21 additions & 28 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,7 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
ProtobufWkt::Struct* proto) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -291,14 +290,12 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
state.setHeaders(&headers);
state.setHasNoBody(end_stream);
ProcessingRequest req;
addAttributes(state, req);
addDynamicMetadata(state, req);
auto* headers_req = state.mutableHeaders(req);
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto != nullptr) {
(*headers_req->mutable_attributes())[FilterName] = *proto;
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -317,17 +314,7 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
ProtobufWkt::Struct proto;

if (config_->expressionManager().hasRequestExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(),
&headers, nullptr, nullptr);
proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr);
}

status = onHeaders(decoding_state_, headers, end_stream,
config_->expressionManager().hasRequestExpr() ? &proto : nullptr);
status = onHeaders(decoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -590,7 +577,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
FilterTrailersStatus Filter::decodeTrailers(RequestTrailerMap& trailers) {
ENVOY_LOG(trace, "decodeTrailers");
const auto status = onTrailers(decoding_state_, trailers);
ENVOY_LOG(trace, "encodeTrailers returning {}", static_cast<int>(status));
ENVOY_LOG(trace, "decodeTrailers returning {}", static_cast<int>(status));
return status;
}

Expand All @@ -605,17 +592,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
ProtobufWkt::Struct proto;

if (config_->expressionManager().hasResponseExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(),
nullptr, &headers, nullptr);
proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr);
}

status = onHeaders(encoding_state_, headers, end_stream,
config_->expressionManager().hasResponseExpr() ? &proto : nullptr);
status = onHeaders(encoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -650,6 +627,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In
bool end_stream) {
ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream);
ProcessingRequest req;
addAttributes(state, req);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont we need the same for the response bodies as well? or is this function called in the response body path as well?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called from the onData helper method which is called on both decode and encode

addDynamicMetadata(state, req);
auto* body_req = state.mutableBody(req);
body_req->set_end_of_stream(end_stream);
Expand All @@ -667,6 +645,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState

void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) {
ProcessingRequest req;
addAttributes(state, req);
addDynamicMetadata(state, req);
auto* trailers_req = state.mutableTrailers(req);
MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(),
Expand Down Expand Up @@ -771,6 +750,20 @@ void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest&
*req.mutable_metadata_context() = forwarding_metadata;
}

void Filter::addAttributes(const ProcessorState& state, ProcessingRequest& req) {
if (!state.sendAttributes(config_->expressionManager())) {
return;
}

auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), state.callbacks()->streamInfo(),
state.requestHeaders(), state.responseHeaders(), state.trailers());
attributes = config_->expressionManager().evaluateRequestAttributes(*activation_ptr);

state.sentAttributes(true);
(*req.mutable_attributes())[FilterName] = *attributes;
}

void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state,
const ProcessingResponse& response) {
if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) {
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
ProtobufWkt::Struct* proto);
Http::RequestOrResponseHeaderMap& headers, bool end_stream);

// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Expand All @@ -386,6 +385,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void setDecoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response);
void addDynamicMetadata(const ProcessorState& state,
envoy::service::ext_proc::v3::ProcessingRequest& req);
void addAttributes(const ProcessorState& state,
envoy::service::ext_proc::v3::ProcessingRequest& req);

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
Expand Down
25 changes: 25 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "source/common/common/logger.h"

#include "absl/status/status.h"
#include "matching_utils.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -136,6 +137,9 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

void setHeaders(Http::RequestOrResponseHeaderMap* headers) { headers_ = headers; }
void setTrailers(Http::HeaderMap* trailers) { trailers_ = trailers; }
virtual const Http::RequestOrResponseHeaderMap* requestHeaders() const PURE;
virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE;
const Http::HeaderMap* responseTrailers() const { return trailers_; }

void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
CallbackState callback_state);
Expand Down Expand Up @@ -202,6 +206,10 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

virtual Http::StreamFilterCallbacks* callbacks() const PURE;

virtual bool sendAttributes(const ExpressionManager& mgr) const PURE;

void sentAttributes(bool sent) { attributes_sent_ = sent; }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: setSentAttributes. But current style is also OK to me.


protected:
void setBodyMode(
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode);
Expand Down Expand Up @@ -250,6 +258,9 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
const std::vector<std::string>* typed_forwarding_namespaces_{};
const std::vector<std::string>* untyped_receiving_namespaces_{};

// If true, the attributes for this processing state have already been sent.
bool attributes_sent_{};

private:
virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {}
};
Expand Down Expand Up @@ -324,6 +335,13 @@ class DecodingProcessorState : public ProcessorState {

Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; }

bool sendAttributes(const ExpressionManager& mgr) const override {
return !attributes_sent_ && mgr.hasRequestExpr();
}

const Http::RequestOrResponseHeaderMap* requestHeaders() const override { return headers_; };
const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return nullptr; }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down Expand Up @@ -404,6 +422,13 @@ class EncodingProcessorState : public ProcessorState {

Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; }

bool sendAttributes(const ExpressionManager& mgr) const override {
return !attributes_sent_ && mgr.hasResponseExpr();
}

const Http::RequestOrResponseHeaderMap* requestHeaders() const override { return nullptr; };
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the request headers should be available in the response phase. At least you can add a TODO comment here and optimize it in the future.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good catch. I think it would be prudent to have separate fields in the base class for request and response header maps to accomplish this, even though on decoding it will always be nullptr for the response headers.

const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return headers_; }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3429,10 +3429,7 @@ TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) {
}

#if defined(USE_CEL_PARSER)
// Test the filter using the default configuration by connecting to
// an ext_proc server that responds to the request_headers message
// by requesting to modify the request headers.
TEST_P(ExtProcIntegrationTest, GetAndSetRequestResponseAttributes) {
TEST_P(ExtProcIntegrationTest, RequestResponseAttributes) {
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND);
proto_config_.mutable_request_attributes()->Add("request.path");
Expand Down