Skip to content

Commit

Permalink
add automatic tail sampling based on latency and error count thresholds
Browse files Browse the repository at this point in the history
  • Loading branch information
ivantopo committed Nov 17, 2021
1 parent 00ce0d5 commit 6069c0c
Showing 5 changed files with 224 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/* =========================================================================================
* Copyright © 2013-2021 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.trace

import kamon.Kamon
import kamon.testkit.{InitAndStopKamonAfterAll, Reconfigure, SpanInspection, TestSpanReporter}
import org.scalactic.TimesOnInt.convertIntToRepeater
import org.scalatest.OptionValues
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.SpanSugar
import org.scalatest.wordspec.AnyWordSpec

import java.time.Instant


class LocalTailSamplerSpec extends AnyWordSpec with Matchers with OptionValues with SpanInspection.Syntax with Eventually
with SpanSugar with TestSpanReporter with Reconfigure with InitAndStopKamonAfterAll {


"the Kamon local tail sampler" should {
"keep traces that match the error count threshold" in {
applyConfig(
"""
|kamon.trace {
| sampler = never
| span-reporting-delay = 1 second
|
| local-tail-sampler {
| enabled = yes
| error-count-threshold = 3
| }
|}
|""".stripMargin)

val parentSpan = Kamon.spanBuilder("parent-with-errors").start()

5 times {
Kamon.spanBuilder("child")
.asChildOf(parentSpan)
.start()
.fail("failing for tests")
.finish()
}

parentSpan.finish()
var spansFromParentTrace = 0

eventually(timeout(5 seconds)) {
val reportedSpan = testSpanReporter().nextSpan().value
reportedSpan.trace.id shouldBe parentSpan.trace.id
spansFromParentTrace += 1
spansFromParentTrace shouldBe 6 // The parent Span plus five child Spans
}
}

"keep traces that match the latency threshold" in {
applyConfig(
"""
|kamon.trace {
| sampler = never
| span-reporting-delay = 1 second
|
| local-tail-sampler {
| enabled = yes
| latency-threshold = 3 seconds
| }
|}
|""".stripMargin)

val startInstant = Instant.now()
val parentSpan = Kamon.spanBuilder("parent-with-high-latency").start(startInstant)

5 times {
Kamon.spanBuilder("child")
.asChildOf(parentSpan)
.start()
.finish()
}

parentSpan.finish(startInstant.plusSeconds(5))
var spansFromParentTrace = 0

eventually(timeout(5 seconds)) {
val reportedSpan = testSpanReporter().nextSpan().value
reportedSpan.trace.id shouldBe parentSpan.trace.id
spansFromParentTrace += 1
spansFromParentTrace shouldBe 6 // The parent Span plus five child Spans
}
}

"not keep traces when tail sampling is disabled, even if they meet the criteria" in {
applyConfig(
"""
|kamon.trace {
| sampler = never
| span-reporting-delay = 1 second
|
| local-tail-sampler {
| enabled = no
| error-count-threshold= 1
| latency-threshold = 3 seconds
| }
|}
|""".stripMargin)

val startInstant = Instant.now()
val parentSpan = Kamon.spanBuilder("parent-with-disabled-tail-sampler").start(startInstant)

5 times {
Kamon.spanBuilder("child")
.asChildOf(parentSpan)
.start()
.fail("failure that shouldn't cause the trace to be sampled")
.finish()
}

parentSpan.finish(startInstant.plusSeconds(5))

4 times {
val allSpans = testSpanReporter().spans()
allSpans.find(_.operationName == parentSpan.operationName()) shouldBe empty

Thread.sleep(1000) // Should be enough time since all spans would be flushed after 1 second
}
}
}
}
24 changes: 24 additions & 0 deletions core/kamon-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -241,6 +241,30 @@ kamon {
}
}

# Automatically change the sampling decision for a trace based on the either the number of errors in a trace, or a
# fixed latency threshold. The local tail sampler will only work when the `span-reporting-delay` setting is set to a
# value greater than zero.
#
# Important things to keep in mind:
# - The sampling decision override happens only in the current process. This means that any requests sent to other
# services before the sampling decision override happened might be missing from the resulting trace.

# - Try to set `span-reporting-delay` to a value slightly higher than your application's timeout to ensure all the
# local spans related to the trace will still be kept in memory when the sampling override happens
#
local-tail-sampler {

# Turns the local tail sampler logic on or off.
enabled = no

# Overrides the sampling decision to "Sample" when a trace contains at least this number of errors when the local
# Root Span finishes processing.
error-count-threshold = 1

# Overrides the sampling decision to "Sample" when the local Root Span took more than this time to complete.
latency-threshold = 1 second
}

# Settings that influence the tags applied to the "span.processing-time" metric for all finished spans with metric
# tracking enabled.
#
19 changes: 18 additions & 1 deletion core/kamon-core/src/main/scala/kamon/trace/Span.scala
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ import kamon.context.Context
import kamon.tag.TagSet
import kamon.trace.Span.Link
import kamon.trace.Trace.SamplingDecision
import kamon.trace.Tracer.LocalTailSamplerSettings
import kamon.util.Clock
import org.slf4j.LoggerFactory

@@ -410,7 +411,8 @@ object Span {
val kind: Kind, localParent: Option[Span], initialOperationName: String, spanTags: TagSet.Builder, metricTags: TagSet.Builder,
createdAt: Instant, initialMarks: List[Mark], initialLinks: List[Link], initialTrackMetrics: Boolean, tagWithParentOperation: Boolean,
includeErrorStacktrace: Boolean, isDelayed: Boolean, clock: Clock, preFinishHooks: Array[Tracer.PreFinishHook],
onFinish: Span.Finished => Unit, sampler: Sampler, scheduler: ScheduledExecutorService, reportingDelay: Duration) extends Span.Delayed {
onFinish: Span.Finished => Unit, sampler: Sampler, scheduler: ScheduledExecutorService, reportingDelay: Duration,
localTailSamplerSettings: LocalTailSamplerSettings) extends Span.Delayed {

private val _metricTags = metricTags
private val _spanTags = spanTags
@@ -506,6 +508,7 @@ object Span {
override def fail(message: String): Span = synchronized {
if(_isOpen) {
_hasError = true
trace.spanFailed()

if((isSampled || !reportingDelay.isZero))
_spanTags.add(TagKeys.ErrorMessage, message)
@@ -516,6 +519,7 @@ object Span {
override def fail(throwable: Throwable): Span = synchronized {
if(_isOpen) {
_hasError = true
trace.spanFailed()

if((isSampled || !reportingDelay.isZero)) {
_spanTags.add(TagKeys.ErrorMessage, throwable.getMessage)
@@ -530,6 +534,7 @@ object Span {
override def fail(message: String, throwable: Throwable): Span = synchronized {
if(_isOpen) {
_hasError = true
trace.spanFailed()

if((isSampled || !reportingDelay.isZero)) {
_spanTags.add(TagKeys.ErrorMessage, message)
@@ -643,6 +648,17 @@ object Span {
}

private def reportSpan(finishedAt: Instant, metricTags: TagSet): Unit = {
val isRootSpan = (position == Position.Root || position == Position.LocalRoot)

if (isRootSpan && localTailSamplerSettings.enabled) {
val hasEnoughErrors = trace.failedSpansCount() >= localTailSamplerSettings.errorCountThreshold
val hasEnoughLatency = Clock.nanosBetween(_startedAt, finishedAt) >= localTailSamplerSettings.latencyThresholdNanos

if (hasEnoughErrors || hasEnoughLatency) {
trace.keep()
}
}

if(reportingDelay.isZero) {
if(isSampled)
onFinish(toFinishedSpan(finishedAt, metricTags))
@@ -674,6 +690,7 @@ object Span {
}

private class DelayedReportingRunnable(finishedAt: Instant, metricTags: TagSet) extends Runnable {

override def run(): Unit = {
if (isSampled)
onFinish(toFinishedSpan(finishedAt, metricTags))
20 changes: 20 additions & 0 deletions core/kamon-core/src/main/scala/kamon/trace/Trace.scala
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@
package kamon
package trace

import java.util.concurrent.atomic.AtomicInteger

/**
* Holds information shared across all Spans from the same Trace. It might seem like too little information but all in
* all, a trace is just a bunch of Spans that share the same trace identifier ;).
@@ -56,6 +58,17 @@ trait Trace {
*/
def keep(): Unit

