-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ext_proc: attributes in first message to server #1
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -276,8 +276,7 @@ void Filter::onDestroy() { | |
} | ||
|
||
FilterHeadersStatus Filter::onHeaders(ProcessorState& state, | ||
Http::RequestOrResponseHeaderMap& headers, bool end_stream, | ||
ProtobufWkt::Struct* proto) { | ||
Http::RequestOrResponseHeaderMap& headers, bool end_stream) { | ||
switch (openStream()) { | ||
case StreamOpenState::Error: | ||
return FilterHeadersStatus::StopIteration; | ||
|
@@ -291,14 +290,12 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, | |
state.setHeaders(&headers); | ||
state.setHasNoBody(end_stream); | ||
ProcessingRequest req; | ||
addAttributes(state, req); | ||
addDynamicMetadata(state, req); | ||
auto* headers_req = state.mutableHeaders(req); | ||
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(), | ||
*headers_req->mutable_headers()); | ||
headers_req->set_end_of_stream(end_stream); | ||
if (proto != nullptr) { | ||
(*headers_req->mutable_attributes())[FilterName] = *proto; | ||
} | ||
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), | ||
ProcessorState::CallbackState::HeadersCallback); | ||
ENVOY_LOG(debug, "Sending headers message"); | ||
|
@@ -317,17 +314,7 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st | |
|
||
FilterHeadersStatus status = FilterHeadersStatus::Continue; | ||
if (decoding_state_.sendHeaders()) { | ||
ProtobufWkt::Struct proto; | ||
|
||
if (config_->expressionManager().hasRequestExpr()) { | ||
auto activation_ptr = Filters::Common::Expr::createActivation( | ||
&config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(), | ||
&headers, nullptr, nullptr); | ||
proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr); | ||
} | ||
|
||
status = onHeaders(decoding_state_, headers, end_stream, | ||
config_->expressionManager().hasRequestExpr() ? &proto : nullptr); | ||
status = onHeaders(decoding_state_, headers, end_stream); | ||
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status)); | ||
} else { | ||
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing"); | ||
|
@@ -590,7 +577,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap& | |
FilterTrailersStatus Filter::decodeTrailers(RequestTrailerMap& trailers) { | ||
ENVOY_LOG(trace, "decodeTrailers"); | ||
const auto status = onTrailers(decoding_state_, trailers); | ||
ENVOY_LOG(trace, "encodeTrailers returning {}", static_cast<int>(status)); | ||
ENVOY_LOG(trace, "decodeTrailers returning {}", static_cast<int>(status)); | ||
return status; | ||
} | ||
|
||
|
@@ -605,17 +592,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s | |
|
||
FilterHeadersStatus status = FilterHeadersStatus::Continue; | ||
if (!processing_complete_ && encoding_state_.sendHeaders()) { | ||
ProtobufWkt::Struct proto; | ||
|
||
if (config_->expressionManager().hasResponseExpr()) { | ||
auto activation_ptr = Filters::Common::Expr::createActivation( | ||
&config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(), | ||
nullptr, &headers, nullptr); | ||
proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr); | ||
} | ||
|
||
status = onHeaders(encoding_state_, headers, end_stream, | ||
config_->expressionManager().hasResponseExpr() ? &proto : nullptr); | ||
status = onHeaders(encoding_state_, headers, end_stream); | ||
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status)); | ||
} else { | ||
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing"); | ||
|
@@ -650,6 +627,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In | |
bool end_stream) { | ||
ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream); | ||
ProcessingRequest req; | ||
addAttributes(state, req); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dont we need the same for the response bodies as well? or is this function called in the response body path as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is called from the |
||
addDynamicMetadata(state, req); | ||
auto* body_req = state.mutableBody(req); | ||
body_req->set_end_of_stream(end_stream); | ||
|
@@ -667,6 +645,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState | |
|
||
void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) { | ||
ProcessingRequest req; | ||
addAttributes(state, req); | ||
addDynamicMetadata(state, req); | ||
auto* trailers_req = state.mutableTrailers(req); | ||
MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(), | ||
|
@@ -771,6 +750,20 @@ void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest& | |
*req.mutable_metadata_context() = forwarding_metadata; | ||
} | ||
|
||
void Filter::addAttributes(const ProcessorState& state, ProcessingRequest& req) { | ||
if (!state.sendAttributes(config_->expressionManager())) { | ||
return; | ||
} | ||
|
||
auto activation_ptr = Filters::Common::Expr::createActivation( | ||
&config_->expressionManager().localInfo(), state.callbacks()->streamInfo(), | ||
state.requestHeaders(), state.responseHeaders(), state.trailers()); | ||
attributes = config_->expressionManager().evaluateRequestAttributes(*activation_ptr); | ||
|
||
state.sentAttributes(true); | ||
(*req.mutable_attributes())[FilterName] = *attributes; | ||
} | ||
|
||
void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, | ||
const ProcessingResponse& response) { | ||
if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
#include "source/common/common/logger.h" | ||
|
||
#include "absl/status/status.h" | ||
#include "matching_utils.h" | ||
|
||
namespace Envoy { | ||
namespace Extensions { | ||
|
@@ -136,6 +137,9 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> { | |
|
||
void setHeaders(Http::RequestOrResponseHeaderMap* headers) { headers_ = headers; } | ||
void setTrailers(Http::HeaderMap* trailers) { trailers_ = trailers; } | ||
virtual const Http::RequestOrResponseHeaderMap* requestHeaders() const PURE; | ||
virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE; | ||
const Http::HeaderMap* responseTrailers() const { return trailers_; } | ||
|
||
void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, | ||
CallbackState callback_state); | ||
|
@@ -202,6 +206,10 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> { | |
|
||
virtual Http::StreamFilterCallbacks* callbacks() const PURE; | ||
|
||
virtual bool sendAttributes(const ExpressionManager& mgr) const PURE; | ||
|
||
void sentAttributes(bool sent) { attributes_sent_ = sent; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
|
||
protected: | ||
void setBodyMode( | ||
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode); | ||
|
@@ -250,6 +258,9 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> { | |
const std::vector<std::string>* typed_forwarding_namespaces_{}; | ||
const std::vector<std::string>* untyped_receiving_namespaces_{}; | ||
|
||
// If true, the attributes for this processing state have already been sent. | ||
bool attributes_sent_{}; | ||
|
||
private: | ||
virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} | ||
}; | ||
|
@@ -324,6 +335,13 @@ class DecodingProcessorState : public ProcessorState { | |
|
||
Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; } | ||
|
||
bool sendAttributes(const ExpressionManager& mgr) const override { | ||
return !attributes_sent_ && mgr.hasRequestExpr(); | ||
} | ||
|
||
const Http::RequestOrResponseHeaderMap* requestHeaders() const override { return headers_; }; | ||
const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return nullptr; } | ||
|
||
private: | ||
void setProcessingModeInternal( | ||
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); | ||
|
@@ -404,6 +422,13 @@ class EncodingProcessorState : public ProcessorState { | |
|
||
Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; } | ||
|
||
bool sendAttributes(const ExpressionManager& mgr) const override { | ||
return !attributes_sent_ && mgr.hasResponseExpr(); | ||
} | ||
|
||
const Http::RequestOrResponseHeaderMap* requestHeaders() const override { return nullptr; }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe the request headers should be available in the response phase. At least you can add a TODO comment here and optimize it in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good catch. I think it would be prudent to have separate fields in the base class for request and response header maps to accomplish this, even though on decoding it will always be |
||
const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return headers_; } | ||
|
||
private: | ||
void setProcessingModeInternal( | ||
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the deprecated annotation to this field
[deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]