Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform lightstep flushing on timer. #119

Merged
merged 6 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions source/common/tracing/http_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,47 @@ void HttpTracerImpl::populateStats(const Decision& decision) {
}
}

LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink)
: builder_(tracer), sink_(sink) {}
LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink,
Event::Dispatcher& dispatcher)
: builder_(tracer), sink_(sink) {
flush_timer_ = dispatcher.createTimer([this]() -> void {
sink_.tracerStats().timer_flushed_.inc();
flushSpans();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check pending spans != 0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

uint64_t flush_interval =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: duplicate code here and below can be collapsed into helper function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixing

sink_.runtime().snapshot().getInteger("tracing.lightstep.flush_interval_ms", 1000U);
flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
});

uint64_t flush_interval =
sink_.runtime().snapshot().getInteger("tracing.lightstep.flush_interval_ms", 1000U);
flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
}

void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) {
builder_.addSpan(std::move(span));

uint64_t min_flush_spans =
sink_.runtime().snapshot().getInteger("tracing.lightstep.min_flush_spans", 5U);
if (builder_.pendingSpans() == min_flush_spans) {
sink_.tracerStats().spans_sent_.add(min_flush_spans);
flushSpans();
}
}

bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) {
// Note: We don't expect this to be called, since the Tracer
// reference is private to its LightStepSink.
return true;
}

std::unique_ptr<lightstep::Recorder>
LightStepRecorder::NewInstance(LightStepSink& sink, Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer) {
return std::unique_ptr<lightstep::Recorder>(new LightStepRecorder(tracer, sink, dispatcher));
}

void LightStepRecorder::flushSpans() {
if (builder_.pendingSpans() != 0) {
sink_.tracerStats().spans_sent_.add(builder_.pendingSpans());
lightstep::collector::ReportRequest request;
std::swap(request, builder_.pending());

Expand All @@ -133,29 +164,24 @@ void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) {

message->body(Grpc::Common::serializeBody(std::move(request)));

uint64_t timeout =
sink_.runtime().snapshot().getInteger("tracing.lightstep.request_timeout", 5000U);
sink_.clusterManager()
.httpAsyncClientForCluster(sink_.collectorCluster())
.send(std::move(message), *this, std::chrono::milliseconds(5000));
.send(std::move(message), *this, std::chrono::milliseconds(timeout));
}
}

bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) {
// Note: We don't expect this to be called, since the Tracer
// reference is private to its LightStepSink.
return true;
}

std::unique_ptr<lightstep::Recorder>
LightStepRecorder::NewInstance(LightStepSink& sink, const lightstep::TracerImpl& tracer) {
return std::unique_ptr<lightstep::Recorder>(new LightStepRecorder(tracer, sink));
}
LightStepSink::TlsLightStepTracer::TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink)
: tracer_(tracer), sink_(sink) {}

LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager,
Stats::Store& stats, const std::string& service_node,
ThreadLocal::Instance& tls, Runtime::Loader& runtime,
Event::Dispatcher& dispatcher, Stats::Store& stats,
const std::string& service_node, ThreadLocal::Instance& tls,
Runtime::Loader& runtime,
std::unique_ptr<lightstep::TracerOptions> options)
: collector_cluster_(config.getString("collector_cluster")), cm_(cluster_manager),
stats_store_(stats),
dispatcher_(dispatcher), stats_store_(stats),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete/don't need any more

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

tracer_stats_{LIGHTSTEP_TRACER_STATS(POOL_COUNTER_PREFIX(stats, "tracing.lightstep."))},
service_node_(service_node), tls_(tls), runtime_(runtime), options_(std::move(options)),
tls_slot_(tls.allocateSlot()) {
Expand All @@ -169,10 +195,10 @@ LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManage
fmt::format("{} collector cluster must support http2 for gRPC calls", collector_cluster_));
}

