Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: support passing match result action to filter #14462

Merged
merged 8 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,12 @@ class StreamFilterBase {
* onDestroy().
*/
virtual void onDestroy() PURE;

/**
* Called when a match result occurs that isn't handled by the filter manager.
* @param action the resulting match action
*/
virtual void onMatchCallback(const Matcher::Action&) {}
};

/**
Expand Down
58 changes: 42 additions & 16 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void ActiveStreamFilterBase::clearRouteCache() {
parent_.filter_manager_callbacks_.clearRouteCache();
}

void ActiveStreamFilterBase::evaluateMatchTreeWithNewData(
void FilterMatchState::evaluateMatchTreeWithNewData(
std::function<void(HttpMatchingDataImpl&)> update_func) {
if (match_tree_evaluated_ || !matching_data_) {
return;
Expand All @@ -256,7 +256,12 @@ void ActiveStreamFilterBase::evaluateMatchTreeWithNewData(

if (match_tree_evaluated_ && match_result.result_) {
if (SkipAction().typeUrl() == match_result.result_->typeUrl()) {
skip_ = true;
callback_handling_filter_->skip_ = true;
if (secondary_filter_) {
secondary_filter_->skip_ = true;
}
} else {
callback_handling_filter_->onMatchCallback(*match_result.result_);
}
}
}
Expand Down Expand Up @@ -404,11 +409,22 @@ void ActiveStreamDecoderFilter::requestDataTooLarge() {
}
}

void FilterManager::addStreamDecoderFilterWorker(
StreamDecoderFilterSharedPtr filter, Matcher::MatchTreeSharedPtr<HttpMatchingData> matcher,
HttpMatchingDataImplSharedPtr matching_data, bool dual_filter) {
ActiveStreamDecoderFilterPtr wrapper(new ActiveStreamDecoderFilter(
*this, filter, std::move(matcher), std::move(matching_data), dual_filter));
void FilterManager::addStreamDecoderFilterWorker(StreamDecoderFilterSharedPtr filter,
FilterMatchStateSharedPtr match_state,
bool dual_filter) {
ActiveStreamDecoderFilterPtr wrapper(
new ActiveStreamDecoderFilter(*this, filter, match_state, dual_filter));

// If we're a dual handling filter, have the encoding wrapper be the only thing registering itself
// as the handling filter.
Comment on lines +429 to +430
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused by this. Doesn't this need to be configurable or potentially context specific? Or the idea here is that a single filter registered twice would then only get the callbacks once and it doesn't actually matter whether they are in the encoding or decoding path? And this is really only needed for state that has to go through to the underlying impl like skip? Is skip the only case where this matters?

Given that the pointers are going to be the same if they are both set (right?), could the skip case be handled by just calling a method on the filter base which somehow knows how to set skip for both encoding and decoding direction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that we have two distinct filter wrappers around the same underlying filter: https://github.com/envoyproxy/envoy/blob/master/source/common/http/filter_manager.h#L705-L708

As a result, we need both wrappers to know about the fact that a skip action has been evaluated, since the skip action isn't associated with a specific filter wrapper, but to the filter itself. This here is one strategy where we have the FilterMatchState responsible for doing the matching and setting the skip_ flag on each of the wrappers. I can imagine another approach where we instead have the wrappers check the FilterMatchState to see whether it should skip?

For the filter action in the dual filter case we have two wrappers but we only want to notify the filter once about the match action, so we can get away with just arbitrarily picking one of the wrappers to use. Alternatively we could just hold onto a reference to the underlying filter instead: that way we're bypassing the wrappers in this case and things might be a bit clearer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can imagine another approach where we instead have the wrappers check the FilterMatchState to see whether it should skip?

I think this is what I would do since the state is shared?

Alternatively we could just hold onto a reference to the underlying filter instead: that way we're bypassing the wrappers in this case and things might be a bit clearer?

Yeah +1.

So yeah personally I would make the above changes but if you don't can you add some more comments somewhere leaving a trail about this?

/wait

if (match_state) {
if (dual_filter) {
match_state->secondary_filter_ = wrapper.get();
} else {
match_state->callback_handling_filter_ = wrapper.get();
}
}

filter->setDecoderFilterCallbacks(*wrapper);
// Note: configured decoder filters are appended to decoder_filters_.
// This means that if filters are configured in the following order (assume all three filters are
Expand All @@ -421,11 +437,16 @@ void FilterManager::addStreamDecoderFilterWorker(
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_);
}

void FilterManager::addStreamEncoderFilterWorker(
StreamEncoderFilterSharedPtr filter, Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree,
HttpMatchingDataImplSharedPtr matching_data, bool dual_filter) {
ActiveStreamEncoderFilterPtr wrapper(new ActiveStreamEncoderFilter(
*this, filter, std::move(match_tree), std::move(matching_data), dual_filter));
void FilterManager::addStreamEncoderFilterWorker(StreamEncoderFilterSharedPtr filter,
FilterMatchStateSharedPtr match_state,
bool dual_filter) {
ActiveStreamEncoderFilterPtr wrapper(
new ActiveStreamEncoderFilter(*this, filter, match_state, dual_filter));

if (match_state) {
match_state->callback_handling_filter_ = wrapper.get();
}

filter->setEncoderFilterCallbacks(*wrapper);
// Note: configured encoder filters are prepended to encoder_filters_.
// This means that if filters are configured in the following order (assume all three filters are
Expand Down Expand Up @@ -463,8 +484,11 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();

for (; entry != decoder_filters_.end(); entry++) {
(*entry)->evaluateMatchTreeWithNewData(
[&](auto& matching_data) { matching_data.onRequestHeaders(headers); });
if ((*entry)->filter_match_state_) {
(*entry)->filter_match_state_->evaluateMatchTreeWithNewData(
[&](auto& matching_data) { matching_data.onRequestHeaders(headers); });
}

if ((*entry)->skip_) {
continue;
}
Expand Down Expand Up @@ -970,8 +994,10 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
std::list<ActiveStreamEncoderFilterPtr>::iterator continue_data_entry = encoder_filters_.end();

for (; entry != encoder_filters_.end(); entry++) {
(*entry)->evaluateMatchTreeWithNewData(
[&headers](auto& matching_data) { matching_data.onResponseHeaders(headers); });
if ((*entry)->filter_match_state_) {
(*entry)->filter_match_state_->evaluateMatchTreeWithNewData(
[&headers](auto& matching_data) { matching_data.onResponseHeaders(headers); });
}
if ((*entry)->skip_) {
continue;
}
Expand Down
118 changes: 77 additions & 41 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,41 @@ class HttpResponseHeadersDataInput : public HttpHeadersDataInputBase<ResponseHea
class SkipAction : public Matcher::ActionBase<
envoy::extensions::filters::common::matcher::action::v3::SkipFilter> {};

struct ActiveStreamFilterBase;

/**
* Manages the shared match state between one or two filters.
* The need for this class comes from the fact that a single instantiated filter can be wrapped by
* two different ActiveStreamFilters, one for encoding and one for decoding. Certain match actions
* should be made visible to both wrappers (e.g. the skip action), while other actions should be
* sent to the underlying filter exactly once.
*/
class FilterMatchState {
public:
FilterMatchState(Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree,
HttpMatchingDataImplSharedPtr matching_data)
: match_tree_(std::move(match_tree)), matching_data_(std::move(matching_data)),
match_tree_evaluated_(false) {}

void evaluateMatchTreeWithNewData(std::function<void(HttpMatchingDataImpl&)> update_func);

// Some callbacks should be sent to all filters (e.g. filter specific match actions), while
// others should trigger updates to all filters (e.g. filter skip). The callback_handling_filter_
// should always be present and is the filter wrapper that handles the per filter actions, while
// secondary_filter_ is present for dual filters to allow setting a value for both wrappers.
// TODO(snowp): Should we instead snap a handle to the underlying filter and a list of filter
// wrappers?
ActiveStreamFilterBase* callback_handling_filter_{};
ActiveStreamFilterBase* secondary_filter_{};

private:
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree_;
HttpMatchingDataImplSharedPtr matching_data_;
bool match_tree_evaluated_ : 1;
};