/**
* Signals that a Span belonging to this trace has failed.
*/
def spanFailed(): Unit

/**
* Returns the number of failed Spans for this trace in this process. This error count does not reflect errors that
* might have happened on other services participating in the same trace.
*/
def failedSpansCount(): Int

}

object Trace {
@@ -80,6 +93,7 @@ object Trace {

private class MutableTrace(val id: Identifier, initialDecision: Trace.SamplingDecision) extends Trace {
@volatile private var _samplingDecision = initialDecision
@volatile private var _failedSpansCount = new AtomicInteger(0)

override def samplingDecision: SamplingDecision =
_samplingDecision
@@ -90,6 +104,12 @@ object Trace {
override def keep(): Unit =
_samplingDecision = SamplingDecision.Sample

override def spanFailed(): Unit =
_failedSpansCount.incrementAndGet()

override def failedSpansCount(): Int =
_failedSpansCount.get()

override def toString(): String =
s"{id=${id.string},samplingDecision=${_samplingDecision}"
}
22 changes: 21 additions & 1 deletion core/kamon-core/src/main/scala/kamon/trace/Tracer.scala
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import kamon.tag.TagSet
import kamon.tag.Lookups.option
import kamon.trace.Span.{Kind, Link, Position, TagKeys}
import kamon.trace.Trace.SamplingDecision
import kamon.trace.Tracer.LocalTailSamplerSettings
import kamon.util.Clock
import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue}
import org.slf4j.LoggerFactory
@@ -51,6 +52,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
@volatile private var _preStartHooks: Array[Tracer.PreStartHook] = Array.empty
@volatile private var _preFinishHooks: Array[Tracer.PreFinishHook] = Array.empty
@volatile private var _delayedSpanReportingDelay: Duration = Duration.ZERO
@volatile private var _localTailSamplerSettings: LocalTailSamplerSettings = LocalTailSamplerSettings(false, Int.MaxValue, Long.MaxValue)
@volatile private var _scheduler: Option[ScheduledExecutorService] = None
private val _onSpanFinish: Span.Finished => Unit = _spanBuffer.offer

@@ -361,7 +363,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage

new Span.Local(id, parentId, trace, position, _kind, localParent, _name, _spanTags, _metricTags, at, _marks, _links,
_trackMetrics, _tagWithParentOperation, _includeErrorStacktrace, isDelayed, clock, _preFinishHooks, _onSpanFinish,
_sampler, _scheduler.get, _delayedSpanReportingDelay)
_sampler, _scheduler.get, _delayedSpanReportingDelay, _localTailSamplerSettings)
}
}

