diff --git a/app/app_test.go b/app/app_test.go index 6fdaa62654..393535a4cd 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -243,13 +243,14 @@ func TestAppIntegration(t *testing.T) { time.Sleep(5 * app.Config.GetSendTickerValue()) - events := sender.Events() - require.Len(t, events, 1) - - assert.Equal(t, "dataset", events[0].Dataset) - assert.Equal(t, "bar", events[0].Data["foo"]) - assert.Equal(t, "1", events[0].Data["trace.trace_id"]) - assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + events := sender.Events() + require.Len(collect, events, 1) + assert.Equal(collect, "dataset", events[0].Dataset) + assert.Equal(collect, "bar", events[0].Data["foo"]) + assert.Equal(collect, "1", events[0].Data["trace.trace_id"]) + assert.Equal(collect, uint(1), events[0].Data["meta.refinery.original_sample_rate"]) + }, 2*time.Second, 10*time.Millisecond) err = startstop.Stop(graph.Objects(), nil) assert.NoError(t, err) @@ -400,8 +401,8 @@ func TestPeerRouting(t *testing.T) { } assert.Equal(t, expectedEvent, senders[0].Events()[0]) - // Repeat, but deliver to host 1 on the peer channel, it should not be - // passed to host 0. + // Repeat, but deliver to host 1 on the peer channel, it should be + // passed to host 0 since that's who the trace belongs to. req, err = http.NewRequest( "POST", "http://localhost:11003/1/batch/dataset", @@ -414,7 +415,7 @@ func TestPeerRouting(t *testing.T) { req.Body = io.NopCloser(strings.NewReader(blob)) post(t, req) assert.Eventually(t, func() bool { - return len(senders[1].Events()) == 1 + return len(senders[0].Events()) == 1 }, 2*time.Second, 2*time.Millisecond) assert.Equal(t, expectedEvent, senders[0].Events()[0]) } @@ -522,8 +523,8 @@ func TestEventsEndpoint(t *testing.T) { senders[0].Events()[0], ) - // Repeat, but deliver to host 1 on the peer channel, it should not be - // passed to host 0. + // Repeat, but deliver to host 1 on the peer channel, it should be + // passed to host 0 since that's the host this trace belongs to. blob = blob[:0] buf := bytes.NewBuffer(blob) @@ -545,7 +546,7 @@ func TestEventsEndpoint(t *testing.T) { post(t, req) assert.Eventually(t, func() bool { - return len(senders[1].Events()) == 1 + return len(senders[0].Events()) == 1 }, 2*time.Second, 2*time.Millisecond) assert.Equal( @@ -565,10 +566,10 @@ func TestEventsEndpoint(t *testing.T) { "api_host": "http://api.honeycomb.io", "dataset": "dataset", "environment": "", - "enqueued_at": senders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"], + "enqueued_at": senders[0].Events()[0].Metadata.(map[string]any)["enqueued_at"], }, }, - senders[1].Events()[0], + senders[0].Events()[0], ) } @@ -644,7 +645,7 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) { senders[0].Events()[0], ) - // Repeat, but deliver to host 1 on the peer channel, it should not be + // Repeat, but deliver to host 1 on the peer channel, it should be // passed to host 0. 
 	blob = blob[:0]
@@ -667,7 +668,7 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
 	post(t, req)
 	assert.Eventually(t, func() bool {
-		return len(senders[1].Events()) == 1
+		return len(senders[0].Events()) == 1
 	}, 2*time.Second, 2*time.Millisecond)

 	assert.Equal(
@@ -687,10 +688,10 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
 			"api_host":    "http://api.honeycomb.io",
 			"dataset":     "dataset",
 			"environment": "test",
-			"enqueued_at": senders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"],
+			"enqueued_at": senders[0].Events()[0].Metadata.(map[string]any)["enqueued_at"],
 		},
 	},
-		senders[1].Events()[0],
+		senders[0].Events()[0],
 	)
 }
diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go
index 1923eed034..d2181dd9fb 100644
--- a/cmd/refinery/main.go
+++ b/cmd/refinery/main.go
@@ -249,7 +249,7 @@ func main() {
 		{Value: legacyMetrics, Name: "legacyMetrics"},
 		{Value: promMetrics, Name: "promMetrics"},
 		{Value: oTelMetrics, Name: "otelMetrics"},
-		{Value: tracer, Name: "tracer"},
+		{Value: tracer, Name: "tracer"}, // we need to use a named injection here because trace.Tracer's struct fields are all private
 		{Value: clockwork.NewRealClock()},
 		{Value: metricsSingleton, Name: "metrics"},
 		{Value: genericMetricsRecorder, Name: "genericMetrics"},
diff --git a/collect/cache/cuckooSentCache.go b/collect/cache/cuckooSentCache.go
index c652853f72..20c0519e05 100644
--- a/collect/cache/cuckooSentCache.go
+++ b/collect/cache/cuckooSentCache.go
@@ -210,11 +210,11 @@ func (c *cuckooSentCache) Record(trace KeptTrace, keep bool, reason string) {
 	c.dropped.Add(trace.ID())
 }

-func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, string, bool) {
+func (c *cuckooSentCache) CheckSpan(span *types.Span) (TraceSentRecord, string, bool) {
 	// was it dropped?
 	if c.dropped.Check(span.TraceID) {
 		// we recognize it as dropped, so just say so; there's nothing else to do
-		return &cuckooDroppedRecord{}, "", false
+		return &cuckooDroppedRecord{}, "", true
 	}
 	// was it kept?
 	c.keptMut.Lock()
@@ -261,3 +261,23 @@ func (c *cuckooSentCache) Resize(cfg config.SampleCacheConfig) error {
 	go c.monitor()
 	return nil
 }
+
+// CheckTrace checks if a trace was kept or dropped, and returns the reason if it was kept.
+// The bool return value is true if the trace was found in the cache.
+// It does not modify the count information.
+func (c *cuckooSentCache) CheckTrace(traceID string) (TraceSentRecord, string, bool) {
+	// was it dropped?
+	if c.dropped.Check(traceID) {
+		// we recognize it as dropped, so just say so; there's nothing else to do
+		return &cuckooDroppedRecord{}, "", true
+	}
+	// was it kept?
+	c.keptMut.Lock()
+	defer c.keptMut.Unlock()
+	if sentRecord, found := c.kept.Get(traceID); found {
+		reason, _ := c.sentReasons.Get(uint(sentRecord.reason))
+		return sentRecord, reason, true
+	}
+	// we have no memory of this place
+	return nil, "", false
+}
diff --git a/collect/cache/traceSentCache.go b/collect/cache/traceSentCache.go
index 362126dbf3..f8bf8c3531 100644
--- a/collect/cache/traceSentCache.go
+++ b/collect/cache/traceSentCache.go
@@ -25,9 +25,12 @@ type TraceSentRecord interface {
 type TraceSentCache interface {
 	// Record preserves the record of a trace being sent or not.
 	Record(trace KeptTrace, keep bool, reason string)
-	// Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
+	// CheckTrace tests if a trace is in the cache; if found, it returns the appropriate TraceSentRecord and true, else nil and false.
+ // It does not modify the count information. + CheckTrace(traceID string) (TraceSentRecord, string, bool) + // CheckSpan tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true, // else nil and false. - Check(span *types.Span) (TraceSentRecord, string, bool) + CheckSpan(span *types.Span) (TraceSentRecord, string, bool) // Stop halts the cache in preparation for shutdown Stop() // Resize adjusts the size of the cache according to the Config passed in diff --git a/collect/collect.go b/collect/collect.go index 4dae4b2d54..b9b3e2f21e 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -15,10 +15,12 @@ import ( "github.com/honeycombio/refinery/collect/cache" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/generics" + "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/internal/otelutil" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/sample" + "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" "github.com/jonboulle/clockwork" @@ -26,6 +28,7 @@ import ( ) var ErrWouldBlock = errors.New("not adding span, channel buffer is full") +var CollectorHealthKey = "collector" type Collector interface { // AddSpan adds a span to be collected, buffered, and merged into a trace. @@ -53,10 +56,13 @@ const ( // InMemCollector is a single threaded collector. type InMemCollector struct { - Config config.Config `inject:""` - Logger logger.Logger `inject:""` - Clock clockwork.Clock `inject:""` - Tracer trace.Tracer `inject:"tracer"` + Config config.Config `inject:""` + Logger logger.Logger `inject:""` + Clock clockwork.Clock `inject:""` + Tracer trace.Tracer `inject:"tracer"` + Health health.Recorder `inject:""` + Sharder sharder.Sharder `inject:""` + Transmission transmit.Transmission `inject:"upstreamTransmission"` Metrics metrics.Metrics `inject:"genericMetrics"` SamplerFactory *sample.SamplerFactory `inject:""` @@ -77,6 +83,7 @@ type InMemCollector struct { incoming chan *types.Span fromPeer chan *types.Span reload chan struct{} + done chan struct{} hostname string } @@ -91,6 +98,8 @@ func (i *InMemCollector) Start() error { // listen for config reloads i.Config.RegisterReloadCallback(i.sendReloadSignal) + i.Health.Register(CollectorHealthKey, 3*time.Second) + i.Metrics.Register("trace_duration_ms", "histogram") i.Metrics.Register("trace_span_count", "histogram") i.Metrics.Register("collector_incoming_queue", "histogram") @@ -108,6 +117,7 @@ func (i *InMemCollector) Start() error { i.Metrics.Register("trace_send_dropped", "counter") i.Metrics.Register("trace_send_has_root", "counter") i.Metrics.Register("trace_send_no_root", "counter") + i.Metrics.Register(TraceSendGotRoot, "counter") i.Metrics.Register(TraceSendExpired, "counter") i.Metrics.Register(TraceSendEjectedFull, "counter") @@ -129,6 +139,7 @@ func (i *InMemCollector) Start() error { i.Metrics.Store("INCOMING_CAP", float64(cap(i.incoming))) i.Metrics.Store("PEER_CAP", float64(cap(i.fromPeer))) i.reload = make(chan struct{}, 1) + i.done = make(chan struct{}) i.datasetSamplers = make(map[string]sample.Sampler) if i.Config.GetAddHostMetadataToTrace() { @@ -165,7 +176,7 @@ func (i *InMemCollector) reloadConfigs() { // pull the old cache contents into the new cache for j, trace := range existingCache.GetAll() { if j >= imcConfig.CacheCapacity { - i.send(trace, TraceSendEjectedFull) + 
i.sendTrace(trace, TraceSendEjectedFull) continue } c.Set(trace) @@ -239,7 +250,7 @@ func (i *InMemCollector) checkAlloc() { for _, trace := range allTraces { tracesSent.Add(trace.TraceID) totalDataSizeSent += trace.DataSize - i.send(trace, TraceSendEjectedMemsize) + i.sendTrace(trace, TraceSendEjectedMemsize) if totalDataSizeSent > int(totalToRemove) { break } @@ -313,6 +324,7 @@ func (i *InMemCollector) collect() { defer i.mutex.Unlock() for { + i.Health.Ready(CollectorHealthKey, true) // record channel lengths as histogram but also as gauges i.Metrics.Histogram("collector_incoming_queue", float64(len(i.incoming))) i.Metrics.Histogram("collector_peer_queue", float64(len(i.fromPeer))) @@ -324,6 +336,8 @@ func (i *InMemCollector) collect() { // deadlocks because peers are waiting to get their events handed off to each // other. select { + case <-i.done: + return case sp, ok := <-i.fromPeer: if !ok { // channel's been closed; we should shut down. @@ -332,8 +346,10 @@ func (i *InMemCollector) collect() { i.processSpan(sp) default: select { + case <-i.done: + return case <-ticker.C: - i.sendTracesInCache(time.Now()) + i.sendExpiredTracesInCache(i.Clock.Now()) i.checkAlloc() // Briefly unlock the cache, to allow test access. @@ -361,13 +377,13 @@ func (i *InMemCollector) collect() { } } -func (i *InMemCollector) sendTracesInCache(now time.Time) { +func (i *InMemCollector) sendExpiredTracesInCache(now time.Time) { traces := i.cache.TakeExpiredTraces(now) for _, t := range traces { if t.RootSpan != nil { - i.send(t, TraceSendGotRoot) + i.sendTrace(t, TraceSendGotRoot) } else { - i.send(t, TraceSendExpired) + i.sendTrace(t, TraceSendExpired) } } } @@ -375,9 +391,8 @@ func (i *InMemCollector) sendTracesInCache(now time.Time) { // processSpan does all the stuff necessary to take an incoming span and add it // to (or create a new placeholder for) a trace. func (i *InMemCollector) processSpan(sp *types.Span) { - ctx, span := otelutil.StartSpan(context.Background(), i.Tracer, "processSpan") + ctx := context.Background() defer func() { - span.End() i.Metrics.Increment("span_processed") i.Metrics.Down("spans_waiting") }() @@ -385,7 +400,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) { trace := i.cache.Get(sp.TraceID) if trace == nil { // if the trace has already been sent, just pass along the span - if sr, sentReason, found := i.sampleTraceCache.Check(sp); found { + if sr, sentReason, found := i.sampleTraceCache.CheckSpan(sp); found { i.Metrics.Increment("trace_sent_cache_hit") // bump the count of records on this trace -- if the root span isn't // the last late span, then it won't be perfect, but it will be better than @@ -415,13 +430,13 @@ func (i *InMemCollector) processSpan(sp *types.Span) { // push this into the cache and if we eject an unsent trace, send it ASAP ejectedTrace := i.cache.Set(trace) if ejectedTrace != nil { - i.send(ejectedTrace, TraceSendEjectedFull) + i.sendTrace(ejectedTrace, TraceSendEjectedFull) } } // if the trace we got back from the cache has already been sent, deal with the // span. 
if trace.Sent { - if sr, reason, found := i.sampleTraceCache.Check(sp); found { + if sr, reason, found := i.sampleTraceCache.CheckSpan(sp); found { i.Metrics.Increment("trace_sent_cache_hit") i.dealWithSentTrace(ctx, sr, reason, sp) return @@ -468,7 +483,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, } var rate uint - record, reason, found := i.sampleTraceCache.Check(sp) + record, reason, found := i.sampleTraceCache.CheckSpan(sp) if !found { rate, keep, reason = i.StressRelief.GetSampleRate(sp.TraceID) now := i.Clock.Now() @@ -495,9 +510,9 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, i.Metrics.Increment("kept_from_stress") // ok, we're sending it, so decorate it first - sp.Event.Data["meta.stressed"] = true + sp.Data["meta.stressed"] = true if i.Config.GetAddRuleReasonToTrace() { - sp.Event.Data["meta.refinery.reason"] = reason + sp.Data["meta.refinery.reason"] = reason } if i.hostname != "" { sp.Data["meta.refinery.local_hostname"] = i.hostname @@ -514,7 +529,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, // on the trace has already been made, and it obeys that decision by either // sending the span immediately or dropping it. func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSentRecord, sentReason string, sp *types.Span) { - ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "dealWithSentTrace", map[string]interface{}{ + _, span := otelutil.StartSpanMulti(ctx, i.Tracer, "dealWithSentTrace", map[string]interface{}{ "trace_id": sp.TraceID, "sent_reason": sentReason, "hostname": i.hostname, @@ -606,7 +621,7 @@ func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMo func (i *InMemCollector) isRootSpan(sp *types.Span) bool { // log event should never be considered a root span, check for that first - if signalType, _ := sp.Data["meta.signal_type"]; signalType == "log" { + if signalType := sp.Data["meta.signal_type"]; signalType == "log" { return false } // check if the event has a parent id using the configured parent id field names @@ -619,7 +634,18 @@ func (i *InMemCollector) isRootSpan(sp *types.Span) bool { return true } -func (i *InMemCollector) send(trace *types.Trace, sendReason string) { +// drainTrace is called when a trace should be sent to honeycomb during shutdown. +func (i *InMemCollector) drainTrace(trace *types.Trace, sendReason string) { + i.send(trace, sendReason, true) +} + +// sendTrace is called when a trace is ready to be sent to honeycomb during normal operation. +func (i *InMemCollector) sendTrace(trace *types.Trace, sendReason string) { + i.send(trace, sendReason, false) +} + +// send is the actual workhorse of sending a trace to honeycomb. It's called by both drainTrace and sendTrace. +func (i *InMemCollector) send(trace *types.Trace, sendReason string, onShutdown bool) { if trace.Sent { // someone else already sent this so we shouldn't also send it. i.Logger.Debug(). 
@@ -731,6 +757,9 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
 	if i.hostname != "" {
 		sp.Data["meta.refinery.local_hostname"] = i.hostname
 	}
+	if onShutdown {
+		sp.Data["meta.refinery.shutdown.send"] = true
+	}
 	mergeTraceAndSpanSampleRates(sp, trace.SampleRate(), isDryRun)
 	i.addAdditionalAttributes(sp)
 	i.Transmission.EnqueueSpan(sp)
@@ -738,30 +767,175 @@
 }

 func (i *InMemCollector) Stop() error {
-	// close the incoming channel and (TODO) wait for all collectors to finish
-	close(i.incoming)
+	close(i.done)
+	// signal the health system to not be ready
+	// so that no new traces are accepted
+	i.Health.Ready(CollectorHealthKey, false)
 	i.mutex.Lock()
-	defer i.mutex.Unlock()
-	// purge the collector of any in-flight traces
-	if i.cache != nil {
-		traces := i.cache.GetAll()
-		for _, trace := range traces {
-			if trace != nil {
-				i.send(trace, TraceSendEjectedFull)
-			}
-		}
+	if !i.Config.GetCollectionConfig().DisableRedistribution {
+		i.sendTracesOnShutdown()
 	}
+
 	if i.Transmission != nil {
 		i.Transmission.Flush()
 	}
 	i.sampleTraceCache.Stop()
+	i.mutex.Unlock()
+
+	close(i.incoming)
+	close(i.fromPeer)
 	return nil
 }

+// sentRecord is a struct that holds a span and the record of the trace decision made.
+type sentRecord struct {
+	span   *types.Span
+	record cache.TraceSentRecord
+	reason string
+}
+
+// sendTracesOnShutdown sends all traces in the cache to their final destination.
+// This is done on shutdown to ensure that all traces are sent before the collector
+// is stopped.
+// It does this by pulling spans out of both the incoming queue and the peer queue so that
+// any spans that are still in the queues when the collector is stopped are also sent.
+// It also pulls traces out of the cache and sends them to their final destination.
+func (i *InMemCollector) sendTracesOnShutdown() {
+	wg := &sync.WaitGroup{}
+	sentChan := make(chan sentRecord, len(i.incoming))
+	forwardChan := make(chan *types.Span, i.Config.GetCollectionConfig().CacheCapacity)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i.Config.GetCollectionConfig().ShutdownDelay))
+	defer cancel()
+
+	// start a goroutine that will pull spans off of the channels passed in
+	// and send them to their final destination
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		i.sendSpansOnShutdown(ctx, sentChan, forwardChan)
+	}()
+
+	// start a goroutine that will pull spans off of the incoming queue
+	// and place them on the sentChan or forwardChan
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case sp, ok := <-i.incoming:
+				if !ok {
+					return
+				}
+
+				i.distributeSpansOnShutdown(sentChan, forwardChan, sp)
+			}
+		}
+	}()
+
+	// start a goroutine that will pull spans off of the peer queue
+	// and place them on the sentChan or forwardChan
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case sp, ok := <-i.fromPeer:
+				if !ok {
+					return
+				}
+
+				i.distributeSpansOnShutdown(sentChan, forwardChan, sp)
+			}
+		}
+	}()
+
+	// pull traces from the trace cache and place them on the sentChan or forwardChan
+	if i.cache != nil {
+		traces := i.cache.GetAll()
+		for _, trace := range traces {
+			i.distributeSpansOnShutdown(sentChan, forwardChan, trace.GetSpans()...)
+		}
+	}
+
+	wg.Wait()
+
+	close(sentChan)
+	close(forwardChan)
+
+}
+
+// distributeSpansOnShutdown takes a list of spans and sends them to the appropriate channel based on the state of the trace.
+func (i *InMemCollector) distributeSpansOnShutdown(sentTraceChan chan sentRecord, forwardTraceChan chan *types.Span, spans ...*types.Span) {
+	for _, sp := range spans {
+		if sp != nil {
+
+			// first check if there's a trace decision
+			record, reason, found := i.sampleTraceCache.CheckTrace(sp.TraceID)
+			if found {
+				sentTraceChan <- sentRecord{sp, record, reason}
+				continue
+			}
+
+			// if there's no trace decision, then we need to forward the trace to its new home
+			forwardTraceChan <- sp
+		}
+	}
+}
+
+// sendSpansOnShutdown is a helper function that sends spans to their final destination
+// on shutdown.
+func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentTraceChan <-chan sentRecord, forwardTraceChan <-chan *types.Span) {
+	for {
+		select {
+		case <-ctx.Done():
+			i.Logger.Info().Logf("Timed out waiting for traces to send")
+			return
+
+		case r, ok := <-sentTraceChan:
+			if !ok {
+				return
+			}
+
+			ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "shutdown_sent_trace", map[string]interface{}{"trace_id": r.span.TraceID, "hostname": i.hostname})
+			r.span.Data["meta.refinery.shutdown.send"] = true
+
+			i.dealWithSentTrace(ctx, r.record, r.reason, r.span)
+
+			span.End()
+
+		case sp, ok := <-forwardTraceChan:
+			if !ok {
+				return
+			}
+
+			_, span := otelutil.StartSpanMulti(ctx, i.Tracer, "shutdown_forwarded_trace", map[string]interface{}{"trace_id": sp.TraceID, "hostname": i.hostname})
+
+			targetShard := i.Sharder.WhichShard(sp.TraceID)
+			url := targetShard.GetAddress()
+
+			otelutil.AddSpanField(span, "target_shard", url)
+
+			// TODO: we need to decorate the expired traces before forwarding them so that
+			// the downstream consumers can make decisions based on the metadata without having
+			// to restart the TraceTimeout or SendDelay
+			sp.APIHost = url
+			sp.Data["meta.refinery.shutdown.send"] = false
+			i.Transmission.EnqueueSpan(sp)
+			span.End()
+		}
+
+	}
+}
+
 // Convenience method for tests.