tls_.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectPtr {
tls_.set(tls_slot_, [this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectPtr {
lightstep::Tracer tracer(lightstep::NewUserDefinedTransportLightStepTracer(
*options_,
std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::placeholders::_1)));
*options_, std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::ref(dispatcher),
std::placeholders::_1)));

return ThreadLocal::ThreadLocalObjectPtr{new TlsLightStepTracer(std::move(tracer), *this)};
});
Expand Down
20 changes: 14 additions & 6 deletions source/common/tracing/http_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ struct HttpTracerStats {
HTTP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
};

#define LIGHTSTEP_TRACER_STATS(COUNTER) COUNTER(spans_sent)
#define LIGHTSTEP_TRACER_STATS(COUNTER) \
COUNTER(spans_sent) \
COUNTER(timer_flushed)

struct LightstepTracerStats {
LIGHTSTEP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
Expand Down Expand Up @@ -98,8 +100,9 @@ class HttpTracerImpl : public HttpTracer {
class LightStepSink : public HttpSink {
public:
LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager,
Stats::Store& stats, const std::string& service_node, ThreadLocal::Instance& tls,
Runtime::Loader& runtime, std::unique_ptr<lightstep::TracerOptions> options);
Event::Dispatcher& dispatcher, Stats::Store& stats, const std::string& service_node,
ThreadLocal::Instance& tls, Runtime::Loader& runtime,
std::unique_ptr<lightstep::TracerOptions> options);

// Tracer::HttpSink
void flushTrace(const Http::HeaderMap& request_headers, const Http::HeaderMap& response_headers,
Expand All @@ -110,14 +113,14 @@ class LightStepSink : public HttpSink {
Runtime::Loader& runtime() { return runtime_; }
Stats::Store& statsStore() { return stats_store_; }
LightstepTracerStats& tracerStats() { return tracer_stats_; }
Event::Dispatcher& dispatcher() { return dispatcher_; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


static const std::string LIGHTSTEP_SERVICE;
static const std::string LIGHTSTEP_METHOD;

private:
struct TlsLightStepTracer : ThreadLocal::ThreadLocalObject {
TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink)
: tracer_(tracer), sink_(sink) {}
TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink);

void shutdown() override {}

Expand All @@ -131,6 +134,7 @@ class LightStepSink : public HttpSink {

const std::string collector_cluster_;
Upstream::ClusterManager& cm_;
Event::Dispatcher& dispatcher_;
Stats::Store& stats_store_;
LightstepTracerStats tracer_stats_;
const std::string service_node_;
Expand All @@ -142,7 +146,8 @@ class LightStepSink : public HttpSink {

class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbacks {
public:
LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink);
LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink,
Event::Dispatcher& dispatcher);

// lightstep::Recorder
void RecordSpan(lightstep::collector::Span&& span) override;
Expand All @@ -152,12 +157,15 @@ class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbac
void onSuccess(Http::MessagePtr&&) override;
void onFailure(Http::AsyncClient::FailureReason) override;

void flushSpans();
static std::unique_ptr<lightstep::Recorder> NewInstance(LightStepSink& sink,
Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer);

private:
lightstep::ReportBuilder builder_;
LightStepSink& sink_;
Event::TimerPtr flush_timer_;
};

} // Tracing
2 changes: 1 addition & 1 deletion source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void MainImpl::initializeTracers(const Json::Object& tracing_configuration_) {
opts->guid_generator = [&rand]() { return rand.random(); };

http_tracer_->addSink(Tracing::HttpSinkPtr{new Tracing::LightStepSink(
sink.getObject("config"), *cluster_manager_, server_.stats(),
sink.getObject("config"), *cluster_manager_, server_.dispatcher(), server_.stats(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

server_.options().serviceNodeName(), server_.threadLocal(), server_.runtime(),
std::move(opts))});
} else {
Expand Down
72 changes: 55 additions & 17 deletions test/common/tracing/http_tracer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,21 @@ TEST(HttpNullTracerTest, NoFailures) {

class LightStepSinkTest : public Test {
public:
void setup(Json::Object& config) {
void setup(Json::Object& config, bool init_timer) {
std::unique_ptr<lightstep::TracerOptions> opts(new lightstep::TracerOptions());
opts->access_token = "sample_token";
opts->tracer_attributes["lightstep.guid"] = "random_guid";
opts->tracer_attributes["lightstep.component_name"] = "component";

sink_.reset(
new LightStepSink(config, cm_, stats_, "service_node", tls_, runtime_, std::move(opts)));
ON_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillByDefault(ReturnRef(cm_.async_client_));

if (init_timer) {
timer_ = new NiceMock<Event::MockTimer>(&tls_.dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000)));
}

sink_.reset(new LightStepSink(config, cm_, tls_.dispatcher_, stats_, "service_node", tls_,
runtime_, std::move(opts)));
}

void setupValidSink() {
Expand All @@ -291,17 +298,18 @@ class LightStepSinkTest : public Test {
)EOF";
Json::StringLoader loader(valid_config);

setup(loader);
setup(loader, true);
}

const Http::HeaderMapImpl empty_header_{};

std::unique_ptr<LightStepSink> sink_;
NiceMock<Event::MockTimer>* timer_;
Stats::IsolatedStoreImpl stats_;
NiceMock<Upstream::MockClusterManager> cm_;
NiceMock<Upstream::MockCluster> cluster_;
NiceMock<Runtime::MockRandomGenerator> random_;
NiceMock<Runtime::MockLoader> runtime_;
std::unique_ptr<LightStepSink> sink_;
NiceMock<ThreadLocal::MockInstance> tls_;
};

Expand All @@ -312,14 +320,14 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(invalid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
std::string empty_config = "{}";
Json::StringLoader loader(empty_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -331,7 +339,7 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -344,7 +352,7 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -356,17 +364,14 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

setup(loader);
setup(loader, true);
}
}

TEST_F(LightStepSinkTest, FlushSeveralSpans) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillOnce(ReturnRef(cm_.async_client_));

Http::MockAsyncClientRequest request(&cm_.async_client_);
Http::AsyncClient::Callbacks* callback;
const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
Expand Down Expand Up @@ -400,6 +405,8 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) {
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.Times(2)
.WillRepeatedly(Return(2));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));

sink_->flushTrace(empty_header_, empty_header_, request_info);
sink_->flushTrace(empty_header_, empty_header_, request_info);
Expand Down Expand Up @@ -434,13 +441,42 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) {
EXPECT_EQ(2U, stats_.counter("tracing.lightstep.spans_sent").value());
}

TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
TEST_F(LightStepSinkTest, FlushSpansTimer) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillOnce(ReturnRef(cm_.async_client_));

const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
EXPECT_CALL(cm_.async_client_, send_(_, _, timeout));

SystemTime start_time;
EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time));
Optional<uint32_t> code(200);
EXPECT_CALL(request_info, responseCode()).Times(2).WillRepeatedly(ReturnRef(code));

const std::string protocol = "http/1";
EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.WillOnce(Return(5));

sink_->flushTrace(empty_header_, empty_header_, request_info);
// Timer should be re-enabled.
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000)));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.flush_interval_ms", 1000U))
.WillOnce(Return(1000U));

timer_->callback_();

EXPECT_EQ(1U, stats_.counter("tracing.lightstep.timer_flushed").value());
EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_sent").value());
}

TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
Http::MockAsyncClientRequest request(&cm_.async_client_);
Http::AsyncClient::Callbacks* callback;
const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
Expand Down Expand Up @@ -468,6 +504,8 @@ TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.WillOnce(Return(1));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));

sink_->flushTrace(empty_header_, empty_header_, request_info);

Expand Down
2 changes: 1 addition & 1 deletion test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class MockTimer : public Timer {

// Event::Timer
MOCK_METHOD0(disableTimer, void());
MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds& d));
MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds&));

TimerCb callback_;
};
Expand Down