@@ -428,6 +430,17 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
val tagWithParentOperation = traceConfig.getBoolean("span-metric-tags.parent-operation")
val includeErrorStacktrace = traceConfig.getBoolean("include-error-stacktrace")
val delayedSpanReportingDelay = traceConfig.getDuration("span-reporting-delay")
val localTailSamplerSettings = LocalTailSamplerSettings(
enabled = traceConfig.getBoolean("local-tail-sampler.enabled"),
errorCountThreshold = traceConfig.getInt("local-tail-sampler.error-count-threshold"),
latencyThresholdNanos = traceConfig.getDuration("local-tail-sampler.latency-threshold").toNanos
)

if(localTailSamplerSettings.enabled && delayedSpanReportingDelay.isZero) {
_logger.warn(
"Enabling local tail sampling without a span-reporting-delay setting will probably lead to incomplete " +
"traces. Consider setting span-reporting-delay to a value slightly above your application's requests timeout")
}

if(_traceReporterQueueSize != traceReporterQueueSize) {
// By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters.
@@ -445,6 +458,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
_tagWithParentOperation = tagWithParentOperation
_traceReporterQueueSize = traceReporterQueueSize
_delayedSpanReportingDelay = delayedSpanReportingDelay
_localTailSamplerSettings = localTailSamplerSettings
_preStartHooks = preStartHooks
_preFinishHooks = preFinishHooks

@@ -490,4 +504,10 @@ object Tracer {
trait PreFinishHook {
def beforeFinish(span: Span): Unit
}

private[trace] case class LocalTailSamplerSettings(
enabled: Boolean,
errorCountThreshold: Int,
latencyThresholdNanos: Long
)
}

0 comments on commit 6069c0c

Please sign in to comment.