Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform lightstep flushing on timer. #119

Merged
merged 6 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 44 additions & 17 deletions source/common/tracing/http_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,49 @@ void HttpTracerImpl::populateStats(const Decision& decision) {
}
}

LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink)
: builder_(tracer), sink_(sink) {}
LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink,
Event::Dispatcher& dispatcher)
: builder_(tracer), sink_(sink) {
flush_timer_ = dispatcher.createTimer([this]() -> void {
sink_.tracerStats().timer_flushed_.inc();
flushSpans();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check pending spans != 0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

enableTimer();
});

enableTimer();
}

void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) {
builder_.addSpan(std::move(span));

uint64_t min_flush_spans =
sink_.runtime().snapshot().getInteger("tracing.lightstep.min_flush_spans", 5U);
if (builder_.pendingSpans() == min_flush_spans) {
sink_.tracerStats().spans_sent_.add(min_flush_spans);
flushSpans();
}
}

bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) {
// Note: We don't expect this to be called, since the Tracer
// reference is private to its LightStepSink.
return true;
}

std::unique_ptr<lightstep::Recorder>
LightStepRecorder::NewInstance(LightStepSink& sink, Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer) {
return std::unique_ptr<lightstep::Recorder>(new LightStepRecorder(tracer, sink, dispatcher));
}

void LightStepRecorder::enableTimer() {
uint64_t flush_interval =
sink_.runtime().snapshot().getInteger("tracing.lightstep.flush_interval_ms", 1000U);
flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
}

void LightStepRecorder::flushSpans() {
if (builder_.pendingSpans() != 0) {
sink_.tracerStats().spans_sent_.add(builder_.pendingSpans());
lightstep::collector::ReportRequest request;
std::swap(request, builder_.pending());

Expand All @@ -133,22 +166,16 @@ void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) {

message->body(Grpc::Common::serializeBody(std::move(request)));

uint64_t timeout =
sink_.runtime().snapshot().getInteger("tracing.lightstep.request_timeout", 5000U);
sink_.clusterManager()
.httpAsyncClientForCluster(sink_.collectorCluster())
.send(std::move(message), *this, std::chrono::milliseconds(5000));
.send(std::move(message), *this, std::chrono::milliseconds(timeout));
}
}

bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) {
// Note: We don't expect this to be called, since the Tracer
// reference is private to its LightStepSink.
return true;
}

std::unique_ptr<lightstep::Recorder>
LightStepRecorder::NewInstance(LightStepSink& sink, const lightstep::TracerImpl& tracer) {
return std::unique_ptr<lightstep::Recorder>(new LightStepRecorder(tracer, sink));
}
LightStepSink::TlsLightStepTracer::TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink)
: tracer_(tracer), sink_(sink) {}

LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager,
Stats::Store& stats, const std::string& service_node,
Expand All @@ -169,10 +196,10 @@ LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManage
fmt::format("{} collector cluster must support http2 for gRPC calls", collector_cluster_));
}

tls_.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectPtr {
tls_.set(tls_slot_, [this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectPtr {
lightstep::Tracer tracer(lightstep::NewUserDefinedTransportLightStepTracer(
*options_,
std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::placeholders::_1)));
*options_, std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::ref(dispatcher),
std::placeholders::_1)));

return ThreadLocal::ThreadLocalObjectPtr{new TlsLightStepTracer(std::move(tracer), *this)};
});
Expand Down
15 changes: 11 additions & 4 deletions source/common/tracing/http_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ struct HttpTracerStats {
HTTP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
};

#define LIGHTSTEP_TRACER_STATS(COUNTER) COUNTER(spans_sent)
#define LIGHTSTEP_TRACER_STATS(COUNTER) \
COUNTER(spans_sent) \
COUNTER(timer_flushed)

