Skip to content

Commit

Permalink
Perform lightstep flushing on timer. (#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanDzhabarov authored Oct 5, 2016
1 parent 6591604 commit 6aaf128
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 37 deletions.
61 changes: 44 additions & 17 deletions source/common/tracing/http_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,49 @@ void HttpTracerImpl::populateStats(const Decision& decision) {
}
}

LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink)
: builder_(tracer), sink_(sink) {}
LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink,
Event::Dispatcher& dispatcher)
: builder_(tracer), sink_(sink) {
flush_timer_ = dispatcher.createTimer([this]() -> void {
sink_.tracerStats().timer_flushed_.inc();
flushSpans();
enableTimer();
});

enableTimer();
}

void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) {
builder_.addSpan(std::move(span));

uint64_t min_flush_spans =
sink_.runtime().snapshot().getInteger("tracing.lightstep.min_flush_spans", 5U);
if (builder_.pendingSpans() == min_flush_spans) {
sink_.tracerStats().spans_sent_.add(min_flush_spans);
flushSpans();
}
}

bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) {
// Note: We don't expect this to be called, since the Tracer
// reference is private to its LightStepSink.
return true;
}

std::unique_ptr<lightstep::Recorder>
LightStepRecorder::NewInstance(LightStepSink& sink, Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer) {
return std::unique_ptr<lightstep::Recorder>(new LightStepRecorder(tracer, sink, dispatcher));
}

void LightStepRecorder::enableTimer() {
uint64_t flush_interval =
sink_.runtime().snapshot().getInteger("tracing.lightstep.flush_interval_ms", 1000U);
flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
}

void LightStepRecorder::flushSpans() {
if (builder_.pendingSpans() != 0) {
sink_.tracerStats().spans_sent_.add(builder_.pendingSpans());
lightstep::collector::ReportRequest request;
std::swap(request, builder_.pending());

Expand All @@ -133,22 +166,16 @@ void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) {

message->body(Grpc::Common::serializeBody(std::move(request)));

uint64_t timeout =
sink_.runtime().snapshot().getInteger("tracing.lightstep.request_timeout", 5000U);
sink_.clusterManager()
.httpAsyncClientForCluster(sink_.collectorCluster())
.send(std::move(message), *this, std::chrono::milliseconds(5000));
.send(std::move(message), *this, std::chrono::milliseconds(timeout));
}
}

bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) {
// Note: We don't expect this to be called, since the Tracer
// reference is private to its LightStepSink.
return true;
}

std::unique_ptr<lightstep::Recorder>
LightStepRecorder::NewInstance(LightStepSink& sink, const lightstep::TracerImpl& tracer) {
return std::unique_ptr<lightstep::Recorder>(new LightStepRecorder(tracer, sink));
}
LightStepSink::TlsLightStepTracer::TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink)
: tracer_(tracer), sink_(sink) {}

LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager,
Stats::Store& stats, const std::string& service_node,
Expand All @@ -169,10 +196,10 @@ LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManage
fmt::format("{} collector cluster must support http2 for gRPC calls", collector_cluster_));
}

tls_.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectPtr {
tls_.set(tls_slot_, [this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectPtr {
lightstep::Tracer tracer(lightstep::NewUserDefinedTransportLightStepTracer(
*options_,
std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::placeholders::_1)));
*options_, std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::ref(dispatcher),
std::placeholders::_1)));

return ThreadLocal::ThreadLocalObjectPtr{new TlsLightStepTracer(std::move(tracer), *this)};
});
Expand Down
15 changes: 11 additions & 4 deletions source/common/tracing/http_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ struct HttpTracerStats {
HTTP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
};

#define LIGHTSTEP_TRACER_STATS(COUNTER) COUNTER(spans_sent)
#define LIGHTSTEP_TRACER_STATS(COUNTER) \
COUNTER(spans_sent) \
COUNTER(timer_flushed)

