caching: Support out-of-thread (async) HttpCache implementations #12622

Merged · 15 commits · Aug 24, 2020
Changes from 6 commits
121 changes: 78 additions & 43 deletions source/extensions/filters/http/cache/cache_filter.cc
@@ -7,6 +7,7 @@
#include "extensions/filters/http/cache/cacheability_utils.h"
#include "extensions/filters/http/cache/inline_headers_handles.h"

#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"

namespace Envoy {
@@ -32,8 +33,12 @@ CacheFilter::CacheFilter(const envoy::extensions::filters::http::cache::v3alpha:
: time_source_(time_source), cache_(http_cache) {}

void CacheFilter::onDestroy() {
lookup_ = nullptr;
insert_ = nullptr;
if (lookup_) {
Contributor:

In addition to calling onDestroy on lookup_ and insert_, we also need to remember that our own onDestroy has been called. They might have posted callbacks that haven't happened yet; when we get them, we need to ignore them. I suggest adding a new FilterState for this. (Another approach would be to hand these pointers to dispatcher().deferredDelete, but making it a state seems cleaner.)
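
A minimal sketch of that suggestion, assuming a hypothetical FilterState::Destroyed value that is not part of this PR:

```cpp
// Hypothetical sketch only: FilterState::Destroyed does not exist in this PR.
void CacheFilter::onDestroy() {
  filter_state_ = FilterState::Destroyed; // remember that the filter was destroyed
  if (lookup_) {
    lookup_->onDestroy();
  }
  if (insert_) {
    insert_->onDestroy();
  }
}

void CacheFilter::onHeaders(LookupResult&& result, Http::RequestHeaderMap& request_headers) {
  if (filter_state_ == FilterState::Destroyed) {
    // A callback that was posted before onDestroy() ran; ignore it rather than
    // touching a stream that no longer exists.
    return;
  }
  // ... existing handling ...
}
```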

Contributor:

@mattklein123 This PR relies on details of event handling that aren't documented, but that I think are intended. Specifically, it assumes that all callbacks posted before the return of CacheFilter::onDestroy will be executed before this CacheFilter is deleted. (If this isn't intended, then I don't understand the purpose of deferred deletion.) Can you confirm? (Either way, we should update the docs.)

Member:

I haven't tracked this PR, but no, this is not guaranteed. Can you describe a bit more what is going on or do you want me to review the entire PR?

Contributor:

Most of CacheFilter's plugin cache implementations will complete requests asynchronously, and there's no guarantee of what thread those completions will run on, so their callbacks get posted to our dispatcher. Suppose a plugin posts a callback, then (perhaps due to a client disconnect) CacheFilter::onDestroy is called. That callback must happen before the CacheFilter gets deleted (or else it will access a deleted CacheFilter). AFAICT, the current code does give us the correct order (because it runs pending callbacks before it runs deferred deletion), and I'll be surprised if it ever needs to change, but we need to make sure.

I think that all we need to do is add documentation and tests to lock in the current behavior of running pending callbacks before deferred deletion.

If we can't rely on this, we'll have to go back to a prior iteration of the original CacheFilter PR, and use either weak_ptrs or shared_ptrs to the CacheFilter, but that complicates the code and adds bus-locked operations.

Member:

The solution here is to not use post, but use a "timer" which can be cancelled. Can we switch to that?

Contributor Author:

@jmarantz the problem arises if the cache lookup has already finished and posted the callback to the dispatcher, but then the client closes the connection and the filter chain is destroyed. Currently, there's no guarantee that the posted callback will run before the filter is deleted. If the filter is deleted first, the posted callback will run on a destroyed filter.
One solution here is to capture a weak_ptr to the CacheFilter in the posted callback, and check that the CacheFilter is still alive before accessing any of its members/methods.
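
A rough sketch of the weak_ptr alternative, under the assumption (not true today) that the CacheFilter is owned by a shared_ptr and inherits std::enable_shared_from_this:

```cpp
// Hypothetical sketch only: CacheFilter is not currently managed by shared_ptr.
void CacheFilter::getBody() {
  std::weak_ptr<CacheFilter> weak_self = shared_from_this();
  lookup_->getBody(
      remaining_body_[0],
      [weak_self, &dispatcher = decoder_callbacks_->dispatcher()](Buffer::InstancePtr&& body) {
        dispatcher.post([weak_self, raw_body = body.release()] {
          Buffer::InstancePtr body = absl::WrapUnique(raw_body); // always reclaim ownership
          if (std::shared_ptr<CacheFilter> self = weak_self.lock()) {
            self->onBody(std::move(body));
          }
          // If the filter was already destroyed, the body is simply dropped.
        });
      });
}
```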

Contributor:

Can we arrange a destroy-notify callback from the filter chain? The cache system could then mark the filter structure as stale and avoid referencing it further.

Contributor:

of course weak_ptr does the job too, if the thing being pointed to is already a shared_ptr.

Contributor:

one more thing: the advantage of a callback over a weak_ptr is that you might be able to actively cancel an outstanding lookup request (if that is possible).

Member:

> @mattklein123 I am following the discussion at #12364. Can you elaborate on how it is related to our problem here?

It's loosely related in the sense that we have TLS post/callbacks that are being run beyond the acceptable lifetime.

> one more thing: the advantage of a callback over a weak_ptr is that you might be able to actively cancel an outstanding lookup request (if that is possible).

This is how a cancellable post would be accomplished within the dispatcher. This is what I recommend if you want a core solution; otherwise you can implement this yourself.
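
A rough illustration of the timer-based alternative, using the invented members callback_timer_, pending_result_ and request_headers_ (none of which exist in this PR):

```cpp
// Hypothetical sketch only. The timer is created on the worker thread that owns
// the filter; note that Event::Timer itself is not thread-safe, so arming it
// would still have to happen on that thread (a dispatcher-level cancellable
// post, as discussed in #12364, would remove that caveat).
Http::FilterHeadersStatus CacheFilter::decodeHeaders(Http::RequestHeaderMap& headers, bool) {
  callback_timer_ = decoder_callbacks_->dispatcher().createTimer(
      [this] { onHeaders(std::move(*pending_result_), *request_headers_); });
  // ... start the cache lookup; arm the timer with enableTimer(0ms) once the
  // result has been stashed in pending_result_ ...
  return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
}

void CacheFilter::onDestroy() {
  if (callback_timer_ != nullptr) {
    // Unlike a plain dispatcher.post(), an armed timer can be cancelled, so a
    // late lookup result can never fire against a destroyed filter.
    callback_timer_->disableTimer();
  }
  if (lookup_) {
    lookup_->onDestroy();
  }
  if (insert_) {
    insert_->onDestroy();
  }
}
```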

lookup_->onDestroy();
}
if (insert_) {
insert_->onDestroy();
}
}

Http::FilterHeadersStatus CacheFilter::decodeHeaders(Http::RequestHeaderMap& headers,
@@ -58,30 +63,18 @@ Http::FilterHeadersStatus CacheFilter::decodeHeaders(Http::RequestHeaderMap& hea
lookup_ = cache_.makeLookupContext(std::move(lookup_request));

ASSERT(lookup_);
getHeaders(headers);
ENVOY_STREAM_LOG(debug, "CacheFilter::decodeHeaders starting lookup", *decoder_callbacks_);

lookup_->getHeaders(
[this, &headers](LookupResult&& result) { onHeaders(std::move(result), headers); });

// If the cache called onHeaders synchronously it will have advanced the filter_state_.
switch (filter_state_) {
case FilterState::Initial:
// Headers are not fetched from cache yet -- wait until cache lookup is completed.
filter_state_ = FilterState::WaitingForCacheLookup;
return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
case FilterState::DecodeServingFromCache:
case FilterState::ResponseServedFromCache:
// A fresh cached response was found -- no need to continue the decoding stream.
return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
default:
return Http::FilterHeadersStatus::Continue;
}
// Stop the decoding stream until the cache lookup result is ready.
return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
}

Http::FilterHeadersStatus CacheFilter::encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) {
if (filter_state_ == FilterState::DecodeServingFromCache) {
// This call was invoked by decoder_callbacks_->encodeHeaders -- ignore it.
// This call was invoked during decoding by decoder_callbacks_->encodeHeaders because a fresh
// cached response was found and is being added to the encoding stream -- ignore it.
return Http::FilterHeadersStatus::Continue;
}

@@ -92,12 +85,8 @@ Http::FilterHeadersStatus CacheFilter::encodeHeaders(Http::ResponseHeaderMap& he

if (filter_state_ == FilterState::ValidatingCachedResponse && isResponseNotModified(headers)) {
processSuccessfulValidation(headers);
if (filter_state_ != FilterState::ResponseServedFromCache) {
// Response is still being fetched from cache -- wait until it is fetched & encoded.
filter_state_ = FilterState::WaitingForCacheBody;
return Http::FilterHeadersStatus::StopIteration;
}
return Http::FilterHeadersStatus::Continue;
// Stop the encoding stream until the cached response is fetched & added to the encoding stream.
return Http::FilterHeadersStatus::StopIteration;
}

// Either a cache miss or a cache entry that is no longer valid.
@@ -113,11 +102,12 @@ Http::FilterHeadersStatus CacheFilter::encodeHeaders(Http::ResponseHeaderMap& he

Http::FilterDataStatus CacheFilter::encodeData(Buffer::Instance& data, bool end_stream) {
if (filter_state_ == FilterState::DecodeServingFromCache) {
// This call was invoked by decoder_callbacks_->encodeData -- ignore it.
// This call was invoked during decoding by decoder_callbacks_->encodeData because a fresh
// cached response was found and is being added to the encoding stream -- ignore it.
return Http::FilterDataStatus::Continue;
}
if (filter_state_ == FilterState::WaitingForCacheBody) {
// Encoding stream stopped waiting for cached body (and trailers) to be encoded.
if (filter_state_ == FilterState::EncodeServingFromCache) {
// Stop the encoding stream until the cached response is fetched & added to the encoding stream.
return Http::FilterDataStatus::StopIterationAndBuffer;
}
if (insert_) {
@@ -129,23 +119,68 @@ Http::FilterDataStatus CacheFilter::encodeData(Buffer::Instance& data, bool end_
return Http::FilterDataStatus::Continue;
}

void CacheFilter::getHeaders(Http::RequestHeaderMap& request_headers) {
ASSERT(lookup_, "CacheFilter is trying to call getHeaders with no LookupContext");

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getHeaders([this, &request_headers,
&dispatcher = decoder_callbacks_->dispatcher()](LookupResult&& result) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
// The lambda passed to dispatcher.post() needs to be copyable as it will be used to
// initialize a std::function. Therefore, it cannot capture anything non-copyable.
// LookupResult is non-copyable as LookupResult::headers_ is a unique_ptr, which is
// non-copyable. Hence, "result" is decomposed when captured, and re-instantiated inside the
// lambda so that "result.headers_" can be captured as a raw pointer, then wrapped in a
// unique_ptr when the result is re-instantiated.
dispatcher.post(
[this, &request_headers, status = result.cache_entry_status_,
headers = result.headers_.release(), response_ranges = std::move(result.response_ranges_),
content_length = result.content_length_, has_trailers = result.has_trailers_]() mutable {
onHeaders(LookupResult{status, absl::WrapUnique(headers), content_length, response_ranges,
has_trailers},
request_headers);
});
});
}
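
The release-then-WrapUnique trick in isolation, as a generic illustration rather than code from this PR:

```cpp
#include <functional>
#include <memory>

#include "absl/memory/memory.h"

// std::function requires a copyable callable, so a unique_ptr cannot be captured
// directly. Releasing it to a raw pointer and re-wrapping it inside the lambda
// preserves single ownership, as long as the resulting callable runs at most once.
std::function<void()> deferConsume(std::unique_ptr<int> value,
                                   void (*consume)(std::unique_ptr<int>)) {
  return [raw = value.release(), consume]() { consume(absl::WrapUnique(raw)); };
}
```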

void CacheFilter::getBody() {
ASSERT(lookup_, "CacheFilter is trying to call getBody with no LookupContext");
ASSERT(!remaining_body_.empty(), "No reason to call getBody when there's no body to get.");
lookup_->getBody(remaining_body_[0],
[this](Buffer::InstancePtr&& body) { onBody(std::move(body)); });

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getBody(remaining_body_[0], [this, &dispatcher = decoder_callbacks_->dispatcher()](
Buffer::InstancePtr&& body) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
// The lambda passed to dispatcher.post() needs to be copyable as it will be used to
// initialize a std::function. Therefore, it cannot capture anything non-copyable.
// "body" is a unique_ptr, which is non-copyable. Hence, it is captured as a raw pointer then
// wrapped in a unique_ptr inside the lambda.
dispatcher.post([this, body = body.release()] { onBody(absl::WrapUnique(body)); });
});
}

void CacheFilter::getTrailers() {
ASSERT(lookup_, "CacheFilter is trying to call getTrailers with no LookupContext");
ASSERT(response_has_trailers_, "No reason to call getTrailers when there's no trailers to get.");
lookup_->getTrailers(
[this](Http::ResponseTrailerMapPtr&& trailers) { onTrailers(std::move(trailers)); });

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getTrailers([this, &dispatcher = decoder_callbacks_->dispatcher()](
Http::ResponseTrailerMapPtr&& trailers) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
// The lambda passed to dispatcher.post() needs to be copyable as it will be used to
// initialize a std::function. Therefore, it cannot capture anything non-copyable.
// "trailers" is a unique_ptr, which is non-copyable. Hence, it is captured as a raw
// pointer then wrapped in a unique_ptr inside the lambda.
dispatcher.post(
[this, trailers = trailers.release()] { onTrailers(absl::WrapUnique(trailers)); });
});
}

void CacheFilter::onHeaders(LookupResult&& result, Http::RequestHeaderMap& request_headers) {
// TODO(yosrym93): Handle request only-if-cached directive.
bool should_continue_decoding = false;
// TODO(yosrym93): Handle request only-if-cached directive
switch (result.cache_entry_status_) {
case CacheEntryStatus::FoundNotModified:
case CacheEntryStatus::NotSatisfiableRange: // TODO(#10132): create 416 response.
@@ -156,25 +191,23 @@ void CacheFilter::onHeaders(LookupResult&& result, Http::RequestHeaderMap& reque
// If the cache entry was valid, the response status should be 304 (unmodified) and the cache
// entry will be injected in the response body.
lookup_result_ = std::make_unique<LookupResult>(std::move(result));
should_continue_decoding = filter_state_ == FilterState::WaitingForCacheLookup;
filter_state_ = FilterState::ValidatingCachedResponse;
injectValidationHeaders(request_headers);
break;
case CacheEntryStatus::Unusable:
should_continue_decoding = filter_state_ == FilterState::WaitingForCacheLookup;
filter_state_ = FilterState::NoCachedResponseFound;
break;
case CacheEntryStatus::SatisfiableRange: // TODO(#10132): break response content to the ranges
// requested.
case CacheEntryStatus::Ok:
lookup_result_ = std::make_unique<LookupResult>(std::move(result));
filter_state_ = FilterState::DecodeServingFromCache;
encodeCachedResponse();
// Return here so that continueDecoding is not called.
// No need to continue the decoding stream as a cached response is already being served.
return;
}
if (should_continue_decoding) {
// decodeHeaders returned StopIteration waiting for this callback -- continue decoding.
decoder_callbacks_->continueDecoding();
}
// decodeHeaders returned StopIteration waiting for this callback -- continue decoding
decoder_callbacks_->continueDecoding();
}

// TODO(toddmgreer): Handle downstream backpressure.
@@ -236,7 +269,9 @@ void CacheFilter::processSuccessfulValidation(Http::ResponseHeaderMap& response_
// Check whether the cached entry should be updated before modifying the 304 response.
const bool should_update_cached_entry = shouldUpdateCachedEntry(response_headers);

// Update the 304 response status code and content-length.
filter_state_ = FilterState::EncodeServingFromCache;

// Update the 304 response status code and content-length
response_headers.setStatus(lookup_result_->headers_->getStatusValue());
response_headers.setContentLength(lookup_result_->headers_->getContentLengthValue());

@@ -344,7 +379,7 @@ void CacheFilter::encodeCachedResponse() {
}

void CacheFilter::finalizeEncodingCachedResponse() {
if (filter_state_ == FilterState::WaitingForCacheBody) {
if (filter_state_ == FilterState::EncodeServingFromCache) {
// encodeHeaders returned StopIteration waiting for finishing encoding the cached response --
// continue encoding.
encoder_callbacks_->continueEncoding();
21 changes: 7 additions & 14 deletions source/extensions/filters/http/cache/cache_filter.h
@@ -38,7 +38,8 @@ class CacheFilter : public Http::PassThroughFilter,
Http::FilterDataStatus encodeData(Buffer::Instance& buffer, bool end_stream) override;

private:
// Utility functions: make any necessary checks and call the corresponding lookup_ functions.
// Utility functions; make any necessary checks and call the corresponding lookup_ functions
void getHeaders(Http::RequestHeaderMap& request_headers);
void getBody();
void getTrailers();

@@ -95,28 +96,20 @@ class CacheFilter : public Http::PassThroughFilter,
enum class FilterState {
Initial,

// CacheFilter::decodeHeaders called lookup->getHeaders() but onHeaders was not called yet
// (lookup result not ready) -- the decoding stream should be stopped until the cache lookup
// result is ready.
WaitingForCacheLookup,

// CacheFilter::encodeHeaders called encodeCachedResponse() but encoding the cached response is
// not finished yet -- the encoding stream should be stopped until it is finished.
WaitingForCacheBody,

// Cache lookup did not find a cached response for this request.
NoCachedResponseFound,

// Cache lookup found a cached response that requires validation.
// Cache lookup found a cached response that requires validation
ValidatingCachedResponse,

// Cache lookup found a fresh cached response and it is being added to the encoding stream.
DecodeServingFromCache,

// A cached response was successfully validated and it is being added to the encoding stream
EncodeServingFromCache,

// The cached response was successfully added to the encoding stream (either during decoding or
// encoding).
ResponseServedFromCache
};

FilterState filter_state_ = FilterState::Initial;
};

16 changes: 16 additions & 0 deletions source/extensions/filters/http/cache/http_cache.h
@@ -241,6 +241,14 @@ class InsertContext {
// Inserts trailers into the cache.
virtual void insertTrailers(const Http::ResponseTrailerMap& trailers) PURE;

// This routine is called prior to an InsertContext being destroyed. InsertContext is responsible
// for making sure that any async events are cleaned up in the context of this routine. This
// includes timers, network calls, etc. The reason there is an onDestroy() method vs. doing this
// type of cleanup in the destructor is due to the deferred deletion model that Envoy uses to
// avoid stack unwind complications. InsertContext must not invoke any callbacks after having
// onDestroy() invoked.
virtual void onDestroy() PURE;

virtual ~InsertContext() = default;
};
using InsertContextPtr = std::unique_ptr<InsertContext>;
@@ -277,6 +285,14 @@ class LookupContext {
// Http::ResponseTrailerMapPtr passed to cb must not be null.
virtual void getTrailers(LookupTrailersCallback&& cb) PURE;

// This routine is called prior to a LookupContext being destroyed. LookupContext is responsible
// for making sure that any async events are cleaned up in the context of this routine. This
// includes timers, network calls, etc. The reason there is an onDestroy() method vs. doing this
// type of cleanup in the destructor is due to the deferred deletion model that Envoy uses to
// avoid stack unwind complications. LookupContext must not invoke any callbacks after having
// onDestroy() invoked.
virtual void onDestroy() PURE;

virtual ~LookupContext() = default;
};
using LookupContextPtr = std::unique_ptr<LookupContext>;
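
For a cache backed by an asynchronous client, the new contract could be honoured along these lines (a sketch only; AsyncBackend and CancelHandle are invented for illustration):

```cpp
// Illustrative only: AsyncBackend and CancelHandle are hypothetical types.
class AsyncLookupContext : public LookupContext {
public:
  AsyncLookupContext(AsyncBackend& backend, LookupRequest&& request)
      : backend_(backend), request_(std::move(request)) {}

  void getHeaders(LookupHeadersCallback&& cb) override {
    // Keep a handle to the in-flight request so it can be cancelled later.
    inflight_ = backend_.lookupHeaders(request_, std::move(cb));
  }

  void getBody(const AdjustedByteRange& range, LookupBodyCallback&& cb) override {
    inflight_ = backend_.lookupBody(request_, range, std::move(cb));
  }

  void getTrailers(LookupTrailersCallback&& cb) override {
    inflight_ = backend_.lookupTrailers(request_, std::move(cb));
  }

  void onDestroy() override {
    if (inflight_ != nullptr) {
      // cancel() must guarantee that, once it returns, the pending callback will
      // never be invoked, including one already racing on another thread.
      inflight_->cancel();
      inflight_ = nullptr;
    }
  }

private:
  AsyncBackend& backend_;
  LookupRequest request_;
  CancelHandle* inflight_ = nullptr;
};
```
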
@@ -37,6 +37,7 @@ class SimpleLookupContext : public LookupContext {
}

const LookupRequest& request() const { return request_; }
void onDestroy() override {}

private:
SimpleHttpCache& cache_;
@@ -74,6 +75,8 @@ class SimpleInsertContext : public InsertContext {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE; // TODO(toddmgreer): support trailers
}

void onDestroy() override {}

private:
void commit() {
committed_ = true;