Skip to content

Commit

Permalink
Network 475 check upstream for canary status (#49)
Browse files Browse the repository at this point in the history
Update upstream canary checking to use canary status of upstream host in addition to headers.
  • Loading branch information
junr03 authored Sep 2, 2016
1 parent 29ea37b commit b3df612
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 29 deletions.
21 changes: 13 additions & 8 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ const std::string& AsyncRequestImpl::upstreamZone() {
return upstream_host_ ? upstream_host_->zone() : EMPTY_STRING;
}

bool AsyncRequestImpl::isUpstreamCanary() {
return (response_ ? (response_->headers().get(Headers::get().EnvoyUpstreamCanary) == "true")
: false) ||
(upstream_host_ ? upstream_host_->canary() : false);
}

void AsyncRequestImpl::cancel() {
ASSERT(stream_encoder_);
stream_encoder_->resetStream();
Expand All @@ -84,7 +90,7 @@ void AsyncRequestImpl::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {

CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
response_->headers(), true, EMPTY_STRING, EMPTY_STRING,
parent_.local_zone_name_, upstreamZone()};
parent_.local_zone_name_, upstreamZone(), isUpstreamCanary()};
CodeUtility::chargeResponseStat(info);

if (end_stream) {
Expand Down Expand Up @@ -118,11 +124,10 @@ void AsyncRequestImpl::decodeTrailers(HeaderMapPtr&& trailers) {
}

void AsyncRequestImpl::onComplete() {
// TODO: Check host's canary status in addition to canary header.
CodeUtility::ResponseTimingInfo info{
parent_.stats_store_, parent_.stat_prefix_, stream_encoder_->requestCompleteTime(),
response_->headers().get(Headers::get().EnvoyUpstreamCanary) == "true", true, EMPTY_STRING,
EMPTY_STRING, parent_.local_zone_name_, upstreamZone()};
CodeUtility::ResponseTimingInfo info{parent_.stats_store_, parent_.stat_prefix_,
stream_encoder_->requestCompleteTime(), isUpstreamCanary(),
true, EMPTY_STRING, EMPTY_STRING, parent_.local_zone_name_,
upstreamZone()};
CodeUtility::chargeResponseTiming(info);

callbacks_.onSuccess(std::move(response_));
Expand All @@ -132,7 +137,7 @@ void AsyncRequestImpl::onComplete() {
void AsyncRequestImpl::onResetStream(StreamResetReason) {
CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
SERVICE_UNAVAILABLE_HEADER, true, EMPTY_STRING, EMPTY_STRING,
parent_.local_zone_name_, upstreamZone()};
parent_.local_zone_name_, upstreamZone(), isUpstreamCanary()};
CodeUtility::chargeResponseStat(info);
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
cleanup();
Expand All @@ -141,7 +146,7 @@ void AsyncRequestImpl::onResetStream(StreamResetReason) {
void AsyncRequestImpl::onRequestTimeout() {
CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
REQUEST_TIMEOUT_HEADER, true, EMPTY_STRING, EMPTY_STRING,
parent_.local_zone_name_, upstreamZone()};
parent_.local_zone_name_, upstreamZone(), isUpstreamCanary()};
CodeUtility::chargeResponseStat(info);
parent_.cluster_.stats().upstream_rq_timeout_.inc();
stream_encoder_->resetStream();
Expand Down
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class AsyncRequestImpl final : public AsyncClient::Request,

private:
const std::string& upstreamZone();
bool isUpstreamCanary();

// Http::StreamDecoder
void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/codes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void CodeUtility::chargeResponseStat(const ResponseStatInfo& info) {
std::string group_string = groupStringForResponseCode(static_cast<Code>(response_code));

// If the response is from a canary, also create canary stats.
if (info.response_headers_.get(Headers::get().EnvoyUpstreamCanary) == "true") {
if (info.upstream_canary_) {
info.store_.counter(fmt::format("{}canary.upstream_rq_{}", info.prefix_, group_string)).inc();
info.store_.counter(fmt::format("{}canary.upstream_rq_{}", info.prefix_, response_code)).inc();
}
Expand Down
1 change: 1 addition & 0 deletions source/common/http/codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class CodeUtility {
const std::string& request_vcluster_name_;
const std::string& from_zone_;
const std::string& to_zone_;
bool upstream_canary_;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/filter/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void RateLimitFilter::complete(RateLimit::LimitStatus status) {
config_->stats().counter(cluster_ratelimit_stat_prefix_ + "over_limit").inc();
Http::CodeUtility::ResponseStatInfo info{config_->stats(), cluster_stat_prefix_,
TOO_MANY_REQUESTS_HEADER, true, EMPTY_STRING,
EMPTY_STRING, EMPTY_STRING, EMPTY_STRING};
EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, false};
Http::CodeUtility::chargeResponseStat(info);
break;
}
Expand Down
11 changes: 7 additions & 4 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,22 @@ const std::string& Filter::upstreamZone() {
}

void Filter::chargeUpstreamCode(const Http::HeaderMap& response_headers) {
bool is_canary = (response_headers.get(Http::Headers::get().EnvoyUpstreamCanary) == "true") ||
(upstream_host_ ? upstream_host_->canary() : false);
if (!callbacks_->requestInfo().healthCheck()) {
Http::CodeUtility::ResponseStatInfo info{
config_->stats_store_, stat_prefix_, response_headers,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true",
route_->virtualHostName(), request_vcluster_name_, config_->service_zone_, upstreamZone()};
route_->virtualHostName(), request_vcluster_name_, config_->service_zone_, upstreamZone(),
is_canary};

Http::CodeUtility::chargeResponseStat(info);

for (const std::string& alt_prefix : alt_stat_prefixes_) {
Http::CodeUtility::ResponseStatInfo info{
config_->stats_store_, alt_prefix, response_headers,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true", "", "",
config_->service_zone_, upstreamZone()};
config_->service_zone_, upstreamZone(), is_canary};

Http::CodeUtility::chargeResponseStat(info);
}
Expand Down Expand Up @@ -383,9 +386,9 @@ void Filter::onUpstreamHeaders(Http::HeaderMapPtr&& headers, bool end_stream) {
std::to_string(ms.count()));
}

// TODO: Check host's canary status in addition to canary header.
upstream_request_->upstream_canary_ =
headers->get(Http::Headers::get().EnvoyUpstreamCanary) == "true";
(headers->get(Http::Headers::get().EnvoyUpstreamCanary) == "true") ||
(upstream_host_ ? upstream_host_->canary() : false);
chargeUpstreamCode(*headers);

downstream_response_started_ = true;
Expand Down
72 changes: 61 additions & 11 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ using testing::ByRef;
using testing::Invoke;
using testing::NiceMock;
using testing::Ref;
using testing::Return;
using testing::ReturnRef;

namespace Http {

class AsyncClientImplTest : public testing::Test, public AsyncClientConnPoolFactory {
class AsyncClientImplTestBase : public testing::Test, public AsyncClientConnPoolFactory {
public:
AsyncClientImplTest() {
AsyncClientImplTestBase() {
HttpTestUtility::addDefaultHeaders(message_->headers());
ON_CALL(*conn_pool_.host_, zone()).WillByDefault(ReturnRef(upstream_zone_));
}
Expand All @@ -35,13 +36,22 @@ class AsyncClientImplTest : public testing::Test, public AsyncClientConnPoolFact
ConnectionPool::MockInstance conn_pool_;
NiceMock<MockStreamEncoder> stream_encoder_;
StreamDecoder* response_decoder_{};
NiceMock<Stats::MockStore> stats_store_;
NiceMock<Event::MockTimer>* timer_;
NiceMock<Event::MockDispatcher> dispatcher_;
NiceMock<Upstream::MockCluster> cluster_;
};

TEST_F(AsyncClientImplTest, Basic) {
class AsyncClientImplTestMockStats : public AsyncClientImplTestBase {
public:
NiceMock<Stats::MockStore> stats_store_;
};

class AsyncClientImplTestIsolatedStats : public AsyncClientImplTestBase {
public:
Stats::IsolatedStoreImpl stats_store_;
};

TEST_F(AsyncClientImplTestMockStats, Basic) {
message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")});
Buffer::Instance& data = *message_->body();

Expand Down Expand Up @@ -77,7 +87,7 @@ TEST_F(AsyncClientImplTest, Basic) {
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTest, MultipleRequests) {
TEST_F(AsyncClientImplTestMockStats, MultipleRequests) {
// Send request 1
message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")});
Buffer::Instance& data = *message_->body();
Expand Down Expand Up @@ -124,7 +134,7 @@ TEST_F(AsyncClientImplTest, MultipleRequests) {
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTest, Trailers) {
TEST_F(AsyncClientImplTestMockStats, Trailers) {
message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")});
Buffer::Instance& data = *message_->body();

Expand All @@ -148,7 +158,7 @@ TEST_F(AsyncClientImplTest, Trailers) {
response_decoder_->decodeTrailers(HeaderMapPtr{new HeaderMapImpl{{"some", "trailer"}}});
}

TEST_F(AsyncClientImplTest, FailRequest) {
TEST_F(AsyncClientImplTestMockStats, FailRequest) {
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.upstream_rq_5xx"));
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.upstream_rq_503"));
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.zone.from_az.to_az.upstream_rq_503"));
Expand All @@ -171,7 +181,7 @@ TEST_F(AsyncClientImplTest, FailRequest) {
stream_encoder_.getStream().resetStream(StreamResetReason::RemoteReset);
}

TEST_F(AsyncClientImplTest, CancelRequest) {
TEST_F(AsyncClientImplTestMockStats, CancelRequest) {
EXPECT_CALL(conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks)
-> ConnectionPool::Cancellable* {
Expand All @@ -188,7 +198,7 @@ TEST_F(AsyncClientImplTest, CancelRequest) {
request->cancel();
}

TEST_F(AsyncClientImplTest, PoolFailure) {
TEST_F(AsyncClientImplTestMockStats, PoolFailure) {
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.upstream_rq_5xx"));
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.upstream_rq_503"));
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.zone.from_az.to_az.upstream_rq_5xx"));
Expand All @@ -210,7 +220,7 @@ TEST_F(AsyncClientImplTest, PoolFailure) {
client.send(std::move(message_), callbacks_, Optional<std::chrono::milliseconds>()));
}

TEST_F(AsyncClientImplTest, RequestTimeout) {
TEST_F(AsyncClientImplTestMockStats, RequestTimeout) {
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.upstream_rq_5xx"));
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.upstream_rq_504"));
EXPECT_CALL(stats_store_, counter("cluster.fake_cluster.zone.from_az.to_az.upstream_rq_5xx"));
Expand All @@ -236,7 +246,7 @@ TEST_F(AsyncClientImplTest, RequestTimeout) {
EXPECT_EQ(1UL, cluster_.stats_store_.counter("cluster.fake_cluster.upstream_rq_timeout").value());
}

TEST_F(AsyncClientImplTest, DisableTimer) {
TEST_F(AsyncClientImplTestMockStats, DisableTimer) {
EXPECT_CALL(conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks)
-> ConnectionPool::Cancellable* {
Expand All @@ -255,4 +265,44 @@ TEST_F(AsyncClientImplTest, DisableTimer) {
request->cancel();
}

TEST_F(AsyncClientImplTestIsolatedStats, CanaryStatusCounterTrue) {
message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")});
Buffer::Instance& data = *message_->body();

EXPECT_CALL(conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks)
-> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, conn_pool_.host_);
response_decoder_ = &decoder;
return nullptr;
}));
AsyncClientImpl client(cluster_, *this, stats_store_, dispatcher_, "from_az");
client.send(std::move(message_), callbacks_, Optional<std::chrono::milliseconds>());
HeaderMapPtr response_headers(
new HeaderMapImpl{{":status", "200"}, {"x-envoy-upstream-canary", "false"}});
ON_CALL(*conn_pool_.host_, canary()).WillByDefault(Return(true));
response_decoder_->decodeHeaders(std::move(response_headers), false);
EXPECT_EQ(1U, stats_store_.counter("cluster.fake_cluster.canary.upstream_rq_200").value());
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTestIsolatedStats, CanaryStatusCounterFalse) {
message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")});
Buffer::Instance& data = *message_->body();

EXPECT_CALL(conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks)
-> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, conn_pool_.host_);
response_decoder_ = &decoder;
return nullptr;
}));
AsyncClientImpl client(cluster_, *this, stats_store_, dispatcher_, "from_az");
client.send(std::move(message_), callbacks_, Optional<std::chrono::milliseconds>());
HeaderMapPtr response_headers(
new HeaderMapImpl{{":status", "200"}, {"x-envoy-upstream-canary", "false"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
EXPECT_EQ(0U, stats_store_.counter("cluster.fake_cluster.canary.upstream_rq_200").value());
response_decoder_->decodeData(data, true);
}
} // Http
6 changes: 2 additions & 4 deletions test/common/http/codes_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ class CodeUtilityTest : public testing::Test {
const std::string& from_az = EMPTY_STRING,
const std::string& to_az = EMPTY_STRING) {
HeaderMapImpl headers{{":status", std::to_string(code)}};
if (canary) {
headers.addViaMove("x-envoy-upstream-canary", "true");
}

CodeUtility::ResponseStatInfo info{store_, "prefix.", headers, internal_request,
request_vhost_name, request_vcluster_name, from_az, to_az};
request_vhost_name, request_vcluster_name, from_az, to_az,
canary};

CodeUtility::chargeResponseStat(info);
}
Expand Down
61 changes: 61 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -710,4 +710,65 @@ TEST(RouterFilterUtilityTest, shouldShadow) {
}
}

TEST_F(RouterTest, CanaryStatusTrue) {
NiceMock<MockRouteEntry> route_entry;
EXPECT_CALL(callbacks_.route_table_, routeForRequest(_)).WillOnce(Return(&route_entry));
EXPECT_CALL(route_entry, timeout()).WillOnce(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0);

NiceMock<Http::MockStreamEncoder> encoder;
Http::StreamDecoder* response_decoder = nullptr;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder, cm_.conn_pool_.host_);
return nullptr;
}));

Http::HeaderMapImpl headers{{"x-envoy-upstream-alt-stat-name", "alt_stat"},
{"x-envoy-internal", "true"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, true);

Http::HeaderMapPtr response_headers(
new Http::HeaderMapImpl{{":status", "200"},
{"x-envoy-upstream-canary", "false"},
{"x-envoy-virtual-cluster", "hello"}});
ON_CALL(*cm_.conn_pool_.host_, canary()).WillByDefault(Return(true));
response_decoder->decodeHeaders(std::move(response_headers), true);

EXPECT_EQ(1U, stats_store_.counter("cluster.fake_cluster.canary.upstream_rq_200").value());
}

TEST_F(RouterTest, CanaryStatusFalse) {
NiceMock<MockRouteEntry> route_entry;
EXPECT_CALL(callbacks_.route_table_, routeForRequest(_)).WillOnce(Return(&route_entry));
EXPECT_CALL(route_entry, timeout()).WillOnce(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0);

NiceMock<Http::MockStreamEncoder> encoder;
Http::StreamDecoder* response_decoder = nullptr;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder, cm_.conn_pool_.host_);
return nullptr;
}));

Http::HeaderMapImpl headers{{"x-envoy-upstream-alt-stat-name", "alt_stat"},
{"x-envoy-internal", "true"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, true);

Http::HeaderMapPtr response_headers(
new Http::HeaderMapImpl{{":status", "200"},
{"x-envoy-upstream-canary", "false"},
{"x-envoy-virtual-cluster", "hello"}});
response_decoder->decodeHeaders(std::move(response_headers), true);

EXPECT_EQ(0U, stats_store_.counter("cluster.fake_cluster.canary.upstream_rq_200").value());
}

} // Router

0 comments on commit b3df612

Please sign in to comment.