struct LightstepTracerStats {
LIGHTSTEP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
Expand Down Expand Up @@ -116,8 +118,7 @@ class LightStepSink : public HttpSink {

private:
struct TlsLightStepTracer : ThreadLocal::ThreadLocalObject {
TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink)
: tracer_(tracer), sink_(sink) {}
TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink);

void shutdown() override {}

Expand All @@ -142,7 +143,8 @@ class LightStepSink : public HttpSink {

class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbacks {
public:
LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink);
LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink,
Event::Dispatcher& dispatcher);

// lightstep::Recorder
void RecordSpan(lightstep::collector::Span&& span) override;
Expand All @@ -153,11 +155,16 @@ class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbac
void onFailure(Http::AsyncClient::FailureReason) override;

static std::unique_ptr<lightstep::Recorder> NewInstance(LightStepSink& sink,
Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer);

private:
void enableTimer();
void flushSpans();

lightstep::ReportBuilder builder_;
LightStepSink& sink_;
Event::TimerPtr flush_timer_;
};

} // Tracing
68 changes: 53 additions & 15 deletions test/common/tracing/http_tracer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,19 @@ TEST(HttpNullTracerTest, NoFailures) {

class LightStepSinkTest : public Test {
public:
void setup(Json::Object& config) {
void setup(Json::Object& config, bool init_timer) {
std::unique_ptr<lightstep::TracerOptions> opts(new lightstep::TracerOptions());
opts->access_token = "sample_token";
opts->tracer_attributes["lightstep.guid"] = "random_guid";
opts->tracer_attributes["lightstep.component_name"] = "component";

ON_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillByDefault(ReturnRef(cm_.async_client_));

if (init_timer) {
timer_ = new NiceMock<Event::MockTimer>(&tls_.dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000)));
}

sink_.reset(
new LightStepSink(config, cm_, stats_, "service_node", tls_, runtime_, std::move(opts)));
}
Expand All @@ -291,17 +298,18 @@ class LightStepSinkTest : public Test {
)EOF";
Json::StringLoader loader(valid_config);

setup(loader);
setup(loader, true);
}

const Http::HeaderMapImpl empty_header_{};

std::unique_ptr<LightStepSink> sink_;
NiceMock<Event::MockTimer>* timer_;
Stats::IsolatedStoreImpl stats_;
NiceMock<Upstream::MockClusterManager> cm_;
NiceMock<Upstream::MockCluster> cluster_;
NiceMock<Runtime::MockRandomGenerator> random_;
NiceMock<Runtime::MockLoader> runtime_;
std::unique_ptr<LightStepSink> sink_;
NiceMock<ThreadLocal::MockInstance> tls_;
};

Expand All @@ -312,14 +320,14 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(invalid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
std::string empty_config = "{}";
Json::StringLoader loader(empty_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -331,7 +339,7 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -344,7 +352,7 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

EXPECT_THROW(setup(loader), EnvoyException);
EXPECT_THROW(setup(loader, false), EnvoyException);
}

{
Expand All @@ -356,17 +364,14 @@ TEST_F(LightStepSinkTest, InitializeSink) {
)EOF";
Json::StringLoader loader(valid_config);

setup(loader);
setup(loader, true);
}
}

TEST_F(LightStepSinkTest, FlushSeveralSpans) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillOnce(ReturnRef(cm_.async_client_));

Http::MockAsyncClientRequest request(&cm_.async_client_);
Http::AsyncClient::Callbacks* callback;
const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
Expand Down Expand Up @@ -400,6 +405,8 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) {
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.Times(2)
.WillRepeatedly(Return(2));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));

sink_->flushTrace(empty_header_, empty_header_, request_info);
sink_->flushTrace(empty_header_, empty_header_, request_info);
Expand Down Expand Up @@ -434,13 +441,42 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) {
EXPECT_EQ(2U, stats_.counter("tracing.lightstep.spans_sent").value());
}

TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
TEST_F(LightStepSinkTest, FlushSpansTimer) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas"))
.WillOnce(ReturnRef(cm_.async_client_));

const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
EXPECT_CALL(cm_.async_client_, send_(_, _, timeout));

SystemTime start_time;
EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time));
Optional<uint32_t> code(200);
EXPECT_CALL(request_info, responseCode()).Times(2).WillRepeatedly(ReturnRef(code));

const std::string protocol = "http/1";
EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.WillOnce(Return(5));

sink_->flushTrace(empty_header_, empty_header_, request_info);
// Timer should be re-enabled.
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000)));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.flush_interval_ms", 1000U))
.WillOnce(Return(1000U));

timer_->callback_();

EXPECT_EQ(1U, stats_.counter("tracing.lightstep.timer_flushed").value());
EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_sent").value());
}

TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
setupValidSink();

NiceMock<Http::AccessLog::MockRequestInfo> request_info;
Http::MockAsyncClientRequest request(&cm_.async_client_);
Http::AsyncClient::Callbacks* callback;
const Optional<std::chrono::milliseconds> timeout(std::chrono::seconds(5));
Expand Down Expand Up @@ -468,6 +504,8 @@ TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) {
EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5))
.WillOnce(Return(1));
EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U))
.WillOnce(Return(5000U));

sink_->flushTrace(empty_header_, empty_header_, request_info);

Expand Down
2 changes: 1 addition & 1 deletion test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class MockTimer : public Timer {

// Event::Timer
MOCK_METHOD0(disableTimer, void());
MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds& d));
MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds&));

TimerCb callback_;
};
Expand Down

0 comments on commit 6aaf128

Please sign in to comment.