using FilterMatchStateSharedPtr = std::shared_ptr<FilterMatchState>;

/**
* Base class wrapper for both stream encoder and decoder filters.
*
Expand All @@ -126,14 +161,12 @@ class SkipAction : public Matcher::ActionBase<
struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
Logger::Loggable<Logger::Id::http> {
ActiveStreamFilterBase(FilterManager& parent, bool dual_filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree,
HttpMatchingDataImplSharedPtr matching_data)
FilterMatchStateSharedPtr match_state)
: parent_(parent), iteration_state_(IterationState::Continue),
match_tree_(std::move(match_tree)), matching_data_(std::move(matching_data)),
iterate_from_current_filter_(false), headers_continued_(false),
continue_headers_continued_(false), end_stream_(false), dual_filter_(dual_filter),
decode_headers_called_(false), encode_headers_called_(false), match_tree_evaluated_(false),
skip_(false) {}
filter_match_state_(std::move(match_state)), iterate_from_current_filter_(false),
headers_continued_(false), continue_headers_continued_(false), end_stream_(false),
dual_filter_(dual_filter), decode_headers_called_(false), encode_headers_called_(false),
match_tree_evaluated_(false), skip_(false) {}

// Functions in the following block are called after the filter finishes processing
// corresponding data. Those functions handle state updates and data storage (if needed)
Expand Down Expand Up @@ -166,6 +199,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
// TODO(soya3129): make this pure when adding impl to encoder filter.
virtual void handleMetadataAfterHeadersCallback() PURE;

virtual void onMatchCallback(const Matcher::Action& action) PURE;

// Http::StreamFilterCallbacks
const Network::Connection* connection() override;
Event::Dispatcher& dispatcher() override;
Expand Down Expand Up @@ -203,8 +238,6 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
return saved_response_metadata_.get();
}

void evaluateMatchTreeWithNewData(std::function<void(HttpMatchingDataImpl&)> update_func);

// A vector to save metadata when the current filter's [de|en]codeMetadata() can not be called,
// either because [de|en]codeHeaders() of the current filter returns StopAllIteration or because
// [de|en]codeHeaders() adds new metadata to [de|en]code, but we don't know
Expand All @@ -223,9 +256,7 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
FilterManager& parent_;
IterationState iteration_state_;

Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree_;
HttpMatchingDataImplSharedPtr matching_data_;

FilterMatchStateSharedPtr filter_match_state_;
// If the filter resumes iteration from a StopAllBuffer/Watermark state, the current filter
// hasn't parsed data and trailers. As a result, the filter iteration should start with the
// current filter instead of the next one. If true, filter iteration starts with the current
Expand All @@ -240,6 +271,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
bool encode_headers_called_ : 1;
bool match_tree_evaluated_ : 1;
bool skip_ : 1;

friend FilterMatchState;
};

/**
Expand All @@ -249,11 +282,8 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
public StreamDecoderFilterCallbacks,
LinkedObject<ActiveStreamDecoderFilter> {
ActiveStreamDecoderFilter(FilterManager& parent, StreamDecoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree,
HttpMatchingDataImplSharedPtr matching_data, bool dual_filter)
: ActiveStreamFilterBase(parent, dual_filter, std::move(match_tree),
std::move(matching_data)),
handle_(filter) {}
FilterMatchStateSharedPtr match_state, bool dual_filter)
: ActiveStreamFilterBase(parent, dual_filter, std::move(match_state)), handle_(filter) {}

// ActiveStreamFilterBase
bool canContinue() override;
Expand All @@ -275,6 +305,9 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
void drainSavedRequestMetadata();
// This function is called after the filter calls decodeHeaders() to drain accumulated metadata.
void handleMetadataAfterHeadersCallback() override;
void onMatchCallback(const Matcher::Action& action) override {
handle_->onMatchCallback(std::move(action));
}

// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance& data, bool streaming) override;
Expand Down Expand Up @@ -338,11 +371,8 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase,
public StreamEncoderFilterCallbacks,
LinkedObject<ActiveStreamEncoderFilter> {
ActiveStreamEncoderFilter(FilterManager& parent, StreamEncoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree,
HttpMatchingDataImplSharedPtr matching_data, bool dual_filter)
: ActiveStreamFilterBase(parent, dual_filter, std::move(match_tree),
std::move(matching_data)),
handle_(filter) {}
FilterMatchStateSharedPtr match_state, bool dual_filter)
: ActiveStreamFilterBase(parent, dual_filter, std::move(match_state)), handle_(filter) {}

// ActiveStreamFilterBase
bool canContinue() override { return true; }
Expand All @@ -355,6 +385,7 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase,
void doData(bool end_stream) override;
void drainSavedResponseMetadata();
void handleMetadataAfterHeadersCallback() override;
void onMatchCallback(const Matcher::Action& action) override { handle_->onMatchCallback(action); }

void doMetadata() override {
if (saved_response_metadata_ != nullptr) {
Expand Down Expand Up @@ -639,34 +670,40 @@ class FilterManager : public ScopeTrackedObject,

// Http::FilterChainFactoryCallbacks
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, nullptr, false);
addStreamDecoderFilterWorker(filter, nullptr, false);
}
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
if (match_tree) {
auto matching_data = std::make_shared<HttpMatchingDataImpl>();
addStreamDecoderFilterWorker(filter, std::move(match_tree), std::move(matching_data), false);
addStreamDecoderFilterWorker(
filter,
std::make_shared<FilterMatchState>(std::move(match_tree),
std::make_shared<HttpMatchingDataImpl>()),
false);
return;
}

addStreamDecoderFilterWorker(filter, nullptr, nullptr, false);
addStreamDecoderFilterWorker(filter, nullptr, false);
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter) override {
addStreamEncoderFilterWorker(filter, nullptr, nullptr, false);
addStreamEncoderFilterWorker(filter, nullptr, false);
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
if (match_tree) {
addStreamEncoderFilterWorker(filter, std::move(match_tree),
std::make_shared<HttpMatchingDataImpl>(), false);
addStreamEncoderFilterWorker(
filter,
std::make_shared<FilterMatchState>(std::move(match_tree),
std::make_shared<HttpMatchingDataImpl>()),
false);
return;
}

addStreamEncoderFilterWorker(filter, nullptr, nullptr, false);
addStreamEncoderFilterWorker(filter, nullptr, false);
}
void addStreamFilter(StreamFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, nullptr, true);
addStreamDecoderFilterWorker(filter, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, true);
}
void addStreamFilter(StreamFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand All @@ -675,14 +712,15 @@ class FilterManager : public ScopeTrackedObject,
// TODO(snowp): The match tree might be fully evaluated twice, ideally we should expose
// the result to both filters after the first match evaluation.
if (match_tree) {
auto matching_data = std::make_shared<HttpMatchingDataImpl>();
addStreamDecoderFilterWorker(filter, match_tree, matching_data, true);
addStreamEncoderFilterWorker(filter, std::move(match_tree), std::move(matching_data), true);
auto matching_state = std::make_shared<FilterMatchState>(
std::move(match_tree), std::make_shared<HttpMatchingDataImpl>());
addStreamDecoderFilterWorker(filter, matching_state, true);
addStreamEncoderFilterWorker(filter, std::move(matching_state), true);
return;
}

addStreamDecoderFilterWorker(filter, nullptr, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, nullptr, true);
addStreamDecoderFilterWorker(filter, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, true);
}
void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) override;

Expand Down Expand Up @@ -765,11 +803,9 @@ class FilterManager : public ScopeTrackedObject,

// TODO(snowp): Make private as filter chain construction is moved into FM.
void addStreamDecoderFilterWorker(StreamDecoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree,
HttpMatchingDataImplSharedPtr matching_data, bool dual_filter);
FilterMatchStateSharedPtr match_state, bool dual_filter);
void addStreamEncoderFilterWorker(StreamEncoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree,
HttpMatchingDataImplSharedPtr matching_data, bool dual_filter);
FilterMatchStateSharedPtr match_state, bool dual_filter);

void disarmRequestTimeout();

Expand Down
Loading