diff --git a/source/common/tracing/http_tracer_impl.cc b/source/common/tracing/http_tracer_impl.cc index 2626753cb434..c71892520b7d 100644 --- a/source/common/tracing/http_tracer_impl.cc +++ b/source/common/tracing/http_tracer_impl.cc @@ -114,8 +114,17 @@ void HttpTracerImpl::populateStats(const Decision& decision) { } } -LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink) - : builder_(tracer), sink_(sink) {} +LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink, + Event::Dispatcher& dispatcher) + : builder_(tracer), sink_(sink) { + flush_timer_ = dispatcher.createTimer([this]() -> void { + sink_.tracerStats().timer_flushed_.inc(); + flushSpans(); + enableTimer(); + }); + + enableTimer(); +} void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) { builder_.addSpan(std::move(span)); @@ -123,7 +132,31 @@ void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) { uint64_t min_flush_spans = sink_.runtime().snapshot().getInteger("tracing.lightstep.min_flush_spans", 5U); if (builder_.pendingSpans() == min_flush_spans) { - sink_.tracerStats().spans_sent_.add(min_flush_spans); + flushSpans(); + } +} + +bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) { + // Note: We don't expect this to be called, since the Tracer + // reference is private to its LightStepSink. + return true; +} + +std::unique_ptr +LightStepRecorder::NewInstance(LightStepSink& sink, Event::Dispatcher& dispatcher, + const lightstep::TracerImpl& tracer) { + return std::unique_ptr(new LightStepRecorder(tracer, sink, dispatcher)); +} + +void LightStepRecorder::enableTimer() { + uint64_t flush_interval = + sink_.runtime().snapshot().getInteger("tracing.lightstep.flush_interval_ms", 1000U); + flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval)); +} + +void LightStepRecorder::flushSpans() { + if (builder_.pendingSpans() != 0) { + sink_.tracerStats().spans_sent_.add(builder_.pendingSpans()); lightstep::collector::ReportRequest request; std::swap(request, builder_.pending()); @@ -133,22 +166,16 @@ void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) { message->body(Grpc::Common::serializeBody(std::move(request))); + uint64_t timeout = + sink_.runtime().snapshot().getInteger("tracing.lightstep.request_timeout", 5000U); sink_.clusterManager() .httpAsyncClientForCluster(sink_.collectorCluster()) - .send(std::move(message), *this, std::chrono::milliseconds(5000)); + .send(std::move(message), *this, std::chrono::milliseconds(timeout)); } } -bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) { - // Note: We don't expect this to be called, since the Tracer - // reference is private to its LightStepSink. - return true; -} - -std::unique_ptr -LightStepRecorder::NewInstance(LightStepSink& sink, const lightstep::TracerImpl& tracer) { - return std::unique_ptr(new LightStepRecorder(tracer, sink)); -} +LightStepSink::TlsLightStepTracer::TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink) + : tracer_(tracer), sink_(sink) {} LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManager& cluster_manager, Stats::Store& stats, const std::string& service_node, @@ -169,10 +196,10 @@ LightStepSink::LightStepSink(const Json::Object& config, Upstream::ClusterManage fmt::format("{} collector cluster must support http2 for gRPC calls", collector_cluster_)); } - tls_.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectPtr { + tls_.set(tls_slot_, [this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectPtr { lightstep::Tracer tracer(lightstep::NewUserDefinedTransportLightStepTracer( - *options_, - std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::placeholders::_1))); + *options_, std::bind(&LightStepRecorder::NewInstance, std::ref(*this), std::ref(dispatcher), + std::placeholders::_1))); return ThreadLocal::ThreadLocalObjectPtr{new TlsLightStepTracer(std::move(tracer), *this)}; }); diff --git a/source/common/tracing/http_tracer_impl.h b/source/common/tracing/http_tracer_impl.h index b34a0e0cb5bd..93b7cd697783 100644 --- a/source/common/tracing/http_tracer_impl.h +++ b/source/common/tracing/http_tracer_impl.h @@ -28,7 +28,9 @@ struct HttpTracerStats { HTTP_TRACER_STATS(GENERATE_COUNTER_STRUCT) }; -#define LIGHTSTEP_TRACER_STATS(COUNTER) COUNTER(spans_sent) +#define LIGHTSTEP_TRACER_STATS(COUNTER) \ + COUNTER(spans_sent) \ + COUNTER(timer_flushed) struct LightstepTracerStats { LIGHTSTEP_TRACER_STATS(GENERATE_COUNTER_STRUCT) @@ -116,8 +118,7 @@ class LightStepSink : public HttpSink { private: struct TlsLightStepTracer : ThreadLocal::ThreadLocalObject { - TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink) - : tracer_(tracer), sink_(sink) {} + TlsLightStepTracer(lightstep::Tracer tracer, LightStepSink& sink); void shutdown() override {} @@ -142,7 +143,8 @@ class LightStepSink : public HttpSink { class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbacks { public: - LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink); + LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepSink& sink, + Event::Dispatcher& dispatcher); // lightstep::Recorder void RecordSpan(lightstep::collector::Span&& span) override; @@ -153,11 +155,16 @@ class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbac void onFailure(Http::AsyncClient::FailureReason) override; static std::unique_ptr NewInstance(LightStepSink& sink, + Event::Dispatcher& dispatcher, const lightstep::TracerImpl& tracer); private: + void enableTimer(); + void flushSpans(); + lightstep::ReportBuilder builder_; LightStepSink& sink_; + Event::TimerPtr flush_timer_; }; } // Tracing diff --git a/test/common/tracing/http_tracer_impl_test.cc b/test/common/tracing/http_tracer_impl_test.cc index c7fa378d50ce..4245df184701 100644 --- a/test/common/tracing/http_tracer_impl_test.cc +++ b/test/common/tracing/http_tracer_impl_test.cc @@ -272,12 +272,19 @@ TEST(HttpNullTracerTest, NoFailures) { class LightStepSinkTest : public Test { public: - void setup(Json::Object& config) { + void setup(Json::Object& config, bool init_timer) { std::unique_ptr opts(new lightstep::TracerOptions()); opts->access_token = "sample_token"; - opts->tracer_attributes["lightstep.guid"] = "random_guid"; opts->tracer_attributes["lightstep.component_name"] = "component"; + ON_CALL(cm_, httpAsyncClientForCluster("lightstep_saas")) + .WillByDefault(ReturnRef(cm_.async_client_)); + + if (init_timer) { + timer_ = new NiceMock(&tls_.dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000))); + } + sink_.reset( new LightStepSink(config, cm_, stats_, "service_node", tls_, runtime_, std::move(opts))); } @@ -291,17 +298,18 @@ class LightStepSinkTest : public Test { )EOF"; Json::StringLoader loader(valid_config); - setup(loader); + setup(loader, true); } const Http::HeaderMapImpl empty_header_{}; + std::unique_ptr sink_; + NiceMock* timer_; Stats::IsolatedStoreImpl stats_; NiceMock cm_; NiceMock cluster_; NiceMock random_; NiceMock runtime_; - std::unique_ptr sink_; NiceMock tls_; }; @@ -312,14 +320,14 @@ TEST_F(LightStepSinkTest, InitializeSink) { )EOF"; Json::StringLoader loader(invalid_config); - EXPECT_THROW(setup(loader), EnvoyException); + EXPECT_THROW(setup(loader, false), EnvoyException); } { std::string empty_config = "{}"; Json::StringLoader loader(empty_config); - EXPECT_THROW(setup(loader), EnvoyException); + EXPECT_THROW(setup(loader, false), EnvoyException); } { @@ -331,7 +339,7 @@ TEST_F(LightStepSinkTest, InitializeSink) { )EOF"; Json::StringLoader loader(valid_config); - EXPECT_THROW(setup(loader), EnvoyException); + EXPECT_THROW(setup(loader, false), EnvoyException); } { @@ -344,7 +352,7 @@ TEST_F(LightStepSinkTest, InitializeSink) { )EOF"; Json::StringLoader loader(valid_config); - EXPECT_THROW(setup(loader), EnvoyException); + EXPECT_THROW(setup(loader, false), EnvoyException); } { @@ -356,7 +364,7 @@ TEST_F(LightStepSinkTest, InitializeSink) { )EOF"; Json::StringLoader loader(valid_config); - setup(loader); + setup(loader, true); } } @@ -364,9 +372,6 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) { setupValidSink(); NiceMock request_info; - EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas")) - .WillOnce(ReturnRef(cm_.async_client_)); - Http::MockAsyncClientRequest request(&cm_.async_client_); Http::AsyncClient::Callbacks* callback; const Optional timeout(std::chrono::seconds(5)); @@ -400,6 +405,8 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) { EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5)) .Times(2) .WillRepeatedly(Return(2)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U)) + .WillOnce(Return(5000U)); sink_->flushTrace(empty_header_, empty_header_, request_info); sink_->flushTrace(empty_header_, empty_header_, request_info); @@ -434,13 +441,42 @@ TEST_F(LightStepSinkTest, FlushSeveralSpans) { EXPECT_EQ(2U, stats_.counter("tracing.lightstep.spans_sent").value()); } -TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) { +TEST_F(LightStepSinkTest, FlushSpansTimer) { setupValidSink(); NiceMock request_info; - EXPECT_CALL(cm_, httpAsyncClientForCluster("lightstep_saas")) - .WillOnce(ReturnRef(cm_.async_client_)); + const Optional timeout(std::chrono::seconds(5)); + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)); + + SystemTime start_time; + EXPECT_CALL(request_info, startTime()).WillOnce(Return(start_time)); + Optional code(200); + EXPECT_CALL(request_info, responseCode()).Times(2).WillRepeatedly(ReturnRef(code)); + + const std::string protocol = "http/1"; + EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5)) + .WillOnce(Return(5)); + + sink_->flushTrace(empty_header_, empty_header_, request_info); + // Timer should be re-enabled. + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000))); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U)) + .WillOnce(Return(5000U)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.flush_interval_ms", 1000U)) + .WillOnce(Return(1000U)); + + timer_->callback_(); + + EXPECT_EQ(1U, stats_.counter("tracing.lightstep.timer_flushed").value()); + EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_sent").value()); +} + +TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) { + setupValidSink(); + + NiceMock request_info; Http::MockAsyncClientRequest request(&cm_.async_client_); Http::AsyncClient::Callbacks* callback; const Optional timeout(std::chrono::seconds(5)); @@ -468,6 +504,8 @@ TEST_F(LightStepSinkTest, FlushOneSpanGrpcFailure) { EXPECT_CALL(request_info, protocol()).WillOnce(ReturnRef(protocol)); EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.min_flush_spans", 5)) .WillOnce(Return(1)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U)) + .WillOnce(Return(5000U)); sink_->flushTrace(empty_header_, empty_header_, request_info); diff --git a/test/mocks/event/mocks.h b/test/mocks/event/mocks.h index b0ca6e5665df..34215e9300db 100644 --- a/test/mocks/event/mocks.h +++ b/test/mocks/event/mocks.h @@ -97,7 +97,7 @@ class MockTimer : public Timer { // Event::Timer MOCK_METHOD0(disableTimer, void()); - MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds& d)); + MOCK_METHOD1(enableTimer, void(const std::chrono::milliseconds&)); TimerCb callback_; };