diff --git a/docs/configuration/http_conn_man/route_config/route.rst b/docs/configuration/http_conn_man/route_config/route.rst index a1b6be1942e8..7db47e782efc 100644 --- a/docs/configuration/http_conn_man/route_config/route.rst +++ b/docs/configuration/http_conn_man/route_config/route.rst @@ -22,6 +22,7 @@ next (e.g., redirect, forward, rewrite, etc.). "runtime": "{...}", "retry_policy": "{...}", "rate_limit": "{...}", + "shadow": "{...}" } prefix @@ -85,6 +86,9 @@ content_type :ref:`rate_limit ` *(optional, object)* Indicates that the route has a rate limit policy. +:ref:`shadow ` + *(optional, object)* Indicates that the route has a shadow policy. + .. _config_http_conn_man_route_table_route_runtime: Runtime @@ -151,3 +155,26 @@ global *(optional, boolean)* Specifies whether the global rate limit service should be called for a request that matches this route. This information is used by the :ref:`rate limit filter ` if it is installed. Defaults to false if not specified. + +.. _config_http_conn_man_route_table_route_shadow: + +Shadow +------ + +.. code-block:: json + + { + "cluster": "...", + "runtime_key": "..." + } + +cluster + *(required, string)* Specifies the cluster that requests will be shadowed to. The cluster must + exist in the :ref:`cluster manager configuration `. + +runtime_key + *(optional, string)* If not specified, **all** requests to the target cluster will be shadowed. + If specified, Envoy will lookup the runtime key to get the % of requests to shadow. Valid values are + from 0 to 10000, allowing for increments of 0.01% of requests to be shadowed. If the runtime key + is specified in the configuration but not present in runtime, 0 is the default and thus 0% of + requests will be shadowed. diff --git a/include/envoy/http/async_client.h b/include/envoy/http/async_client.h index 59d6fef6c301..50dd53dfafae 100644 --- a/include/envoy/http/async_client.h +++ b/include/envoy/http/async_client.h @@ -15,7 +15,7 @@ class AsyncClient { */ enum class FailureReason { // Request timeout has been reached. - RequestTimemout, + RequestTimeout, // The stream has been reset. Reset }; diff --git a/include/envoy/router/router.h b/include/envoy/router/router.h index 2e25a13de5b0..2580af4a5f44 100644 --- a/include/envoy/router/router.h +++ b/include/envoy/router/router.h @@ -90,6 +90,28 @@ class RateLimitPolicy { virtual bool doGlobalLimiting() const PURE; }; +/** + * Per route policy for request shadowing. + */ +class ShadowPolicy { +public: + virtual ~ShadowPolicy() {} + + /** + * @return the name of the cluster that a matching request should be shadowed to. Returns empty + * string if no shadowing should take place. + */ + virtual const std::string& cluster() const PURE; + + /** + * @return the runtime key that will be used to determine whether an individual request should + * be shadowed. The lack of a key means that all requests will be shadowed. If a key is + * present it will be used to drive random selection in the range 0-10000 for 0.01% + * increments. + */ + virtual const std::string& runtimeKey() const PURE; +}; + /** * An individual resolved route entry. */ @@ -121,6 +143,12 @@ class RouteEntry { */ virtual const RetryPolicy& retryPolicy() const PURE; + /** + * @return const ShadowPolicy& the shadow policy for the route. All routes have a shadow policy + * even if no shadowing takes place. + */ + virtual const ShadowPolicy& shadowPolicy() const PURE; + /** * @return std::chrono::milliseconds the route's timeout. */ diff --git a/include/envoy/router/shadow_writer.h b/include/envoy/router/shadow_writer.h new file mode 100644 index 000000000000..36fddddaaffc --- /dev/null +++ b/include/envoy/router/shadow_writer.h @@ -0,0 +1,28 @@ +#pragma once + +#include "envoy/http/message.h" + +namespace Router { + +/** + * Interface used to shadow complete requests to an alternate upstream cluster in a "fire and + * forget" fashion. Right now this interface takes a fully buffered request and cannot be used for + * streaming. This is sufficient for current use cases. + */ +class ShadowWriter { +public: + virtual ~ShadowWriter() {} + + /** + * Shadow a request. + * @param cluster supplies the cluster name to shadow to. + * @param message supplies the complete request to shadow. + * @param timeout supplies the shadowed request timeout. + */ + virtual void shadow(const std::string& cluster, Http::MessagePtr&& request, + std::chrono::milliseconds timeout) PURE; +}; + +typedef std::unique_ptr ShadowWriterPtr; + +} // Router diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index d467af2e0dd3..eb4bc747a2e9 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -83,6 +83,7 @@ add_library( router/config_impl.cc router/retry_state_impl.cc router/router.cc + router/shadow_writer_impl.cc runtime/runtime_impl.cc runtime/uuid_util.cc ssl/connection_impl.cc diff --git a/source/common/grpc/rpc_channel_impl.cc b/source/common/grpc/rpc_channel_impl.cc index 99f1bb418bf3..09816ab5737c 100644 --- a/source/common/grpc/rpc_channel_impl.cc +++ b/source/common/grpc/rpc_channel_impl.cc @@ -123,7 +123,7 @@ void RpcChannelImpl::onFailure(Http::AsyncClient::FailureReason reason) { case Http::AsyncClient::FailureReason::Reset: onFailureWorker(Optional(), "stream reset"); break; - case Http::AsyncClient::FailureReason::RequestTimemout: + case Http::AsyncClient::FailureReason::RequestTimeout: onFailureWorker(Optional(), "request timeout"); break; } diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index b1e51a2a3221..b178063a4bd1 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -136,7 +136,7 @@ void AsyncRequestImpl::onRequestTimeout() { CodeUtility::chargeResponseStat(info); parent_.cluster_.stats().upstream_rq_timeout_.inc(); stream_encoder_->resetStream(); - callbacks_.onFailure(AsyncClient::FailureReason::RequestTimemout); + callbacks_.onFailure(AsyncClient::FailureReason::RequestTimeout); cleanup(); } diff --git a/source/common/http/message_impl.h b/source/common/http/message_impl.h index f352dda03f38..5d441b00b75a 100644 --- a/source/common/http/message_impl.h +++ b/source/common/http/message_impl.h @@ -34,6 +34,7 @@ class MessageImpl : public Http::Message { class RequestMessageImpl : public MessageImpl { public: RequestMessageImpl() : MessageImpl(HeaderMapPtr{new HeaderMapImpl()}) {} + RequestMessageImpl(HeaderMapPtr&& headers) : MessageImpl(std::move(headers)) {} }; class ResponseMessageImpl : public MessageImpl { diff --git a/source/common/router/config_impl.cc b/source/common/router/config_impl.cc index 721eb720040e..4cf8f8b79aa6 100644 --- a/source/common/router/config_impl.cc +++ b/source/common/router/config_impl.cc @@ -36,6 +36,15 @@ RateLimitPolicyImpl::RateLimitPolicyImpl(const Json::Object& config) { do_global_limiting_ = config.getObject("rate_limit").getBoolean("global", false); } +ShadowPolicyImpl::ShadowPolicyImpl(const Json::Object& config) { + if (!config.hasObject("shadow")) { + return; + } + + cluster_ = config.getObject("shadow").getString("cluster"); + runtime_key_ = config.getObject("shadow").getString("runtime_key", ""); +} + RouteEntryImplBase::RouteEntryImplBase(const VirtualHost& vhost, const Json::Object& route, Runtime::Loader& loader) : case_sensitive_(route.getBoolean("case_sensitive", true)), @@ -46,7 +55,8 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHost& vhost, const Json::Obj runtime_(loadRuntimeData(route)), loader_(loader), host_redirect_(route.getString("host_redirect", "")), path_redirect_(route.getString("path_redirect", "")), retry_policy_(route), - content_type_(route.getString("content_type", "")), rate_limit_policy_(route) { + content_type_(route.getString("content_type", "")), rate_limit_policy_(route), + shadow_policy_(route) { // Check to make sure that we are either a redirect route or we have a cluster. if (!(isRedirect() ^ !cluster_name_.empty())) { @@ -199,6 +209,13 @@ VirtualHost::VirtualHost(const Json::Object& virtual_host, Runtime::Loader& runt fmt::format("route: unknown cluster '{}'", routes_.back()->clusterName())); } } + + if (!routes_.back()->shadowPolicy().cluster().empty()) { + if (!cm.get(routes_.back()->shadowPolicy().cluster())) { + throw EnvoyException(fmt::format("route: unknown shadow cluster '{}'", + routes_.back()->shadowPolicy().cluster())); + } + } } if (virtual_host.hasObject("virtual_clusters")) { diff --git a/source/common/router/config_impl.h b/source/common/router/config_impl.h index cc112ddea30a..60ee57fb0620 100644 --- a/source/common/router/config_impl.h +++ b/source/common/router/config_impl.h @@ -103,6 +103,22 @@ class RateLimitPolicyImpl : public RateLimitPolicy { bool do_global_limiting_{}; }; +/** + * Implementation of ShadowPolicy that reads from the JSON route config. + */ +class ShadowPolicyImpl : public ShadowPolicy { +public: + ShadowPolicyImpl(const Json::Object& config); + + // Router::ShadowPolicy + const std::string& cluster() const override { return cluster_; } + const std::string& runtimeKey() const override { return runtime_key_; } + +private: + std::string cluster_; + std::string runtime_key_; +}; + /** * Base implementation for all route entries. */ @@ -117,6 +133,7 @@ class RouteEntryImplBase : public RouteEntry, public Matchable, public RedirectE void finalizeRequestHeaders(Http::HeaderMap& headers) const override; const RateLimitPolicy& rateLimitPolicy() const override { return rate_limit_policy_; } const RetryPolicy& retryPolicy() const override { return retry_policy_; } + const ShadowPolicy& shadowPolicy() const override { return shadow_policy_; } const std::string& virtualClusterName(const Http::HeaderMap& headers) const override { return vhost_.virtualClusterFromEntries(headers); } @@ -157,6 +174,7 @@ class RouteEntryImplBase : public RouteEntry, public Matchable, public RedirectE const RetryPolicyImpl retry_policy_; const std::string content_type_; const RateLimitPolicyImpl rate_limit_policy_; + const ShadowPolicyImpl shadow_policy_; }; /** diff --git a/source/common/router/router.cc b/source/common/router/router.cc index ff9c01e90edb..2b28ac8d938e 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -16,11 +16,26 @@ #include "common/http/codes.h" #include "common/http/header_map_impl.h" #include "common/http/headers.h" +#include "common/http/message_impl.h" #include "common/http/pooled_stream_encoder.h" #include "common/http/utility.h" namespace Router { +bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime, + uint64_t stable_random) { + if (policy.cluster().empty()) { + return false; + } + + if (!policy.runtimeKey().empty() && + !runtime.snapshot().featureEnabled(policy.runtimeKey(), 0, stable_random, 10000UL)) { + return false; + } + + return true; +} + FilterUtility::TimeoutData FilterUtility::finalTimeout(const RouteEntry& route, Http::HeaderMap& request_headers) { // See if there is a user supplied timeout in a request header. If there is we take that, @@ -65,10 +80,7 @@ FilterUtility::TimeoutData FilterUtility::finalTimeout(const RouteEntry& route, return timeout; } -Filter::Filter(const std::string& stat_prefix, Stats::Store& stats, Upstream::ClusterManager& cm, - Runtime::Loader& runtime, Runtime::RandomGenerator& random) - : stats_store_(stats), cm_(cm), runtime_(runtime), random_(random), - stats_{ALL_ROUTER_STATS(POOL_COUNTER_PREFIX(stats, stat_prefix))} {} +Filter::Filter(FilterConfigPtr config) : config_(config) {} Filter::~Filter() { // Upstream resources should already have been cleaned. @@ -79,7 +91,7 @@ Filter::~Filter() { void Filter::chargeUpstreamCode(const Http::HeaderMap& response_headers) { if (!callbacks_->requestInfo().healthCheck()) { Http::CodeUtility::ResponseStatInfo info{ - stats_store_, stat_prefix_, response_headers, + config_->stats_store_, stat_prefix_, response_headers, downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true", route_->virtualHostName(), request_vcluster_name_}; @@ -87,7 +99,7 @@ void Filter::chargeUpstreamCode(const Http::HeaderMap& response_headers) { for (const std::string& alt_prefix : alt_stat_prefixes_) { Http::CodeUtility::ResponseStatInfo info{ - stats_store_, alt_prefix, response_headers, + config_->stats_store_, alt_prefix, response_headers, downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true", "", ""}; Http::CodeUtility::chargeResponseStat(info); @@ -106,12 +118,12 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e // Only increment rq total stat if we actually decode headers here. This does not count requests // that get handled by earlier filters. - stats_.rq_total_.inc(); + config_->stats_.rq_total_.inc(); // First determine if we need to do a redirect before we do anything else. const RedirectEntry* redirect = callbacks_->routeTable().redirectRequest(headers); if (redirect) { - stats_.rq_redirect_.inc(); + config_->stats_.rq_redirect_.inc(); Http::Utility::sendRedirect(*callbacks_, redirect->newPath(headers)); return Http::FilterHeadersStatus::StopIteration; } @@ -119,7 +131,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e // Determine if there is a route match. route_ = callbacks_->routeTable().routeForRequest(headers); if (!route_) { - stats_.no_route_.inc(); + config_->stats_.no_route_.inc(); stream_log_debug("no cluster match for URL '{}'", *callbacks_, headers.get(Http::Headers::get().Path)); @@ -136,7 +148,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e stream_log_debug("cluster '{}' match for URL '{}'", *callbacks_, route_->clusterName(), headers.get(Http::Headers::get().Path)); - const Upstream::Cluster& cluster = *cm_.get(route_->clusterName()); + const Upstream::Cluster& cluster = *config_->cm_.get(route_->clusterName()); const std::string& cluster_alt_name = cluster.altStatName(); if (!cluster_alt_name.empty()) { alt_stat_prefixes_.push_back(fmt::format("cluster.{}.", cluster_alt_name)); @@ -150,7 +162,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e headers.remove(Http::Headers::get().EnvoyUpstreamAltStatName); // See if we are supposed to immediately kill some percentage of this cluster's traffic. - if (runtime_.snapshot().featureEnabled( + if (config_->runtime_.snapshot().featureEnabled( fmt::format("upstream.maintenance_mode.{}", route_->clusterName()), 0)) { callbacks_->requestInfo().onFailedResponse(Http::AccessLog::FailureReason::UpstreamOverflow); chargeUpstreamCode(Http::Code::ServiceUnavailable); @@ -159,7 +171,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e } // Fetch a connection pool for the upstream cluster. - Http::ConnectionPool::Instance* conn_pool = cm_.httpConnPoolForCluster(route_->clusterName()); + Http::ConnectionPool::Instance* conn_pool = + config_->cm_.httpConnPoolForCluster(route_->clusterName()); if (!conn_pool) { sendNoHealthyUpstreamResponse(); return Http::FilterHeadersStatus::StopIteration; @@ -167,8 +180,10 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e timeout_ = FilterUtility::finalTimeout(*route_, headers); route_->finalizeRequestHeaders(headers); - retry_state_ = createRetryState(route_->retryPolicy(), headers, cluster, runtime_, random_, - callbacks_->dispatcher()); + retry_state_ = createRetryState(route_->retryPolicy(), headers, cluster, config_->runtime_, + config_->random_, callbacks_->dispatcher()); + do_shadowing_ = FilterUtility::shouldShadow(route_->shadowPolicy(), config_->runtime_, + callbacks_->streamId()); #ifndef NDEBUG headers.iterate([this](const Http::LowerCaseString& key, const std::string& value) @@ -196,12 +211,13 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea onRequestComplete(); } - // If we are potentially going to retry this request we need to buffer. - return retry_state_->enabled() ? Http::FilterDataStatus::StopIterationAndBuffer - : Http::FilterDataStatus::StopIterationNoBuffer; + // If we are potentially going to retry or shadow this request we need to buffer. + return retry_state_->enabled() || do_shadowing_ ? Http::FilterDataStatus::StopIterationAndBuffer + : Http::FilterDataStatus::StopIterationNoBuffer; } Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) { + downstream_trailers_ = &trailers; upstream_request_->upstream_encoder_->encodeTrailers(trailers); onRequestComplete(); return Http::FilterTrailersStatus::StopIteration; @@ -216,11 +232,34 @@ void Filter::cleanup() { } } +void Filter::maybeDoShadowing() { + if (!do_shadowing_) { + return; + } + + ASSERT(!route_->shadowPolicy().cluster().empty()); + Http::MessagePtr request(new Http::RequestMessageImpl( + Http::HeaderMapPtr{new Http::HeaderMapImpl(*downstream_headers_)})); + if (callbacks_->decodingBuffer()) { + request->body(Buffer::InstancePtr{new Buffer::OwnedImpl(*callbacks_->decodingBuffer())}); + } + if (downstream_trailers_) { + request->trailers(Http::HeaderMapPtr{new Http::HeaderMapImpl(*downstream_trailers_)}); + } + + config_->shadowWriter().shadow(route_->shadowPolicy().cluster(), std::move(request), + timeout_.global_timeout_); +} + void Filter::onRequestComplete() { downstream_end_stream_ = true; // Possible that we got an immediate reset. if (upstream_request_) { + // Even if we got an immediate reset, we could still shadow, but that is a riskier change and + // seems unnecessary right now. + maybeDoShadowing(); + upstream_request_->setupPerTryTimeout(); if (timeout_.global_timeout_.count() > 0) { response_timeout_ = @@ -240,7 +279,7 @@ void Filter::onResetStream() { void Filter::onResponseTimeout() { stream_log_debug("upstream timeout", *callbacks_); - cm_.get(route_->clusterName())->stats().upstream_rq_timeout_.inc(); + config_->cm_.get(route_->clusterName())->stats().upstream_rq_timeout_.inc(); // It's possible to timeout during a retry backoff delay when we have no upstream request. In // this case we fake a reset since onUpstreamReset() doesn't care. @@ -318,7 +357,7 @@ void Filter::onUpstreamHeaders(Http::HeaderMapPtr&& headers, bool end_stream) { [this]() -> void { doRetry(); }) && setupRetry(end_stream)) { Http::CodeUtility::chargeBasicResponseStat( - stats_store_, stat_prefix_ + "retry.", + config_->stats_store_, stat_prefix_ + "retry.", static_cast(Http::Utility::getResponseStatus(*headers))); return; } else { @@ -371,7 +410,8 @@ void Filter::onUpstreamComplete() { if (!callbacks_->requestInfo().healthCheck()) { Http::CodeUtility::ResponseTimingInfo info{ - stats_store_, stat_prefix_, upstream_request_->upstream_encoder_->requestCompleteTime(), + config_->stats_store_, stat_prefix_, + upstream_request_->upstream_encoder_->requestCompleteTime(), upstream_request_->upstream_canary_, downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true", route_->virtualHostName(), request_vcluster_name_}; @@ -380,7 +420,8 @@ void Filter::onUpstreamComplete() { for (const std::string& alt_prefix : alt_stat_prefixes_) { Http::CodeUtility::ResponseTimingInfo info{ - stats_store_, alt_prefix, upstream_request_->upstream_encoder_->requestCompleteTime(), + config_->stats_store_, alt_prefix, + upstream_request_->upstream_encoder_->requestCompleteTime(), upstream_request_->upstream_canary_, downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true", "", ""}; @@ -411,7 +452,8 @@ bool Filter::setupRetry(bool end_stream) { } void Filter::doRetry() { - Http::ConnectionPool::Instance* conn_pool = cm_.httpConnPoolForCluster(route_->clusterName()); + Http::ConnectionPool::Instance* conn_pool = + config_->cm_.httpConnPoolForCluster(route_->clusterName()); if (!conn_pool) { sendNoHealthyUpstreamResponse(); cleanup(); @@ -421,14 +463,19 @@ void Filter::doRetry() { ASSERT(response_timeout_ || timeout_.global_timeout_.count() == 0); ASSERT(!upstream_request_); upstream_request_.reset(new UpstreamRequest(*this, *conn_pool)); - upstream_request_->upstream_encoder_->encodeHeaders(*downstream_headers_, - !callbacks_->decodingBuffer()); + upstream_request_->upstream_encoder_->encodeHeaders( + *downstream_headers_, !callbacks_->decodingBuffer() && !downstream_trailers_); // It's possible we got immediately reset. - if (upstream_request_ && callbacks_->decodingBuffer()) { - upstream_request_->upstream_encoder_->encodeData(*callbacks_->decodingBuffer(), true); - } - if (upstream_request_) { + if (callbacks_->decodingBuffer()) { + upstream_request_->upstream_encoder_->encodeData(*callbacks_->decodingBuffer(), + !downstream_trailers_); + } + + if (downstream_trailers_) { + upstream_request_->upstream_encoder_->encodeTrailers(*downstream_trailers_); + } + upstream_request_->setupPerTryTimeout(); } } @@ -473,7 +520,9 @@ void Filter::UpstreamRequest::setupPerTryTimeout() { void Filter::UpstreamRequest::onPerTryTimeout() { stream_log_debug("upstream per try timeout", *parent_.callbacks_); - parent_.cm_.get(parent_.route_->clusterName())->stats().upstream_rq_per_try_timeout_.inc(); + parent_.config_->cm_.get(parent_.route_->clusterName()) + ->stats() + .upstream_rq_per_try_timeout_.inc(); upstream_encoder_->resetStream(); parent_.onUpstreamReset(UpstreamResetType::PerTryTimeout, Optional()); } diff --git a/source/common/router/router.h b/source/common/router/router.h index 8356cb3cb0e2..7a11a51049de 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -3,6 +3,7 @@ #include "envoy/http/codec.h" #include "envoy/http/codes.h" #include "envoy/http/filter.h" +#include "envoy/router/shadow_writer.h" #include "envoy/runtime/runtime.h" #include "envoy/stats/stats_macros.h" #include "envoy/upstream/cluster_manager.h" @@ -39,6 +40,17 @@ class FilterUtility { std::chrono::milliseconds per_try_timeout_{0}; }; + /** + * Determine whether a request should be shadowed. + * @param policy supplies the route's shadow policy. + * @param runtime supplies the runtime to lookup the shadow key in. + * @param stable_random supplies the random number to use when determining whether shadowing + * should take place. + * @return TRUE if shadowing should take place. + */ + static bool shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime, + uint64_t stable_random); + /** * Determine the final timeout to use based on the route as well as the request headers. * @param route supplies the request route. @@ -48,13 +60,38 @@ class FilterUtility { static TimeoutData finalTimeout(const RouteEntry& route, Http::HeaderMap& request_headers); }; +/** + * Configuration for the router filter. + */ +class FilterConfig { +public: + FilterConfig(const std::string& stat_prefix, Stats::Store& stats, Upstream::ClusterManager& cm, + Runtime::Loader& runtime, Runtime::RandomGenerator& random, + ShadowWriterPtr&& shadow_writer) + : stats_store_(stats), cm_(cm), runtime_(runtime), random_(random), + stats_{ALL_ROUTER_STATS(POOL_COUNTER_PREFIX(stats, stat_prefix))}, + shadow_writer_(std::move(shadow_writer)) {} + + ShadowWriter& shadowWriter() { return *shadow_writer_; } + + Stats::Store& stats_store_; + Upstream::ClusterManager& cm_; + Runtime::Loader& runtime_; + Runtime::RandomGenerator& random_; + FilterStats stats_; + +private: + ShadowWriterPtr shadow_writer_; +}; + +typedef std::shared_ptr FilterConfigPtr; + /** * Service routing filter. */ class Filter : Logger::Loggable, public Http::StreamDecoderFilter { public: - Filter(const std::string& stat_prefix, Stats::Store& stats, Upstream::ClusterManager& cm, - Runtime::Loader& runtime, Runtime::RandomGenerator& random); + Filter(FilterConfigPtr); ~Filter(); // Http::StreamDecoderFilter @@ -110,6 +147,7 @@ class Filter : Logger::Loggable, public Http::StreamDecoderF const Upstream::Cluster& cluster, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher) PURE; + void maybeDoShadowing(); void onRequestComplete(); void onResetStream(); void onResponseTimeout(); @@ -123,10 +161,7 @@ class Filter : Logger::Loggable, public Http::StreamDecoderF bool setupRetry(bool end_stream); void doRetry(); - Stats::Store& stats_store_; - Upstream::ClusterManager& cm_; - Runtime::Loader& runtime_; - Runtime::RandomGenerator& random_; + FilterConfigPtr config_; Http::StreamDecoderFilterCallbacks* callbacks_{}; const RouteEntry* route_; std::string stat_prefix_; @@ -134,12 +169,13 @@ class Filter : Logger::Loggable, public Http::StreamDecoderF std::string request_vcluster_name_; bool downstream_response_started_{}; Event::TimerPtr response_timeout_; - FilterStats stats_; FilterUtility::TimeoutData timeout_; UpstreamRequestPtr upstream_request_; RetryStatePtr retry_state_; Http::HeaderMap* downstream_headers_{}; + Http::HeaderMap* downstream_trailers_{}; bool downstream_end_stream_{}; + bool do_shadowing_{}; }; class ProdFilter : public Filter { diff --git a/source/common/router/shadow_writer_impl.cc b/source/common/router/shadow_writer_impl.cc new file mode 100644 index 000000000000..3bdf5148a164 --- /dev/null +++ b/source/common/router/shadow_writer_impl.cc @@ -0,0 +1,13 @@ +#include "shadow_writer_impl.h" + +namespace Router { + +void ShadowWriterImpl::shadow(const std::string& cluster, Http::MessagePtr&& request, + std::chrono::milliseconds timeout) { + // Configuration should guarantee that cluster exists before calling here. This is basically + // fire and forget. We don't handle cancelling. + cm_.httpAsyncClientForCluster(cluster) + .send(std::move(request), *this, Optional(timeout)); +} + +} // Router diff --git a/source/common/router/shadow_writer_impl.h b/source/common/router/shadow_writer_impl.h new file mode 100644 index 000000000000..658c3375d954 --- /dev/null +++ b/source/common/router/shadow_writer_impl.h @@ -0,0 +1,28 @@ +#pragma once + +#include "envoy/router/shadow_writer.h" +#include "envoy/upstream/cluster_manager.h" + +namespace Router { + +/** + * Implementation of ShadowWriter that takes incoming requests to shadow and implements "fire and + * forget" behavior using an async client. + */ +class ShadowWriterImpl : public ShadowWriter, public Http::AsyncClient::Callbacks { +public: + ShadowWriterImpl(Upstream::ClusterManager& cm) : cm_(cm) {} + + // Router::ShadowWriter + void shadow(const std::string& cluster, Http::MessagePtr&& request, + std::chrono::milliseconds timeout) override; + + // Http::AsyncClient::Callbacks + void onSuccess(Http::MessagePtr&&) override {} + void onFailure(Http::AsyncClient::FailureReason) override {} + +private: + Upstream::ClusterManager& cm_; +}; + +} // Router diff --git a/source/server/config/http/router.cc b/source/server/config/http/router.cc index 0c112628a496..40fb1a42d007 100644 --- a/source/server/config/http/router.cc +++ b/source/server/config/http/router.cc @@ -1,4 +1,5 @@ #include "common/router/router.h" +#include "common/router/shadow_writer_impl.h" #include "server/config/network/http_connection_manager.h" namespace Server { @@ -16,10 +17,13 @@ class FilterConfig : public HttpFilterConfigFactory { return nullptr; } - return [&server, stat_prefix](Http::FilterChainFactoryCallbacks& callbacks) -> void { - callbacks.addStreamDecoderFilter(Http::StreamDecoderFilterPtr{ - new Router::ProdFilter(stat_prefix, server.stats(), server.clusterManager(), - server.runtime(), server.random())}); + Router::FilterConfigPtr config(new Router::FilterConfig( + stat_prefix, server.stats(), server.clusterManager(), server.runtime(), server.random(), + Router::ShadowWriterPtr{new Router::ShadowWriterImpl(server.clusterManager())})); + + return [config](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter( + Http::StreamDecoderFilterPtr{new Router::ProdFilter(config)}); }; } }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b39437c67845..45b6a2603028 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -71,6 +71,7 @@ add_executable(envoy-test common/router/config_impl_test.cc common/router/retry_state_impl_test.cc common/router/router_test.cc + common/router/shadow_writer_impl_test.cc common/runtime/runtime_impl_test.cc common/runtime/uuid_util_test.cc common/ssl/connection_impl_test.cc diff --git a/test/common/grpc/rpc_channel_impl_test.cc b/test/common/grpc/rpc_channel_impl_test.cc index 15e9ea6bbbff..ed7744e85168 100644 --- a/test/common/grpc/rpc_channel_impl_test.cc +++ b/test/common/grpc/rpc_channel_impl_test.cc @@ -226,7 +226,7 @@ TEST_F(GrpcRequestImplTest, HttpAsyncRequestTimeout) { service_.SayHello(nullptr, &request, &response, nullptr); EXPECT_CALL(grpc_callbacks_, onFailure(Optional(), "request timeout")); - http_callbacks_->onFailure(Http::AsyncClient::FailureReason::RequestTimemout); + http_callbacks_->onFailure(Http::AsyncClient::FailureReason::RequestTimeout); } TEST_F(GrpcRequestImplTest, NoHttpAsyncRequest) { diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index ff4c20ad8b0a..cb0bfe15b695 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -208,7 +208,7 @@ TEST_F(AsyncClientImplTest, RequestTimeout) { })); EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(ByRef(message_->headers())), true)); - EXPECT_CALL(callbacks_, onFailure(Http::AsyncClient::FailureReason::RequestTimemout)); + EXPECT_CALL(callbacks_, onFailure(Http::AsyncClient::FailureReason::RequestTimeout)); timer_ = new NiceMock(&dispatcher_); EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40))); EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); diff --git a/test/common/router/config_impl_test.cc b/test/common/router/config_impl_test.cc index 212cd6dfd75c..e68cc49a0471 100644 --- a/test/common/router/config_impl_test.cc +++ b/test/common/router/config_impl_test.cc @@ -87,7 +87,7 @@ TEST(RouteMatcherTest, TestRoutes) { "cluster": "instant-server", "case_sensitive": true }, - { + { "path": "/tar", "prefix_rewrite": "/car", "cluster": "instant-server", @@ -452,6 +452,96 @@ TEST(RouteMatcherTest, RateLimit) { .doGlobalLimiting()); } +TEST(RouteMatcherTest, ShadowClusterNotFound) { + std::string json = R"EOF( +{ + "virtual_hosts": [ + { + "name": "www2", + "domains": ["www.lyft.com"], + "routes": [ + { + "prefix": "/foo", + "shadow": { + "cluster": "some_cluster" + }, + "cluster": "www2" + } + ] + } + ] +} + )EOF"; + + Json::StringLoader loader(json); + NiceMock runtime; + NiceMock cm; + EXPECT_CALL(cm, get("www2")).WillRepeatedly(Return(&cm.cluster_)); + EXPECT_CALL(cm, get("some_cluster")).WillRepeatedly(Return(nullptr)); + + EXPECT_THROW(ConfigImpl(loader, runtime, cm), EnvoyException); +} + +TEST(RouteMatcherTest, Shadow) { + std::string json = R"EOF( +{ + "virtual_hosts": [ + { + "name": "www2", + "domains": ["www.lyft.com"], + "routes": [ + { + "prefix": "/foo", + "shadow": { + "cluster": "some_cluster" + }, + "cluster": "www2" + }, + { + "prefix": "/bar", + "shadow": { + "cluster": "some_cluster2", + "runtime_key": "foo" + }, + "cluster": "www2" + }, + { + "prefix": "/baz", + "cluster": "www2" + } + ] + } + ] +} + )EOF"; + + Json::StringLoader loader(json); + NiceMock runtime; + NiceMock cm; + ConfigImpl config(loader, runtime, cm); + + EXPECT_EQ("some_cluster", config.routeForRequest(genHeaders("www.lyft.com", "/foo", "GET"), 0) + ->shadowPolicy() + .cluster()); + EXPECT_EQ("", config.routeForRequest(genHeaders("www.lyft.com", "/foo", "GET"), 0) + ->shadowPolicy() + .runtimeKey()); + + EXPECT_EQ("some_cluster2", config.routeForRequest(genHeaders("www.lyft.com", "/bar", "GET"), 0) + ->shadowPolicy() + .cluster()); + EXPECT_EQ("foo", config.routeForRequest(genHeaders("www.lyft.com", "/bar", "GET"), 0) + ->shadowPolicy() + .runtimeKey()); + + EXPECT_EQ("", config.routeForRequest(genHeaders("www.lyft.com", "/baz", "GET"), 0) + ->shadowPolicy() + .cluster()); + EXPECT_EQ("", config.routeForRequest(genHeaders("www.lyft.com", "/baz", "GET"), 0) + ->shadowPolicy() + .runtimeKey()); +} + TEST(RouteMatcherTest, Retry) { std::string json = R"EOF( { diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 1971da37f356..f2d6fdbb62d6 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -8,6 +8,7 @@ #include "test/mocks/upstream/mocks.h" using testing::_; +using testing::AtLeast; using testing::Invoke; using testing::NiceMock; using testing::Return; @@ -34,7 +35,11 @@ class TestFilter : public Filter { class RouterTest : public testing::Test { public: - RouterTest() : router_("test.", stats_store_, cm_, runtime_, random_) { + RouterTest() + : shadow_writer_(new MockShadowWriter()), + config_(new FilterConfig("test.", stats_store_, cm_, runtime_, random_, + ShadowWriterPtr{shadow_writer_})), + router_(config_) { router_.setDecoderFilterCallbacks(callbacks_); ON_CALL(*cm_.conn_pool_.host_, url()).WillByDefault(ReturnRef(host_url_)); } @@ -57,6 +62,8 @@ class RouterTest : public testing::Test { NiceMock random_; Http::ConnectionPool::MockCancellable cancellable_; NiceMock callbacks_; + MockShadowWriter* shadow_writer_; + FilterConfigPtr config_; TestFilter router_; Event::MockTimer* response_timeout_{}; Event::MockTimer* per_try_timeout_{}; @@ -483,7 +490,10 @@ TEST_F(RouterTest, RetryUpstream5xxNotComplete) { Buffer::OwnedImpl body_data("hello"); EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true)); - EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(body_data, true)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(body_data, false)); + + Http::HeaderMapImpl trailers{{"some", "trailer"}}; + router_.decodeTrailers(trailers); // 5xx response. router_.retry_state_->expectRetry(); @@ -502,7 +512,8 @@ TEST_F(RouterTest, RetryUpstream5xxNotComplete) { })); ON_CALL(callbacks_, decodingBuffer()).WillByDefault(Return(&body_data)); EXPECT_CALL(encoder2, encodeHeaders(_, false)); - EXPECT_CALL(encoder2, encodeData(_, true)); + EXPECT_CALL(encoder2, encodeData(_, false)); + EXPECT_CALL(encoder2, encodeTrailers(_)); router_.retry_state_->callback_(); // Normal response. @@ -514,6 +525,45 @@ TEST_F(RouterTest, RetryUpstream5xxNotComplete) { EXPECT_EQ(1U, stats_store_.counter("cluster.fake_cluster.upstream_rq_200").value()); } +TEST_F(RouterTest, Shadow) { + callbacks_.route_table_.route_entry_.shadow_policy_.cluster_ = "foo"; + callbacks_.route_table_.route_entry_.shadow_policy_.runtime_key_ = "bar"; + ON_CALL(callbacks_, streamId()).WillByDefault(Return(43)); + + NiceMock encoder; + Http::StreamDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks) + -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder, cm_.conn_pool_.host_); + return nullptr; + })); + expectResponseTimerCreate(); + + EXPECT_CALL(runtime_.snapshot_, featureEnabled("bar", 0, 43, 10000)).WillOnce(Return(true)); + + Http::HeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, false); + + Buffer::OwnedImpl body_data("hello"); + EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(body_data, false)); + + Http::HeaderMapImpl trailers{{"some", "trailer"}}; + EXPECT_CALL(callbacks_, decodingBuffer()).Times(AtLeast(1)).WillRepeatedly(Return(&body_data)); + EXPECT_CALL(*shadow_writer_, shadow_("foo", _, std::chrono::milliseconds(10))) + .WillOnce(Invoke([](const std::string&, Http::MessagePtr& request, std::chrono::milliseconds) + -> void { + EXPECT_NE(nullptr, request->body()); + EXPECT_NE(nullptr, request->trailers()); + })); + router_.decodeTrailers(trailers); + + Http::HeaderMapPtr response_headers(new Http::HeaderMapImpl{{":status", "200"}}); + response_decoder->decodeHeaders(std::move(response_headers), true); +} + TEST_F(RouterTest, AltStatName) { // Also test no upstream timeout here. NiceMock route_entry; @@ -561,7 +611,7 @@ TEST_F(RouterTest, Redirect) { router_.decodeHeaders(headers, true); } -TEST(RouterFilterUtilityTest, All) { +TEST(RouterFilterUtilityTest, finalTimeout) { { MockRouteEntry route; EXPECT_CALL(route, timeout()).WillOnce(Return(std::chrono::milliseconds(10))); @@ -616,4 +666,36 @@ TEST(RouterFilterUtilityTest, All) { } } +TEST(RouterFilterUtilityTest, shouldShadow) { + { + TestShadowPolicy policy; + NiceMock runtime; + EXPECT_CALL(runtime.snapshot_, featureEnabled(_, _, _, _)).Times(0); + EXPECT_FALSE(FilterUtility::shouldShadow(policy, runtime, 5)); + } + { + TestShadowPolicy policy; + policy.cluster_ = "cluster"; + NiceMock runtime; + EXPECT_CALL(runtime.snapshot_, featureEnabled(_, _, _, _)).Times(0); + EXPECT_TRUE(FilterUtility::shouldShadow(policy, runtime, 5)); + } + { + TestShadowPolicy policy; + policy.cluster_ = "cluster"; + policy.runtime_key_ = "foo"; + NiceMock runtime; + EXPECT_CALL(runtime.snapshot_, featureEnabled("foo", 0, 5, 10000)).WillOnce(Return(false)); + EXPECT_FALSE(FilterUtility::shouldShadow(policy, runtime, 5)); + } + { + TestShadowPolicy policy; + policy.cluster_ = "cluster"; + policy.runtime_key_ = "foo"; + NiceMock runtime; + EXPECT_CALL(runtime.snapshot_, featureEnabled("foo", 0, 5, 10000)).WillOnce(Return(true)); + EXPECT_TRUE(FilterUtility::shouldShadow(policy, runtime, 5)); + } +} + } // Router diff --git a/test/common/router/shadow_writer_impl_test.cc b/test/common/router/shadow_writer_impl_test.cc new file mode 100644 index 000000000000..5a6436654380 --- /dev/null +++ b/test/common/router/shadow_writer_impl_test.cc @@ -0,0 +1,49 @@ +#include "common/router/shadow_writer_impl.h" + +#include "test/mocks/upstream/mocks.h" + +using testing::_; +using testing::Invoke; + +namespace Router { + +TEST(ShadowWriterImplTest, All) { + Upstream::MockClusterManager cm; + ShadowWriterImpl writer(cm); + + // Success case + Http::MessagePtr message(new Http::RequestMessageImpl()); + EXPECT_CALL(cm, httpAsyncClientForCluster("foo")).WillOnce(ReturnRef(cm.async_client_)); + Http::MockAsyncClientRequest request(&cm.async_client_); + Http::AsyncClient::Callbacks* callback; + EXPECT_CALL(cm.async_client_, + send_(_, _, Optional(std::chrono::milliseconds(5)))) + .WillOnce( + Invoke([&](Http::MessagePtr& inner_message, Http::AsyncClient::Callbacks& callbacks, + const Optional&) -> Http::AsyncClient::Request* { + EXPECT_EQ(message, inner_message); + callback = &callbacks; + return &request; + })); + writer.shadow("foo", std::move(message), std::chrono::milliseconds(5)); + + Http::MessagePtr response(new Http::RequestMessageImpl()); + callback->onSuccess(std::move(response)); + + // Failure case + message.reset(new Http::RequestMessageImpl()); + EXPECT_CALL(cm, httpAsyncClientForCluster("bar")).WillOnce(ReturnRef(cm.async_client_)); + EXPECT_CALL(cm.async_client_, + send_(_, _, Optional(std::chrono::milliseconds(10)))) + .WillOnce( + Invoke([&](Http::MessagePtr& inner_message, Http::AsyncClient::Callbacks& callbacks, + const Optional&) -> Http::AsyncClient::Request* { + EXPECT_EQ(message, inner_message); + callback = &callbacks; + return &request; + })); + writer.shadow("bar", std::move(message), std::chrono::milliseconds(10)); + callback->onFailure(Http::AsyncClient::FailureReason::RequestTimeout); +} + +} // Router diff --git a/test/mocks/router/mocks.cc b/test/mocks/router/mocks.cc index fd765213c20d..8569d0cc30a3 100644 --- a/test/mocks/router/mocks.cc +++ b/test/mocks/router/mocks.cc @@ -19,10 +19,14 @@ void MockRetryState::expectRetry() { MockRetryState::~MockRetryState() {} +MockShadowWriter::MockShadowWriter() {} +MockShadowWriter::~MockShadowWriter() {} + MockRouteEntry::MockRouteEntry() { ON_CALL(*this, clusterName()).WillByDefault(ReturnRef(cluster_name_)); ON_CALL(*this, rateLimitPolicy()).WillByDefault(ReturnRef(rate_limit_policy_)); ON_CALL(*this, retryPolicy()).WillByDefault(ReturnRef(retry_policy_)); + ON_CALL(*this, shadowPolicy()).WillByDefault(ReturnRef(shadow_policy_)); ON_CALL(*this, timeout()).WillByDefault(Return(std::chrono::milliseconds(10))); ON_CALL(*this, virtualClusterName(_)).WillByDefault(ReturnRef(virtual_cluster_)); ON_CALL(*this, virtualHostName()).WillByDefault(ReturnRef(vhost_name_)); diff --git a/test/mocks/router/mocks.h b/test/mocks/router/mocks.h index 5d9f3e83422c..6c6a74d6ca36 100644 --- a/test/mocks/router/mocks.h +++ b/test/mocks/router/mocks.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/router/router.h" +#include "envoy/router/shadow_writer.h" namespace Router { @@ -46,6 +47,31 @@ class TestRateLimitPolicy : public RateLimitPolicy { bool do_global_limiting_{}; }; +class TestShadowPolicy : public ShadowPolicy { +public: + // Router::ShadowPolicy + const std::string& cluster() const override { return cluster_; } + const std::string& runtimeKey() const override { return runtime_key_; } + + std::string cluster_; + std::string runtime_key_; +}; + +class MockShadowWriter : public ShadowWriter { +public: + MockShadowWriter(); + ~MockShadowWriter(); + + // Router::ShadowWriter + void shadow(const std::string& cluster, Http::MessagePtr&& request, + std::chrono::milliseconds timeout) override { + shadow_(cluster, request, timeout); + } + + MOCK_METHOD3(shadow_, void(const std::string& cluster, Http::MessagePtr& request, + std::chrono::milliseconds timeout)); +}; + class MockRouteEntry : public RouteEntry { public: MockRouteEntry(); @@ -56,6 +82,7 @@ class MockRouteEntry : public RouteEntry { MOCK_CONST_METHOD1(finalizeRequestHeaders, void(Http::HeaderMap& headers)); MOCK_CONST_METHOD0(rateLimitPolicy, const RateLimitPolicy&()); MOCK_CONST_METHOD0(retryPolicy, const RetryPolicy&()); + MOCK_CONST_METHOD0(shadowPolicy, const ShadowPolicy&()); MOCK_CONST_METHOD0(timeout, std::chrono::milliseconds()); MOCK_CONST_METHOD1(virtualClusterName, const std::string&(const Http::HeaderMap& headers)); MOCK_CONST_METHOD0(virtualHostName, const std::string&()); @@ -65,6 +92,7 @@ class MockRouteEntry : public RouteEntry { std::string virtual_cluster_{"fake_virtual_cluster"}; TestRetryPolicy retry_policy_; TestRateLimitPolicy rate_limit_policy_; + TestShadowPolicy shadow_policy_; }; class MockConfig : public Config { diff --git a/tools/pre-commit.sh b/tools/pre-commit.sh new file mode 100755 index 000000000000..9b5f091cc923 --- /dev/null +++ b/tools/pre-commit.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +for i in `git diff-index --name-only --diff-filter=ACM HEAD 2>&1`; do + echo "Checking format for $i" + tools/check_format.py $i clang-format-3.6 check + if [[ $? -ne 0 ]]; then + exit 1 + fi +done