Skip to content

Commit

Permalink
add automatic tail sampling based on latency and error count threshol…
Browse files Browse the repository at this point in the history
…ds (#1080)
  • Loading branch information
ivantopo authored Nov 17, 2021
1 parent be83653 commit 25bb796
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/* =========================================================================================
* Copyright © 2013-2021 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.trace

import kamon.Kamon
import kamon.testkit.{InitAndStopKamonAfterAll, Reconfigure, SpanInspection, TestSpanReporter}
import org.scalactic.TimesOnInt.convertIntToRepeater
import org.scalatest.OptionValues
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.SpanSugar
import org.scalatest.wordspec.AnyWordSpec

import java.time.Instant


/**
  * Exercises the local tail sampler through the public Kamon API. Every scenario configures `sampler = never`
  * together with a one second `span-reporting-delay`, and then checks whether the Spans of a trace show up in the
  * test reporter depending on the `local-tail-sampler` settings.
  */
class LocalTailSamplerSpec extends AnyWordSpec with Matchers with OptionValues with SpanInspection.Syntax
    with Eventually with SpanSugar with TestSpanReporter with Reconfigure with InitAndStopKamonAfterAll {

  "the Kamon local tail sampler" should {

    "keep traces that match the error count threshold" in {
      val errorThresholdConfig =
        """
          |kamon.trace {
          | sampler = never
          | span-reporting-delay = 1 second
          |
          | local-tail-sampler {
          | enabled = yes
          | error-count-threshold = 3
          | }
          |}
          |""".stripMargin

      applyConfig(errorThresholdConfig)

      val rootSpan = Kamon.spanBuilder("parent-with-errors").start()

      // Five failed children, which is above the configured threshold of three errors.
      5.times {
        val failingChild = Kamon.spanBuilder("child").asChildOf(rootSpan).start()
        failingChild.fail("failing for tests").finish()
      }

      rootSpan.finish()

      // Every attempt asks the reporter for its next Span and bumps the counter when that Span belongs to the
      // trace under test; the block keeps being retried until the counter reaches the expected total.
      var seenSpans = 0

      eventually(timeout(5.seconds)) {
        val nextReported = testSpanReporter().nextSpan().value
        nextReported.trace.id shouldBe rootSpan.trace.id
        seenSpans += 1
        seenSpans shouldBe 6 // The parent Span plus five child Spans
      }
    }

    "keep traces that match the latency threshold" in {
      val latencyThresholdConfig =
        """
          |kamon.trace {
          | sampler = never
          | span-reporting-delay = 1 second
          |
          | local-tail-sampler {
          | enabled = yes
          | latency-threshold = 3 seconds
          | }
          |}
          |""".stripMargin

      applyConfig(latencyThresholdConfig)

      val traceStart = Instant.now()
      val rootSpan = Kamon.spanBuilder("parent-with-high-latency").start(traceStart)

      5.times {
        val quickChild = Kamon.spanBuilder("child").asChildOf(rootSpan).start()
        quickChild.finish()
      }

      // Finishing with an explicit instant makes the parent look five seconds long without actually waiting.
      rootSpan.finish(traceStart.plusSeconds(5))

      var seenSpans = 0

      eventually(timeout(5.seconds)) {
        val nextReported = testSpanReporter().nextSpan().value
        nextReported.trace.id shouldBe rootSpan.trace.id
        seenSpans += 1
        seenSpans shouldBe 6 // The parent Span plus five child Spans
      }
    }

    "not keep traces when tail sampling is disabled, even if they meet the criteria" in {
      val disabledSamplerConfig =
        """
          |kamon.trace {
          | sampler = never
          | span-reporting-delay = 1 second
          |
          | local-tail-sampler {
          | enabled = no
          | error-count-threshold= 1
          | latency-threshold = 3 seconds
          | }
          |}
          |""".stripMargin

      applyConfig(disabledSamplerConfig)

      val traceStart = Instant.now()
      val rootSpan = Kamon.spanBuilder("parent-with-disabled-tail-sampler").start(traceStart)

      // The trace meets both criteria (five errors, five seconds), yet nothing should be reported.
      5.times {
        val failingChild = Kamon.spanBuilder("child").asChildOf(rootSpan).start()
        failingChild.fail("failure that shouldn't cause the trace to be sampled").finish()
      }

      rootSpan.finish(traceStart.plusSeconds(5))

      // Poll the reporter a few times, one second apart, making sure the parent Span never appears.
      4.times {
        val reportedParent = testSpanReporter()
          .spans()
          .find(reported => reported.operationName == rootSpan.operationName())

        reportedParent shouldBe empty

        Thread.sleep(1000) // Should be enough time since all spans would be flushed after 1 second
      }
    }
  }
}
24 changes: 24 additions & 0 deletions core/kamon-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,30 @@ kamon {
}
}

# Automatically change the sampling decision for a trace based on either the number of errors in the trace or a
# fixed latency threshold. The local tail sampler will only work when the `span-reporting-delay` setting is set to a
# value greater than zero.
#
# Important things to keep in mind:
# - The sampling decision override happens only in the current process. This means that any requests sent to other
# services before the sampling decision override happened might be missing from the resulting trace.