func (i *InMemCollector) getFromCache(traceID string) *types.Trace { i.mutex.RLock() diff --git a/collect/collect_test.go b/collect/collect_test.go index dffb54700b..d4fafb501d 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -1,6 +1,7 @@ package collect import ( + "context" "fmt" "math/rand" "runtime" @@ -17,10 +18,12 @@ import ( "github.com/honeycombio/refinery/collect/cache" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/sample" + "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" ) @@ -40,10 +43,21 @@ func newCache() (cache.TraceSentCache, error) { func newTestCollector(conf config.Config, transmission transmit.Transmission) *InMemCollector { s := &metrics.MockMetrics{} s.Start() + clock := clockwork.NewRealClock() + healthReporter := &health.Health{ + Clock: clock, + } + healthReporter.Start() + return &InMemCollector{ - Config: conf, - Logger: &logger.NullLogger{}, - Tracer: noop.NewTracerProvider().Tracer("test"), + Config: conf, + Clock: clock, + Logger: &logger.NullLogger{}, + Tracer: noop.NewTracerProvider().Tracer("test"), + Health: healthReporter, + Sharder: &sharder.SingleServerSharder{ + Logger: &logger.NullLogger{}, + }, Transmission: transmission, Metrics: &metrics.NullMetrics{}, StressRelief: &MockStressReliever{}, @@ -52,6 +66,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I Metrics: s, Logger: &logger.NullLogger{}, }, + done: make(chan struct{}), } } @@ -64,6 +79,9 @@ func TestAddRootSpan(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -140,6 +158,9 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: expectedDeterministicSampleRate}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -222,6 +243,9 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -281,6 +305,9 @@ func TestAddSpan(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -343,6 +370,9 @@ func TestDryRunMode(t *testing.T) { SendTickerVal: 20 * time.Millisecond, DryRun: true, 
ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -466,6 +496,7 @@ func TestCacheSizeReload(t *testing.T) { SendTickerVal: 2 * time.Millisecond, GetCollectionConfigVal: config.CollectionConfig{ CacheCapacity: 1, + ShutdownDelay: config.Duration(1 * time.Millisecond), }, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, SampleCache: config.SampleCacheConfig{ @@ -539,7 +570,7 @@ func TestSampleConfigReload(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, - GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10}, + GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10, ShutdownDelay: config.Duration(1 * time.Millisecond)}, SampleCache: config.SampleCacheConfig{ KeptSize: 100, DroppedSize: 100, @@ -611,6 +642,10 @@ func TestStableMaxAlloc(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + CacheCapacity: 1000, + }, } transmission := &transmit.MockTransmission{} @@ -703,6 +738,10 @@ func TestAddSpanNoBlock(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + CacheCapacity: 10, + }, } transmission := &transmit.MockTransmission{} @@ -749,6 +788,8 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: &logger.NullLogger{}}, &inject.Object{Value: noop.NewTracerProvider().Tracer("test"), Name: "tracer"}, &inject.Object{Value: clockwork.NewRealClock()}, + &inject.Object{Value: &health.Health{}}, + &inject.Object{Value: &sharder.SingleServerSharder{}}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"}, &inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"}, &inject.Object{Value: &sample.SamplerFactory{}}, @@ -775,6 +816,10 @@ func TestAddCountsToRoot(t *testing.T) { AddSpanCountToRoot: true, AddCountsToRoot: true, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + CacheCapacity: 3, + }, } transmission := &transmit.MockTransmission{} @@ -854,6 +899,9 @@ func TestLateRootGetsCounts(t *testing.T) { AddCountsToRoot: true, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, AddRuleReasonToTrace: true, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} @@ -935,6 +983,9 @@ func TestAddSpanCount(t *testing.T) { SendTickerVal: 2 * time.Millisecond, AddSpanCountToRoot: true, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -998,6 +1049,9 @@ func TestLateRootGetsSpanCount(t *testing.T) { AddSpanCountToRoot: true, 
ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, AddRuleReasonToTrace: true, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -1062,6 +1116,9 @@ func TestLateSpanNotDecorated(t *testing.T) { GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} @@ -1124,6 +1181,9 @@ func TestAddAdditionalAttributes(t *testing.T) { "name": "foo", "other": "bar", }, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} transmission.Start() @@ -1189,6 +1249,9 @@ func TestStressReliefDecorateHostname(t *testing.T) { DeactivationLevel: 25, SamplingRate: 100, }, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} @@ -1285,6 +1348,9 @@ func TestSpanWithRuleReasons(t *testing.T) { SendTickerVal: 2 * time.Millisecond, ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, AddRuleReasonToTrace: true, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, } transmission := &transmit.MockTransmission{} @@ -1424,6 +1490,9 @@ func TestIsRootSpan(t *testing.T) { collector := &InMemCollector{ Config: &config.MockConfig{ ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, }, } @@ -1433,3 +1502,96 @@ func TestIsRootSpan(t *testing.T) { }) } } + +func TestDrainTracesOnShutdown(t *testing.T) { + // set up the trace cache + conf := &config.MockConfig{ + GetSendDelayVal: 1 * time.Millisecond, + GetTraceTimeoutVal: 60 * time.Second, + GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, + SendTickerVal: 2 * time.Millisecond, + ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(100 * time.Millisecond), + CacheCapacity: 3, + }, + } + transmission := &transmit.MockTransmission{} + transmission.Start() + coll := newTestCollector(conf, transmission) + coll.hostname = "host123" + c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) + coll.cache = c + stc, err := newCache() + assert.NoError(t, err, "lru cache should start") + coll.sampleTraceCache = stc + + coll.incoming = make(chan *types.Span, 5) + coll.fromPeer = make(chan *types.Span, 5) + coll.datasetSamplers = make(map[string]sample.Sampler) + + sentTraceChan := make(chan sentRecord, 1) + forwardTraceChan := make(chan *types.Span, 1) + expiredTraceChan := make(chan *types.Span, 1) + + // test 1 + // the trace in cache already has decision made + trace1 := &types.Trace{ + TraceID: "traceID1", + } + span1 := &types.Span{ + TraceID: "traceID1", + Event: types.Event{ + Dataset: "aoeu", + Data: make(map[string]interface{}), + }, + } + + stc.Record(trace1, true, "test") + + coll.distributeSpansOnShutdown(sentTraceChan, forwardTraceChan, span1) + require.Len(t, sentTraceChan, 1) + require.Len(t, forwardTraceChan, 0) + require.Len(t, expiredTraceChan, 0) + + 
ctx1, cancel1 := context.WithCancel(context.Background())
+	go coll.sendSpansOnShutdown(ctx1, sentTraceChan, forwardTraceChan)
+	require.EventuallyWithT(t, func(collect *assert.CollectT) {
+		transmission.Mux.Lock()
+		events := transmission.Events
+		require.Len(collect, events, 1)
+		require.Equal(collect, span1.Dataset, events[0].Dataset)
+		transmission.Mux.Unlock()
+	}, 2*time.Second, 100*time.Millisecond)
+
+	cancel1()
+	transmission.Flush()
+
+	// test 2
+	// we can't make a decision for the trace yet, let's
+	// forward it to its new home
+	span2 := &types.Span{
+		TraceID: "traceID2",
+		Event: types.Event{
+			Dataset: "test2",
+			Data:    make(map[string]interface{}),
+		},
+	}
+
+	coll.distributeSpansOnShutdown(sentTraceChan, forwardTraceChan, span2)
+	require.Len(t, sentTraceChan, 0)
+	require.Len(t, forwardTraceChan, 1)
+	require.Len(t, expiredTraceChan, 0)
+
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	go coll.sendSpansOnShutdown(ctx2, sentTraceChan, forwardTraceChan)
+	require.EventuallyWithT(t, func(collect *assert.CollectT) {
+		transmission.Mux.Lock()
+		require.Len(collect, transmission.Events, 1)
+		require.Equal(collect, span2.Dataset, transmission.Events[0].Dataset)
+		require.Equal(collect, "http://self", transmission.Events[0].APIHost)
+		transmission.Mux.Unlock()
+	}, 2*time.Second, 100*time.Millisecond)
+	cancel2()
+
+}
diff --git a/config/file_config.go b/config/file_config.go
index 7b148922cc..482154028d 100644
--- a/config/file_config.go
+++ b/config/file_config.go
@@ -276,12 +276,14 @@ type RedisPeerManagementConfig struct {
 type CollectionConfig struct {
 	// CacheCapacity must be less than math.MaxInt32
-	CacheCapacity       int        `yaml:"CacheCapacity" default:"10_000"`
-	PeerQueueSize       int        `yaml:"PeerQueueSize"`
-	IncomingQueueSize   int        `yaml:"IncomingQueueSize"`
-	AvailableMemory     MemorySize `yaml:"AvailableMemory" cmdenv:"AvailableMemory"`
-	MaxMemoryPercentage int        `yaml:"MaxMemoryPercentage" default:"75"`
-	MaxAlloc            MemorySize `yaml:"MaxAlloc"`
+	CacheCapacity         int        `yaml:"CacheCapacity" default:"10_000"`
+	PeerQueueSize         int        `yaml:"PeerQueueSize"`
+	IncomingQueueSize     int        `yaml:"IncomingQueueSize"`
+	AvailableMemory       MemorySize `yaml:"AvailableMemory" cmdenv:"AvailableMemory"`
+	MaxMemoryPercentage   int        `yaml:"MaxMemoryPercentage" default:"75"`
+	MaxAlloc              MemorySize `yaml:"MaxAlloc"`
+	DisableRedistribution bool       `yaml:"DisableRedistribution"`
+	ShutdownDelay         Duration   `yaml:"ShutdownDelay" default:"15s"`
 }

 // GetMaxAlloc returns the maximum amount of memory to use for the cache.
diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml
index f5a4543361..aff3096c84 100644
--- a/config/metadata/configMeta.yaml
+++ b/config/metadata/configMeta.yaml
@@ -1223,6 +1223,33 @@ groups:
       supported. See `MaxMemoryPercentage` for more details.
      If set, `Collections.AvailableMemory` must not be defined.

+  - name: DisableRedistribution
+    type: bool
+    valuetype: nondefault
+    firstversion: v2.8
+    default: false
+    reload: true
+    summary: controls whether to transmit traces in the cache to remaining peers during a cluster scaling event.
+    description: >
+      If `true`, Refinery will NOT forward live traces in its cache to the rest of the peers when peers join or leave the cluster.
+      Disabling this behavior can help to prevent disruptive bursts of network traffic when large traces with a long TraceTimeout
+      are redistributed.
+ + - name: ShutdownDelay + type: duration + valuetype: nondefault + default: 15s + reload: true + summary: controls the maximum time Refinery can use while draining traces at shutdown. + description: > + This setting controls the duration that Refinery expects to have to + drain in-process traces before shutting down an instance. When asked + to shut down gracefully, Refinery stops accepting new spans + immediately and drains the remaining traces by sending them to remaining + peers. + This value should be set to a bit less than the normal timeout period + for shutting down without forcibly terminating the process. + - name: BufferSizes title: "Buffer Sizes" description: > diff --git a/route/route.go b/route/route.go index e5b357a63f..fa74aeebb6 100644 --- a/route/route.go +++ b/route/route.go @@ -604,7 +604,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error { // Figure out if we should handle this span locally or pass on to a peer targetShard := r.Sharder.WhichShard(traceID) - if r.incomingOrPeer == "incoming" && !targetShard.Equals(r.Sharder.MyShard()) { + if !targetShard.Equals(r.Sharder.MyShard()) { r.Metrics.Increment(r.incomingOrPeer + "_router_peer") debugLog. WithString("peer", targetShard.GetAddress()). diff --git a/sharder/single.go b/sharder/single.go index e2003a29df..edcfcf5dcc 100644 --- a/sharder/single.go +++ b/sharder/single.go @@ -12,7 +12,7 @@ var selfShard SingleShard = "self" func (s *SingleShard) Equals(other Shard) bool { return true } // GetAddress will never be used because every shard is my shard -func (s *SingleShard) GetAddress() string { return "" } +func (s *SingleShard) GetAddress() string { return "http://self" } type SingleServerSharder struct { Logger logger.Logger `inject:""`