struct LightstepTracerStats {
LIGHTSTEP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
Expand Down Expand Up @@ -116,8 +118,7 @@ class LightStepSink : public HttpSink {

private:
struct TlsLightStepTracer : ThreadLocal::ThreadLocalObject {
TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink)
: tracer_(tracer), sink_(sink) {}
TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink);

void shutdown() override {}

Expand All @@ -142,7 +143,8 @@ class LightStepSink : public HttpSink {

class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbacks {
public:
LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink);
LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink,
Event::Dispatcher& dispatcher);

// lightstep::Recorder
void RecordSpan(lightstep::collector::Span&& span) override;
Expand All @@ -153,11 +155,16 @@ class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbac
void onFailure(Http::AsyncClient::FailureReason) override;

static std::unique_ptr<lightstep::Recorder> NewInstance(LightStepSink& sink,
Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer);

private:
void enableTimer();
void flushSpans();

lightstep::ReportBuilder builder_;
LightStepSink& sink_;
Event::TimerPtr flush_timer_;
};

} // Tracing
68 changes: 53 additions & 15 deletions test/common/tracing/http_tracer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,19 @@ TEST(HttpNullTracerTest, NoFailures) {

class LightStepSinkTest : public Test {
public:
void setup(Json::Object& config) {
void setup(Json::Object& config, bool init_timer) {
std::unique_ptr<lightstep::TracerOptions> opts(new lightstep::TracerOptions());
opts->access_token = "sample_token";
opts->tracer_attributes["lightstep.guid"] = "random_guid";
opts->tracer_attributes["lightstep.component_name"] = "component";

ON_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillByDefault(ReturnRef(cm_.async_client_));

if (init_timer) {
timer_ = new NiceMock<Event::MockTimer>(&tls_.dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000)));
}

sink_.reset(
new LightStepSink(config, cm_, stats_, "service_node", tls_, runtime_, std::move(opts)));
}
Expand All @@ -291,17 +298,18 @@ class LightStepSinkTest : public Test {
)EOF";
Json::StringLoader loader(valid_config);

setup(loader);
setup(loader, true);
}

const Http::HeaderMapImpl empty_header_{};

std::unique_ptr<LightStepSink> sink_;
NiceMock<Event::MockTimer>* timer_;
Stats::IsolatedStoreImpl stats_;
NiceMock<Upstream::MockClusterManager> cm_;
NiceMock<Upstream::MockCluster> cluster_;
NiceMock<Runtime::MockRandomGenerator> random_;
NiceMock<Runtime::MockLoader> runtime_;
std::unique_ptr<LightStepSink> sink_;
NiceMock<ThreadLocal::MockInstance> tls_;
};

Expand All @@ -312,14 +320,14 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(invalid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
std::string empty_config = "{}";
Json::StringLoader loader(empty_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -331,7 +339,7 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -344,7 +352,7 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -356,17 +364,14 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

setup(loader);
setup(loader, true);
}
}

TEST_F(LightStepSinkTest, FlushSeveralSpans) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillOnce(ReturnRef(cm_.async_client_));

Http::MockAsyncClientRequest request(&cm_.async_client_);
Http::AsyncClient::Callbacks* callback;
const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
Expand Down Expand Up @@ -400,6 +405,8 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) {
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.Times(2)
.WillRepeatedly(Return(2));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));

sink_->flushTrace(empty_header_, empty_header_, request_info);
sink_->flushTrace(empty_header_, empty_header_, request_info);
Expand Down Expand Up @@ -434,13 +441,42 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) {
EXPECT_EQ(2U, stats_.counter("tracing.lightstep.spans_sent").value());
}

TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
TEST_F(LightStepSinkTest, FlushSpansTimer) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillOnce(ReturnRef(cm_.async_client_));

const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
EXPECT_CALL(cm_.async_client_, send_(_, _, timeout));

SystemTime start_time;
EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time));
Optional<uint32_t> code(200);
EXPECT_CALL(request_info, responseCode()).Times(2).WillRepeatedly(ReturnRef(code));

const std::string protocol = "http/1";
EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.WillOnce(Return(5));

sink_->flushTrace(empty_header_, empty_header_, request_info);
// Timer should be re-enabled.
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000)));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.flush_interval_ms", 1000U))
.WillOnce(Return(1000U));

timer_->callback_();

EXPECT_EQ(1U, stats_.counter("tracing.lightstep.timer_flushed").value());
EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_sent").value());
}

TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
Http::MockAsyncClientRequest request(&cm_.async_client_);
Http::AsyncClient::Callbacks* callback;
const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
Expand Down Expand Up @@ -468,6 +504,8 @@ TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.WillOnce(Return(1));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));

sink_->flushTrace(empty_header_, empty_header_, request_info);

Expand Down
2 changes: 1 addition & 1 deletion test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class MockTimer : public Timer {

// Event::Timer
MOCK_METHOD0(disableTimer, void());
MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds& d));
MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds&));

TimerCb callback_;
};
Expand Down