# - Try to set `span-reporting-delay` to a value slightly higher than your application's timeout to ensure all the
# local spans related to the trace will still be kept in memory when the sampling override happens.
#
local-tail-sampler {

# Turns the local tail sampler logic on or off. While it is off, the thresholds below are not evaluated.
enabled = no

# Overrides the sampling decision to "Sample" when a trace contains at least this number of errors when the local
# Root Span finishes processing. Only errors recorded in the current process are counted.
error-count-threshold = 1

# Overrides the sampling decision to "Sample" when the local Root Span took at least this long to complete.
latency-threshold = 1 second
}

# Settings that influence the tags applied to the "span.processing-time" metric for all finished spans with metric
# tracking enabled.
#
Expand Down
19 changes: 18 additions & 1 deletion core/kamon-core/src/main/scala/kamon/trace/Span.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import kamon.context.Context
import kamon.tag.TagSet
import kamon.trace.Span.Link
import kamon.trace.Trace.SamplingDecision
import kamon.trace.Tracer.LocalTailSamplerSettings
import kamon.util.Clock
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -410,7 +411,8 @@ object Span {
val kind: Kind, localParent: Option[Span], initialOperationName: String, spanTags: TagSet.Builder, metricTags: TagSet.Builder,
createdAt: Instant, initialMarks: List[Mark], initialLinks: List[Link], initialTrackMetrics: Boolean, tagWithParentOperation: Boolean,
includeErrorStacktrace: Boolean, isDelayed: Boolean, clock: Clock, preFinishHooks: Array[Tracer.PreFinishHook],
onFinish: Span.Finished => Unit, sampler: Sampler, scheduler: ScheduledExecutorService, reportingDelay: Duration) extends Span.Delayed {
onFinish: Span.Finished => Unit, sampler: Sampler, scheduler: ScheduledExecutorService, reportingDelay: Duration,
localTailSamplerSettings: LocalTailSamplerSettings) extends Span.Delayed {

private val _metricTags = metricTags
private val _spanTags = spanTags
Expand Down Expand Up @@ -506,6 +508,7 @@ object Span {
override def fail(message: String): Span = synchronized {
if(_isOpen) {
_hasError = true
trace.spanFailed()

if((isSampled || !reportingDelay.isZero))
_spanTags.add(TagKeys.ErrorMessage, message)
Expand All @@ -516,6 +519,7 @@ object Span {
override def fail(throwable: Throwable): Span = synchronized {
if(_isOpen) {
_hasError = true
trace.spanFailed()

if((isSampled || !reportingDelay.isZero)) {
_spanTags.add(TagKeys.ErrorMessage, throwable.getMessage)
Expand All @@ -530,6 +534,7 @@ object Span {
override def fail(message: String, throwable: Throwable): Span = synchronized {
if(_isOpen) {
_hasError = true
trace.spanFailed()

if((isSampled || !reportingDelay.isZero)) {
_spanTags.add(TagKeys.ErrorMessage, message)
Expand Down Expand Up @@ -643,6 +648,17 @@ object Span {
}

private def reportSpan(finishedAt: Instant, metricTags: TagSet): Unit = {
val isRootSpan = (position == Position.Root || position == Position.LocalRoot)

if (isRootSpan && localTailSamplerSettings.enabled) {
val hasEnoughErrors = trace.failedSpansCount() >= localTailSamplerSettings.errorCountThreshold
val hasEnoughLatency = Clock.nanosBetween(_startedAt, finishedAt) >= localTailSamplerSettings.latencyThresholdNanos

if (hasEnoughErrors || hasEnoughLatency) {
trace.keep()
}
}

if(reportingDelay.isZero) {
if(isSampled)
onFinish(toFinishedSpan(finishedAt, metricTags))
Expand Down Expand Up @@ -674,6 +690,7 @@ object Span {
}

private class DelayedReportingRunnable(finishedAt: Instant, metricTags: TagSet) extends Runnable {

override def run(): Unit = {
if (isSampled)
onFinish(toFinishedSpan(finishedAt, metricTags))
Expand Down
20 changes: 20 additions & 0 deletions core/kamon-core/src/main/scala/kamon/trace/Trace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package kamon
package trace

import java.util.concurrent.atomic.AtomicInteger

/**
* Holds information shared across all Spans from the same Trace. It might seem like too little information but all in
* all, a trace is just a bunch of Spans that share the same trace identifier ;).
Expand Down Expand Up @@ -56,6 +58,17 @@ trait Trace {
*/
def keep(): Unit

/**
 * Signals that a Span belonging to this trace has failed. Every call is expected to be reflected in the value
 * returned by `failedSpansCount()`.
 */
def spanFailed(): Unit

/**
 * Returns the number of failed Spans for this trace in this process, as signaled via `spanFailed()`. This error
 * count does not reflect errors that might have happened on other services participating in the same trace.
 */
def failedSpansCount(): Int

}

object Trace {
Expand All @@ -80,6 +93,7 @@ object Trace {

private class MutableTrace(val id: Identifier, initialDecision: Trace.SamplingDecision) extends Trace {
@volatile private var _samplingDecision = initialDecision
// Per-process count of failed Spans in this trace. The reference is never reassigned and AtomicInteger already
// provides atomic, visible updates, so an immutable `val` is all that is needed here.
private val _failedSpansCount = new AtomicInteger(0)

override def samplingDecision: SamplingDecision =
_samplingDecision
Expand All @@ -90,6 +104,12 @@ object Trace {
override def keep(): Unit =
_samplingDecision = SamplingDecision.Sample

/** Registers one more failed Span on this trace by atomically bumping the failure counter. */
override def spanFailed(): Unit = {
  _failedSpansCount.getAndIncrement()
}

/** Reads the current value of the failure counter for this trace. */
override def failedSpansCount(): Int = {
  _failedSpansCount.intValue()
}

/**
 * Renders the trace identifier and the current sampling decision as `{id=...,samplingDecision=...}`. The closing
 * brace was previously missing from the rendered text.
 */
override def toString(): String =
  s"{id=${id.string},samplingDecision=${_samplingDecision}}"
}
Expand Down
22 changes: 21 additions & 1 deletion core/kamon-core/src/main/scala/kamon/trace/Tracer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kamon.tag.TagSet
import kamon.tag.Lookups.option
import kamon.trace.Span.{Kind, Link, Position, TagKeys}
import kamon.trace.Trace.SamplingDecision
import kamon.trace.Tracer.LocalTailSamplerSettings
import kamon.util.Clock
import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue}
import org.slf4j.LoggerFactory
Expand All @@ -51,6 +52,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
@volatile private var _preStartHooks: Array[Tracer.PreStartHook] = Array.empty
@volatile private var _preFinishHooks: Array[Tracer.PreFinishHook] = Array.empty
@volatile private var _delayedSpanReportingDelay: Duration = Duration.ZERO
// Tail sampling starts out disabled, with both thresholds at their maximum possible values.
@volatile private var _localTailSamplerSettings: LocalTailSamplerSettings =
  LocalTailSamplerSettings(enabled = false, errorCountThreshold = Int.MaxValue, latencyThresholdNanos = Long.MaxValue)
@volatile private var _scheduler: Option[ScheduledExecutorService] = None
private val _onSpanFinish: Span.Finished => Unit = _spanBuffer.offer

Expand Down Expand Up @@ -361,7 +363,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage

new Span.Local(id, parentId, trace, position, _kind, localParent, _name, _spanTags, _metricTags, at, _marks, _links,
_trackMetrics, _tagWithParentOperation, _includeErrorStacktrace, isDelayed, clock, _preFinishHooks, _onSpanFinish,
_sampler, _scheduler.get, _delayedSpanReportingDelay)
_sampler, _scheduler.get, _delayedSpanReportingDelay, _localTailSamplerSettings)
}
}

Expand Down Expand Up @@ -428,6 +430,17 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
val tagWithParentOperation = traceConfig.getBoolean("span-metric-tags.parent-operation")
val includeErrorStacktrace = traceConfig.getBoolean("include-error-stacktrace")
val delayedSpanReportingDelay = traceConfig.getDuration("span-reporting-delay")
val localTailSamplerSettings = LocalTailSamplerSettings(
enabled = traceConfig.getBoolean("local-tail-sampler.enabled"),
errorCountThreshold = traceConfig.getInt("local-tail-sampler.error-count-threshold"),
latencyThresholdNanos = traceConfig.getDuration("local-tail-sampler.latency-threshold").toNanos
)

if(localTailSamplerSettings.enabled && delayedSpanReportingDelay.isZero) {
_logger.warn(
"Enabling local tail sampling without a span-reporting-delay setting will probably lead to incomplete " +
"traces. Consider setting span-reporting-delay to a value slightly above your application's requests timeout")
}

if(_traceReporterQueueSize != traceReporterQueueSize) {
// By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters.
Expand All @@ -445,6 +458,7 @@ class Tracer(initialConfig: Config, clock: Clock, contextStorage: ContextStorage
_tagWithParentOperation = tagWithParentOperation
_traceReporterQueueSize = traceReporterQueueSize
_delayedSpanReportingDelay = delayedSpanReportingDelay
_localTailSamplerSettings = localTailSamplerSettings
_preStartHooks = preStartHooks
_preFinishHooks = preFinishHooks

Expand Down Expand Up @@ -490,4 +504,10 @@ object Tracer {
trait PreFinishHook {
def beforeFinish(span: Span): Unit
}

/**
  * Settings that control the local tail sampler.
  *
  * @param enabled               whether the tail sampling logic is evaluated at all.
  * @param errorCountThreshold   number of failed Spans in a trace at (or above) which the trace is kept.
  * @param latencyThresholdNanos duration, in nanoseconds, at (or above) which a trace is kept.
  */
private[trace] final case class LocalTailSamplerSettings(
  enabled: Boolean,
  errorCountThreshold: Int,
  latencyThresholdNanos: Long
)
}

0 comments on commit 25bb796

Please sign in to comment.