diff --git a/CHANGELOG.md b/CHANGELOG.md index 502c4e18cf..fdaa0207ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5723](https://github.com/thanos-io/thanos/pull/5723) Compactor: Support disable block viewer UI. - [#5674](https://github.com/thanos-io/thanos/pull/5674) Query Frontend/Store: Add support connecting to redis using TLS. - [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI. +- [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter. ### Changed @@ -33,6 +34,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5641](https://github.com/thanos-io/thanos/pull/5641) Store: Remove hardcoded labels in shard matcher. - [#5641](https://github.com/thanos-io/thanos/pull/5641) Query: Inject unshardable le label in query analyzer. - [#5685](https://github.com/thanos-io/thanos/pull/5685) Receive: Make active/head series limiting configuration per tenant by adding it to new limiting config. +- [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Change Jaeger exporter from OpenTracing to OpenTelemetry. *Options `RPC Metrics`, `Gen128Bit` and `Disabled` are now deprecated and won't have any effect when set :warning:.* ### Removed diff --git a/docs/tracing.md b/docs/tracing.md index df8558d884..430570d72f 100644 --- a/docs/tracing.md +++ b/docs/tracing.md @@ -70,9 +70,39 @@ Every request against any Thanos component's API with header `X-Thanos-Force-Tra Currently supported tracing backends: +### OpenTelemetry (OTLP) + +Thanos supports exporting traces in the OpenTelemetry Protocol (OTLP). Both gRPC and HTTP clients are supported. Options can be provided also via environment variables. 
For more details see the [exporter specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md#configuration-options). + +```yaml mdox-exec="go run scripts/cfggen/main.go --name=otlp.Config" +type: OTLP +config: + client_type: "" + reconnection_period: 0s + compression: "" + insecure: false + endpoint: "" + url_path: "" + timeout: 0s + retry_config: + retry_enabled: false + retry_initial_interval: 0s + retry_max_interval: 0s + retry_max_elapsed_time: 0s + headers: {} + tls_config: + ca_file: "" + cert_file: "" + key_file: "" + server_name: "" + insecure_skip_verify: false +``` + ### Jaeger -Client for https://github.com/jaegertracing/jaeger tracing. +Client for https://github.com/jaegertracing/jaeger tracing. Options can also be provided via environment variables. For more details see the Jaeger [exporter specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/sdk-environment-variables.md#jaeger-exporter). 
+ +*WARNING: Options `RPC Metrics`, `Gen128Bit` and `Disabled` are now deprecated and won't have any effect when set* ```yaml mdox-exec="go run scripts/cfggen/main.go --name=jaeger.Config" type: JAEGER @@ -86,9 +116,17 @@ config: sampler_manager_host_port: "" sampler_max_operations: 0 sampler_refresh_interval: 0s + sampler_parent_config: + local_parent_sampled: false + remote_parent_sampled: false + sampling_server_url: "" + operation_name_late_binding: false + initial_sampler_rate: 0 reporter_max_queue_size: 0 reporter_flush_interval: 0s reporter_log_spans: false + reporter_disable_attempt_reconnecting: false + reporter_attempt_reconnect_interval: 0s endpoint: "" user: "" password: "" diff --git a/go.mod b/go.mod index 5e7a57036f..8785e202d7 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/thanos-io/thanos require ( + cloud.google.com/go/storage v1.22.1 // indirect cloud.google.com/go/trace v1.2.0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3 github.com/NYTimes/gziphandler v1.1.1 @@ -72,7 +73,7 @@ require ( github.com/thanos-community/promql-engine v0.0.0-20220929065849-dbc95397ccf3 github.com/thanos-io/objstore v0.0.0-20220923084403-cec51c61948b github.com/uber/jaeger-client-go v2.30.0+incompatible - github.com/uber/jaeger-lib v2.4.1+incompatible + github.com/uber/jaeger-lib v2.4.1+incompatible // indirect github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a github.com/weaveworks/common v0.0.0-20220706100410-67d27ed40fae go.elastic.co/apm v1.11.0 @@ -80,7 +81,11 @@ require ( go.etcd.io/etcd/api/v3 v3.5.4 go.etcd.io/etcd/client/pkg/v3 v3.5.4 go.etcd.io/etcd/client/v3 v3.5.4 - go.opentelemetry.io/contrib/propagators/ot v1.4.0 + go.opentelemetry.io/contrib/propagators/ot v1.9.0 // indirect + go.opentelemetry.io/otel v1.10.0 + go.opentelemetry.io/otel/bridge/opentracing v1.10.0 + go.opentelemetry.io/otel/sdk v1.9.0 + go.opentelemetry.io/otel/trace v1.10.0 go.uber.org/atomic v1.9.0 
go.uber.org/automaxprocs v1.5.1 go.uber.org/goleak v1.1.12 @@ -89,6 +94,7 @@ require ( golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/text v0.3.7 golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 + google.golang.org/api v0.91.0 // indirect google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 google.golang.org/grpc v1.48.0 google.golang.org/grpc/examples v0.0.0-20211119005141-f45e61797429 @@ -101,22 +107,30 @@ require ( require ( github.com/efficientgo/core v1.0.0-rc.0 github.com/minio/sha256-simd v1.0.0 - go.opentelemetry.io/otel v1.10.0 - go.opentelemetry.io/otel/bridge/opentracing v1.10.0 - go.opentelemetry.io/otel/sdk v1.9.0 - go.opentelemetry.io/otel/trace v1.10.0 ) require ( cloud.google.com/go v0.102.0 // indirect cloud.google.com/go/compute v1.7.0 // indirect cloud.google.com/go/iam v0.3.0 // indirect - cloud.google.com/go/storage v1.22.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect + go.opentelemetry.io/contrib/samplers/jaegerremote v0.3.0 + go.opentelemetry.io/otel/exporters/jaeger v1.8.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0 +) + +require ( + github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd + go.opentelemetry.io/contrib/propagators/autoprop v0.34.0 +) + +require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 // indirect github.com/OneOfOne/xxhash v1.2.6 // indirect github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect @@ -139,6 +153,7 @@ require ( 
github.com/aws/smithy-go v1.11.1 // indirect github.com/baidubce/bce-sdk-go v0.9.111 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/chromedp/sysutil v1.0.0 // indirect github.com/clbanning/mxj v1.8.4 // indirect github.com/coreos/go-semver v0.3.0 // indirect @@ -147,7 +162,6 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dnaeon/go-vcr v1.2.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect - github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd // indirect github.com/elastic/go-sysinfo v1.8.1 // indirect github.com/elastic/go-windows v1.0.1 // indirect github.com/fatih/color v1.13.0 // indirect @@ -180,6 +194,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect github.com/googleapis/gax-go/v2 v2.4.0 // indirect github.com/googleapis/go-type-adapters v1.0.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-hclog v0.16.2 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect @@ -221,6 +236,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.4.0 // indirect github.com/tencentyun/cos-go-sdk-v5 v0.7.34 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/tklauser/go-sysconf v0.3.4 // indirect github.com/tklauser/numcpus v0.2.1 // indirect github.com/weaveworks/promrus v1.2.0 // indirect @@ -230,7 +246,12 @@ require ( go.mongodb.org/mongo-driver v1.10.0 // indirect go.opencensus.io v0.23.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0 // indirect + go.opentelemetry.io/contrib/propagators/aws v1.9.0 // indirect + go.opentelemetry.io/contrib/propagators/b3 v1.9.0 // indirect + go.opentelemetry.io/contrib/propagators/jaeger v1.9.0 // indirect + 
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 // indirect go.opentelemetry.io/otel/metric v0.31.0 // indirect + go.opentelemetry.io/proto/otlp v0.18.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect @@ -239,7 +260,6 @@ require ( golang.org/x/tools v0.1.12 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect gonum.org/v1/gonum v0.12.0 // indirect - google.golang.org/api v0.91.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.66.6 // indirect diff --git a/go.sum b/go.sum index cd11a64913..23955a6d60 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -401,6 +403,8 @@ github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq github.com/golang-jwt/jwt/v4 v4.4.1 h1:pC5DB52sCeK48Wlb9oPcdhnjkz1TKt1D/P7WKJ0kUcQ= github.com/golang-jwt/jwt/v4 v4.4.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod 
h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -532,6 +536,9 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 h1:/sDbPb60SusIXjiJGYLUoS/rAQurQmvGWmwn2bBPM9c= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1/go.mod h1:G+WkljZi4mflcqVxYSgvt8MNctRQHjEH8ubKtt1Ka3w= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/api v1.14.0 h1:Y64GIJ8hYTu+tuGekwO4G4ardXoiCivX9wv1iP/kihk= @@ -943,8 +950,9 @@ github.com/thanos-io/objstore v0.0.0-20220923084403-cec51c61948b h1:P+MnJn+NoU6N github.com/thanos-io/objstore v0.0.0-20220923084403-cec51c61948b/go.mod h1:Vx5dZs9ElxEhNLnum/OgB0pNTqNdI2zdXL82BeJr3T4= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache 
v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= -github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc= @@ -1013,21 +1021,45 @@ go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0 h1:9NkMW03wwEzPtP/KciZ4Ozu/Uz5ZA7kfqXJIObnrjGU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0/go.mod h1:548ZsYzmT4PL4zWKRd8q/N4z0Wxzn/ZxUE+lkEpwWQA= -go.opentelemetry.io/contrib/propagators/ot v1.4.0 h1:sHp8P5+xmMORvsgKjIPPX4U97JUgSqY4xPWa6ncF1PA= -go.opentelemetry.io/contrib/propagators/ot v1.4.0/go.mod h1:FivzsGJqC7ND++UUOifWfkiuEOFXtVQ3fh2ZkqIJ9X4= -go.opentelemetry.io/otel v1.4.0/go.mod h1:jeAqMFKy2uLIxCtKxoFj0FAL5zAPKQagc3+GtBWakzk= +go.opentelemetry.io/contrib/propagators/autoprop v0.34.0 h1:S1iBWYnf1iqK4O/qnjUhQ2MMNis/h5+LeB/51+uzGHI= +go.opentelemetry.io/contrib/propagators/autoprop v0.34.0/go.mod h1:lJppyBme+d8vGNukA9sHdlKvw/q4i4c9JXx2RTpuHmM= +go.opentelemetry.io/contrib/propagators/aws v1.9.0 h1:60BnkzNPdU3WD12oOGQNTjgbyws8iDggIaBWpghvcVo= +go.opentelemetry.io/contrib/propagators/aws v1.9.0/go.mod h1:lYGAfTJZU1mo92QxtuiuNJjRyRyEWj1ldO1b0Vpc1I0= +go.opentelemetry.io/contrib/propagators/b3 v1.9.0 h1:Lzb9zU98jCE2kyfCjWfSSsiQoGtvBL+COxvUBf7FNhU= +go.opentelemetry.io/contrib/propagators/b3 v1.9.0/go.mod 
h1:fyx3gFXn+4w5uWTTiqaI8oBNBW/6w9Ow5zxXf7NGixU= +go.opentelemetry.io/contrib/propagators/jaeger v1.9.0 h1:edJTgwezAtLKUINAXfjxllJ1vlsphNPV7RkuKNd/HkQ= +go.opentelemetry.io/contrib/propagators/jaeger v1.9.0/go.mod h1:Q/AXutvrBTfEDSeRLwOmKhyviX5adJvTesg6JFTybYg= +go.opentelemetry.io/contrib/propagators/ot v1.9.0 h1:+pYoqyFoA3H6EZ7Wie2ZQdqS4ZfG42PAGvj3eLUukHE= +go.opentelemetry.io/contrib/propagators/ot v1.9.0/go.mod h1:D2GfaecHHX67fXT93/5iKl2DArjt8+H0XWtFD8b4Z+k= +go.opentelemetry.io/contrib/samplers/jaegerremote v0.3.0 h1:SLLzX5hdPC0jR3t0MrmRhZkKZJ0UKhcB+0N/wWkiarQ= +go.opentelemetry.io/contrib/samplers/jaegerremote v0.3.0 h1:SLLzX5hdPC0jR3t0MrmRhZkKZJ0UKhcB+0N/wWkiarQ= +go.opentelemetry.io/contrib/samplers/jaegerremote v0.3.0/go.mod h1:QnxuwZJaTvT5YN/25CLle62v/7gal96wXN/CSOhWMaI= +go.opentelemetry.io/contrib/samplers/jaegerremote v0.3.0/go.mod h1:QnxuwZJaTvT5YN/25CLle62v/7gal96wXN/CSOhWMaI= go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4= go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ= go.opentelemetry.io/otel/bridge/opentracing v1.10.0 h1:WzAVGovpC1s7KD5g4taU6BWYZP3QGSDVTlbRu9fIHw8= go.opentelemetry.io/otel/bridge/opentracing v1.10.0/go.mod h1:J7GLR/uxxqMAzZptsH0pjte3Ep4GacTCrbGBoDuHBqk= +go.opentelemetry.io/otel/exporters/jaeger v1.8.0 h1:TLLqD6kDhLPziEC7pgPrMvP9lAqdk3n1gf8DiFSnfW8= +go.opentelemetry.io/otel/exporters/jaeger v1.8.0/go.mod h1:GbWg+ng88rDtx+id26C34QLqw2erqJeAjsCx9AFeHfE= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 h1:ggqApEjDKczicksfvZUCxuvoyDmR6Sbm56LwiK8DVR0= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0 h1:NN90Cuna0CnBg8YNu1Q0V35i2E8LDByFOwHRCq/ZP9I= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0/go.mod h1:0EsCXjZAiiZGnLdEUXM9YjCKuuLZMYyglh2QDXcYKVA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc 
v1.9.0 h1:M0/hqGuJBLeIEu20f89H74RGtqV2dn+SFWEz9ATAAwY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0/go.mod h1:K5G92gbtCrYJ0mn6zj9Pst7YFsDFuvSYEhYKRMcufnM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0 h1:FAF9l8Wjxi9Ad2k/vLTfHZyzXYX72C62wBGpV3G6AIo= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0/go.mod h1:smUdtylgc0YQiUr2PuifS4hBXhAS5xtR6WQhxP1wiNA= go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs= go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A= go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo= +go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo= +go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4= go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4= -go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE= go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E= go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.18.0 h1:W5hyXNComRa23tGpKwG+FRAc4rfF6ZUg1JReK+QHS80= +go.opentelemetry.io/proto/otlp v0.18.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/pkg/extprom/http/instrument_server.go b/pkg/extprom/http/instrument_server.go index 5af2e60dbc..d06c5261dd 100644 --- a/pkg/extprom/http/instrument_server.go +++ b/pkg/extprom/http/instrument_server.go @@ -13,6 +13,7 @@ import ( 
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/uber/jaeger-client-go" + "go.opentelemetry.io/otel/trace" ) // InstrumentationMiddleware holds necessary metrics to instrument an http.Server @@ -78,17 +79,36 @@ func httpInstrumentationHandler(baseLabels prometheus.Labels, metrics *defaultMe observer.Observe(time.Since(now).Seconds()) // If we find a tracingID we'll expose it as Exemplar. + var ( + traceID string + OTfound bool + ) + span := opentracing.SpanFromContext(r.Context()) if span != nil { spanCtx, ok := span.Context().(jaeger.SpanContext) if ok && spanCtx.IsSampled() { - observer.(prometheus.ExemplarObserver).ObserveWithExemplar( - time.Since(now).Seconds(), - prometheus.Labels{ - "traceID": spanCtx.TraceID().String(), - }, - ) + traceID = spanCtx.TraceID().String() } + + OTfound = ok + } + + // If OpenTracing span not found, try OTEL; only a sampled span yields a trace ID worth exposing. + if !OTfound { + span := trace.SpanFromContext(r.Context()) + if span != nil && span.SpanContext().IsSampled() { + traceID = span.SpanContext().TraceID().String() + } + } + + if traceID != "" { + observer.(prometheus.ExemplarObserver).ObserveWithExemplar( + time.Since(now).Seconds(), + prometheus.Labels{ + "traceID": traceID, + }, + ) } }), ), diff --git a/pkg/tracing/client/factory.go b/pkg/tracing/client/factory.go index 455c3afc31..2ebe818c95 100644 --- a/pkg/tracing/client/factory.go +++ b/pkg/tracing/client/factory.go @@ -20,16 +20,18 @@ import ( "github.com/thanos-io/thanos/pkg/tracing/jaeger" "github.com/thanos-io/thanos/pkg/tracing/lightstep" "github.com/thanos-io/thanos/pkg/tracing/migration" + "github.com/thanos-io/thanos/pkg/tracing/otlp" ) type TracingProvider string const ( - Stackdriver TracingProvider = "STACKDRIVER" - GoogleCloud TracingProvider = "GOOGLE_CLOUD" - Jaeger TracingProvider = "JAEGER" - ElasticAPM TracingProvider = "ELASTIC_APM" - Lightstep TracingProvider = "LIGHTSTEP" + Stackdriver TracingProvider = "STACKDRIVER" + GoogleCloud TracingProvider = "GOOGLE_CLOUD" 
+ Jaeger TracingProvider = "JAEGER" + ElasticAPM TracingProvider = "ELASTIC_APM" + Lightstep TracingProvider = "LIGHTSTEP" + OpenTelemetryProtocol TracingProvider = "OTLP" ) type TracingConfig struct { @@ -63,11 +65,23 @@ func NewTracer(ctx context.Context, logger log.Logger, metrics *prometheus.Regis tracer, closerFunc := migration.Bridge(tracerProvider, logger) return tracer, closerFunc, nil case string(Jaeger): - return jaeger.NewTracer(ctx, logger, metrics, config) + tracerProvider, err := jaeger.NewTracerProvider(ctx, logger, config) + if err != nil { + return nil, nil, errors.Wrap(err, "new tracer provider err") + } + tracer, closerFunc := migration.Bridge(tracerProvider, logger) + return tracer, closerFunc, nil case string(ElasticAPM): return elasticapm.NewTracer(config) case string(Lightstep): return lightstep.NewTracer(ctx, config) + case string(OpenTelemetryProtocol): + tracerProvider, err := otlp.NewTracerProvider(ctx, logger, config) + if err != nil { + return nil, nil, errors.Wrap(err, "new tracer provider err") + } + tracer, closerFunc := migration.Bridge(tracerProvider, logger) + return tracer, closerFunc, nil default: return nil, nil, errors.Errorf("tracing with type %s is not supported", tracingConf.Type) } diff --git a/pkg/tracing/google_cloud/google_cloud.go b/pkg/tracing/google_cloud/google_cloud.go index c2c7fde212..bebadacbe6 100644 --- a/pkg/tracing/google_cloud/google_cloud.go +++ b/pkg/tracing/google_cloud/google_cloud.go @@ -7,15 +7,16 @@ import ( "context" "os" + "github.com/prometheus/common/version" + "github.com/thanos-io/thanos/pkg/tracing/migration" + cloudtrace "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus/common/version" - "github.com/thanos-io/thanos/pkg/tracing/migration" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/resource" tracesdk "go.opentelemetry.io/otel/sdk/trace" - semconv 
"go.opentelemetry.io/otel/semconv/v1.4.0" + semconv "go.opentelemetry.io/otel/semconv/v1.10.0" "gopkg.in/yaml.v2" ) diff --git a/pkg/tracing/google_cloud/google_cloud_test.go b/pkg/tracing/google_cloud/google_cloud_test.go index 69ab9ec59c..761cfa6e72 100644 --- a/pkg/tracing/google_cloud/google_cloud_test.go +++ b/pkg/tracing/google_cloud/google_cloud_test.go @@ -50,19 +50,8 @@ func TestContextTracing_ClientEnablesTracing(t *testing.T) { srvRoot, srvCtx := tracing.StartSpan(tracing.ContextWithTracer(clientCtx, srvTracer), "b") srvChild, _ := tracing.StartSpan(srvCtx, "bb") - testutil.Equals(t, 0, len(exp.GetSpans())) - srvChild.Finish() - testutil.Equals(t, 1, len(exp.GetSpans())) - testutil.Equals(t, 1, countSampledSpans(exp.GetSpans())) - - srvRoot.Finish() - testutil.Equals(t, 2, len(exp.GetSpans())) - testutil.Equals(t, 2, countSampledSpans(exp.GetSpans())) - - clientRoot.Finish() - testutil.Equals(t, 3, len(exp.GetSpans())) - testutil.Equals(t, 3, countSampledSpans(exp.GetSpans())) + tracing.CountSpans_ClientEnablesTracing(t, exp, clientRoot, srvRoot, srvChild) } // This test shows that if sample factor will disable tracing on client process, when it would be enabled on server @@ -92,17 +81,8 @@ func TestContextTracing_ClientDisablesTracing(t *testing.T) { srvRoot, srvCtx := tracing.StartSpan(tracing.ContextWithTracer(clientCtx, srvTracer), "b") srvChild, _ := tracing.StartSpan(srvCtx, "bb") - testutil.Equals(t, 0, len(exp.GetSpans())) - - // Since we are not recording neither sampling, no spans should show up. 
- srvChild.Finish() - testutil.Equals(t, 0, len(exp.GetSpans())) - srvRoot.Finish() - testutil.Equals(t, 0, len(exp.GetSpans())) - - clientRoot.Finish() - testutil.Equals(t, 0, len(exp.GetSpans())) + tracing.ContextTracing_ClientDisablesTracing(t, exp, clientRoot, srvRoot, srvChild) } // This test shows that if span will contain special baggage (for example from special HTTP header), even when sample @@ -137,28 +117,6 @@ func TestContextTracing_ForceTracing(t *testing.T) { srvRoot, srvCtx := tracing.StartSpan(tracing.ContextWithTracer(clientCtx, srvTracer), "b") srvChild, _ := tracing.StartSpan(srvCtx, "bb") - testutil.Equals(t, 0, len(exp.GetSpans())) - - srvChild.Finish() - testutil.Equals(t, 1, len(exp.GetSpans())) - testutil.Equals(t, 1, countSampledSpans(exp.GetSpans())) - - srvRoot.Finish() - testutil.Equals(t, 2, len(exp.GetSpans())) - testutil.Equals(t, 2, countSampledSpans(exp.GetSpans())) - - clientRoot.Finish() - testutil.Equals(t, 3, len(exp.GetSpans())) - testutil.Equals(t, 3, countSampledSpans(exp.GetSpans())) -} - -func countSampledSpans(ss tracetest.SpanStubs) int { - var count int - for _, s := range ss { - if s.SpanContext.IsSampled() { - count++ - } - } - return count + tracing.ContextTracing_ForceTracing(t, exp, clientRoot, srvRoot, srvChild) } diff --git a/pkg/tracing/jaeger/config_yaml.go b/pkg/tracing/jaeger/config_yaml.go index d999ec81fa..71009070b3 100644 --- a/pkg/tracing/jaeger/config_yaml.go +++ b/pkg/tracing/jaeger/config_yaml.go @@ -4,165 +4,194 @@ package jaeger import ( - "net" - "net/url" + "log" + "math" "os" "strconv" "strings" "time" - "github.com/opentracing/opentracing-go" - "github.com/pkg/errors" - "github.com/uber/jaeger-client-go" - "github.com/uber/jaeger-client-go/config" - "gopkg.in/yaml.v2" + glog "github.com/go-kit/log" + "github.com/go-kit/log/level" + "go.opentelemetry.io/contrib/samplers/jaegerremote" + "go.opentelemetry.io/otel/attribute" + otel_jaeger "go.opentelemetry.io/otel/exporters/jaeger" + tracesdk 
"go.opentelemetry.io/otel/sdk/trace" ) +type ParentBasedSamplerConfig struct { + LocalParentSampled bool `yaml:"local_parent_sampled"` + RemoteParentSampled bool `yaml:"remote_parent_sampled"` +} + // Config - YAML configuration. For details see to https://github.com/jaegertracing/jaeger-client-go#environment-variables. type Config struct { - ServiceName string `yaml:"service_name"` - Disabled bool `yaml:"disabled"` - RPCMetrics bool `yaml:"rpc_metrics"` - Tags string `yaml:"tags"` - SamplerType string `yaml:"sampler_type"` - SamplerParam float64 `yaml:"sampler_param"` - SamplerManagerHostPort string `yaml:"sampler_manager_host_port"` - SamplerMaxOperations int `yaml:"sampler_max_operations"` - SamplerRefreshInterval time.Duration `yaml:"sampler_refresh_interval"` - ReporterMaxQueueSize int `yaml:"reporter_max_queue_size"` - ReporterFlushInterval time.Duration `yaml:"reporter_flush_interval"` - ReporterLogSpans bool `yaml:"reporter_log_spans"` - Endpoint string `yaml:"endpoint"` - User string `yaml:"user"` - Password string `yaml:"password"` - AgentHost string `yaml:"agent_host"` - AgentPort int `yaml:"agent_port"` - Gen128Bit bool `yaml:"traceid_128bit"` + ServiceName string `yaml:"service_name"` + Disabled bool `yaml:"disabled"` + RPCMetrics bool `yaml:"rpc_metrics"` + Tags string `yaml:"tags"` + SamplerType string `yaml:"sampler_type"` + SamplerParam float64 `yaml:"sampler_param"` + SamplerManagerHostPort string `yaml:"sampler_manager_host_port"` + SamplerMaxOperations int `yaml:"sampler_max_operations"` + SamplerRefreshInterval time.Duration `yaml:"sampler_refresh_interval"` + SamplerParentConfig ParentBasedSamplerConfig `yaml:"sampler_parent_config"` + SamplingServerURL string `yaml:"sampling_server_url"` + OperationNameLateBinding bool `yaml:"operation_name_late_binding"` + InitialSamplingRate float64 `yaml:"initial_sampler_rate"` + ReporterMaxQueueSize int `yaml:"reporter_max_queue_size"` + ReporterFlushInterval time.Duration `yaml:"reporter_flush_interval"` 
+ ReporterLogSpans bool `yaml:"reporter_log_spans"` + ReporterDisableAttemptReconnecting bool `yaml:"reporter_disable_attempt_reconnecting"` + ReporterAttemptReconnectInterval time.Duration `yaml:"reporter_attempt_reconnect_interval"` + Endpoint string `yaml:"endpoint"` + User string `yaml:"user"` + Password string `yaml:"password"` + AgentHost string `yaml:"agent_host"` + AgentPort int `yaml:"agent_port"` + Gen128Bit bool `yaml:"traceid_128bit"` + // Remove the above field. Ref: https://github.com/open-telemetry/opentelemetry-specification/issues/525#issuecomment-605519217 + // Ref: https://opentelemetry.io/docs/reference/specification/trace/api/#spancontext } -// ParseConfigFromYaml uses config YAML to set the tracer's Configuration. -func ParseConfigFromYaml(cfg []byte) (*config.Configuration, error) { - conf := &Config{} - - if err := yaml.Unmarshal(cfg, &conf); err != nil { - return nil, err +// getCollectorEndpoints returns Jaeger options populated with collector related options. +func getCollectorEndpoints(config Config) []otel_jaeger.CollectorEndpointOption { + var collectorOptions []otel_jaeger.CollectorEndpointOption + if config.User != "" { + collectorOptions = append(collectorOptions, otel_jaeger.WithUsername(config.User)) } - - c := &config.Configuration{} - - if conf.ServiceName != "" { - c.ServiceName = conf.ServiceName + if config.Password != "" { + collectorOptions = append(collectorOptions, otel_jaeger.WithPassword(config.Password)) } + collectorOptions = append(collectorOptions, otel_jaeger.WithEndpoint(config.Endpoint)) - if conf.RPCMetrics { - c.RPCMetrics = conf.RPCMetrics - } - - if conf.Gen128Bit { - c.Gen128Bit = conf.Gen128Bit - } - - if conf.Disabled { - c.Disabled = conf.Disabled - } + return collectorOptions +} - if conf.Tags != "" { - c.Tags = parseTags(conf.Tags) +// getAgentEndpointOptions returns Jaeger options populated with agent related options. 
+func getAgentEndpointOptions(config Config) []otel_jaeger.AgentEndpointOption { + var endpointOptions []otel_jaeger.AgentEndpointOption + endpointOptions = append(endpointOptions, otel_jaeger.WithAgentHost(config.AgentHost)) + endpointOptions = append(endpointOptions, otel_jaeger.WithAgentPort(strconv.Itoa(config.AgentPort))) + + // This option, as part of the Jaeger config, was JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED. + if config.ReporterDisableAttemptReconnecting { + endpointOptions = append(endpointOptions, otel_jaeger.WithDisableAttemptReconnecting()) + } else if config.ReporterAttemptReconnectInterval != 0 { + // The reconnect interval is only applied while reconnecting is enabled. + endpointOptions = append(endpointOptions, otel_jaeger.WithAttemptReconnectingInterval(config.ReporterAttemptReconnectInterval)) } - c.Sampler = samplerConfigFromConfig(*conf) - - if r, err := reporterConfigFromConfig(*conf); err == nil { - c.Reporter = r - } else { - return nil, errors.Wrap(err, "cannot obtain reporter config from YAML") + if config.ReporterLogSpans { + logger := log.Default() // A nil *log.Logger would turn this option into a no-op. + endpointOptions = append(endpointOptions, otel_jaeger.WithLogger(logger)) } - return c, nil + return endpointOptions } -// samplerConfigFromConfig creates a new SamplerConfig based on the YAML Config. -func samplerConfigFromConfig(cfg Config) *config.SamplerConfig { - sc := &config.SamplerConfig{} - - if cfg.SamplerType != "" { - sc.Type = cfg.SamplerType - } +// getSamplingFraction returns the sampling fraction based on the sampler type. 
+// Ref: https://www.jaegertracing.io/docs/1.35/sampling/#client-sampling-configuration +func getSamplingFraction(samplerType string, samplingFactor float64) float64 { + switch samplerType { + case "const": + if samplingFactor > 1 { + return 1.0 + } else if samplingFactor < 0 { + return 0.0 + } + return math.Round(samplingFactor) - if cfg.SamplerParam != 0 { - sc.Param = cfg.SamplerParam - } + case "probabilistic": + return samplingFactor - if cfg.SamplerManagerHostPort != "" { - sc.SamplingServerURL = cfg.SamplerManagerHostPort + case "ratelimiting": + return math.Round(samplingFactor) // Needs to be an integer } - if cfg.SamplerMaxOperations != 0 { - sc.MaxOperations = cfg.SamplerMaxOperations - } + return samplingFactor +} - if cfg.SamplerRefreshInterval != 0 { - sc.SamplingRefreshInterval = cfg.SamplerRefreshInterval +func getSampler(config Config) tracesdk.Sampler { + samplerType := config.SamplerType + samplingFraction := getSamplingFraction(samplerType, config.SamplerParam) + + var sampler tracesdk.Sampler + switch samplerType { + case "probabilistic": + sampler = tracesdk.ParentBased(tracesdk.TraceIDRatioBased(samplingFraction)) + case "const": + if samplingFraction == 1.0 { + sampler = tracesdk.AlwaysSample() + } else { + sampler = tracesdk.NeverSample() + } + case "remote": + remoteOptions := getRemoteOptions(config) + sampler = jaegerremote.New(config.ServiceName, remoteOptions...) + case "ratelimiting": + // The same config options are applicable to both remote and rate-limiting samplers. + remoteOptions := getRemoteOptions(config) + sampler = jaegerremote.New(config.ServiceName, remoteOptions...) 
+ rl, ok := sampler.(*rateLimitingSampler) + if ok { + rl.Update(config.SamplerParam) + } + default: + root := tracesdk.TraceIDRatioBased(samplingFraction) // ParentBased dereferences its root sampler, so it must never be nil. + var parentOptions []tracesdk.ParentBasedSamplerOption + if config.SamplerParentConfig.LocalParentSampled { + parentOptions = append(parentOptions, tracesdk.WithLocalParentSampled(root)) + } + if config.SamplerParentConfig.RemoteParentSampled { + parentOptions = append(parentOptions, tracesdk.WithRemoteParentSampled(root)) + } + sampler = tracesdk.ParentBased(root, parentOptions...) } - - return sc + return sampler } -// reporterConfigFromConfig creates a new ReporterConfig based on the YAML Config. -func reporterConfigFromConfig(cfg Config) (*config.ReporterConfig, error) { - rc := &config.ReporterConfig{} - - if cfg.ReporterMaxQueueSize != 0 { - rc.QueueSize = cfg.ReporterMaxQueueSize +func getRemoteOptions(config Config) []jaegerremote.Option { + var remoteOptions []jaegerremote.Option + if config.SamplerRefreshInterval != 0 { + remoteOptions = append(remoteOptions, jaegerremote.WithSamplingRefreshInterval(config.SamplerRefreshInterval)) } - - if cfg.ReporterFlushInterval != 0 { - rc.BufferFlushInterval = cfg.ReporterFlushInterval + if config.SamplingServerURL != "" { + remoteOptions = append(remoteOptions, jaegerremote.WithSamplingServerURL(config.SamplingServerURL)) } - - if cfg.ReporterLogSpans { - rc.LogSpans = cfg.ReporterLogSpans + if config.SamplerMaxOperations != 0 { + remoteOptions = append(remoteOptions, jaegerremote.WithMaxOperations(config.SamplerMaxOperations)) } - - if cfg.Endpoint != "" { - u, err := url.ParseRequestURI(cfg.Endpoint) - if err != nil { - return nil, errors.Wrapf(err, "cannot parse endpoint=%s", cfg.Endpoint) - } - rc.CollectorEndpoint = u.String() - user := cfg.User - pswd := cfg.Password - if user != "" && pswd == "" || user == "" && pswd != "" { - return nil, errors.Errorf("you must set %s and %s parameters together", cfg.User, cfg.Password) - } - rc.User = user - rc.Password = pswd - } else
{ - host := jaeger.DefaultUDPSpanServerHost - if cfg.AgentHost != "" { - host = cfg.AgentHost - } - - port := jaeger.DefaultUDPSpanServerPort - if cfg.AgentPort != 0 { - port = cfg.AgentPort - } - rc.LocalAgentHostPort = net.JoinHostPort(host, strconv.Itoa(port)) + if config.OperationNameLateBinding { + remoteOptions = append(remoteOptions, jaegerremote.WithOperationNameLateBinding(true)) + } + // SamplerRefreshInterval is the interval for polling the backend for sampling strategies. + // Ref: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/sdk-environment-variables.md#general-sdk-configuration. + if config.SamplerRefreshInterval != 0 { + remoteOptions = append(remoteOptions, jaegerremote.WithSamplingRefreshInterval(config.SamplerRefreshInterval)) + } + // InitialSamplingRate is the sampling probability when the backend is unreachable. + if config.InitialSamplingRate != 0.0 { + remoteOptions = append(remoteOptions, jaegerremote.WithInitialSampler(tracesdk.TraceIDRatioBased(config.InitialSamplingRate))) } - return rc, nil + return remoteOptions } -// parseTags parses the given string into a collection of Tags. +// parseTags parses the given string into a collection of attributes. // Spec for this value: // - comma separated list of key=value // - value can be specified using the notation ${envVar:defaultValue}, where `envVar` // is an environment variable and `defaultValue` is the value to use in case the env var is not set. -func parseTags(sTags string) []opentracing.Tag { +// TODO(aditi): when Lightstep and Elastic APM have been migrated, move 'parseTags()' to the common 'tracing' package.
+func parseTags(sTags string) []attribute.KeyValue { pairs := strings.Split(sTags, ",") - tags := make([]opentracing.Tag, 0) + tags := make([]attribute.KeyValue, 0) for _, p := range pairs { kv := strings.SplitN(p, "=", 2) + if len(kv) < 2 { + continue // to avoid panic + } k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1]) if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") { @@ -174,9 +203,24 @@ func parseTags(sTags string) []opentracing.Tag { } } - tag := opentracing.Tag{Key: k, Value: v} + tag := attribute.String(k, v) tags = append(tags, tag) } return tags } + +// printDeprecationWarnings logs deprecation warnings for config options that are no +// longer supported. +func printDeprecationWarnings(config Config, l glog.Logger) { + commonDeprecationMessage := " has been deprecated as a config option." + if config.RPCMetrics { + level.Info(l).Log("msg", "RPC Metrics"+commonDeprecationMessage) + } + if config.Gen128Bit { + level.Info(l).Log("msg", "Gen128Bit"+commonDeprecationMessage) + } + if config.Disabled { + level.Info(l).Log("msg", "Disabled"+commonDeprecationMessage) + } +} diff --git a/pkg/tracing/jaeger/jaeger.go b/pkg/tracing/jaeger/jaeger.go index 7a33d7d8a7..17b7314704 100644 --- a/pkg/tracing/jaeger/jaeger.go +++ b/pkg/tracing/jaeger/jaeger.go @@ -5,65 +5,102 @@ package jaeger import ( "context" - "fmt" - "io" - "strings" - "github.com/thanos-io/thanos/pkg/tracing" + "github.com/thanos-io/thanos/pkg/tracing/migration" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "github.com/uber/jaeger-client-go" - "github.com/uber/jaeger-client-go/config" - jaeger_prometheus "github.com/uber/jaeger-lib/metrics/prometheus" + "go.opentelemetry.io/otel/attribute" + otel_jaeger "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv 
"go.opentelemetry.io/otel/semconv/v1.7.0" + "gopkg.in/yaml.v2" ) -// Tracer extends opentracing.Tracer. -type Tracer struct { - opentracing.Tracer -} - -// GetTraceIDFromSpanContext return TraceID from span.Context. -func (t *Tracer) GetTraceIDFromSpanContext(ctx opentracing.SpanContext) (string, bool) { - if c, ok := ctx.(jaeger.SpanContext); ok { - return fmt.Sprintf("%016x", c.TraceID().Low), true +// NewTracerProvider returns a new instance of an OpenTelemetry tracer provider. +func NewTracerProvider(ctx context.Context, logger log.Logger, conf []byte) (*tracesdk.TracerProvider, error) { + config := Config{} + if err := yaml.Unmarshal(conf, &config); err != nil { + return nil, err } - return "", false -} -// NewTracer create tracer from YAML. -func NewTracer(ctx context.Context, logger log.Logger, metrics *prometheus.Registry, conf []byte) (opentracing.Tracer, io.Closer, error) { - var ( - cfg *config.Configuration - err error - jaegerTracer opentracing.Tracer - closer io.Closer - ) - if conf != nil { - level.Info(logger).Log("msg", "loading Jaeger tracing configuration from YAML") - cfg, err = ParseConfigFromYaml(conf) + printDeprecationWarnings(config, logger) + + var exporter *otel_jaeger.Exporter + var err error + + if config.Endpoint != "" { + collectorOptions := getCollectorEndpoints(config) + + exporter, err = otel_jaeger.New(otel_jaeger.WithCollectorEndpoint(collectorOptions...)) + if err != nil { + return nil, err + } + } else if config.AgentHost != "" && config.AgentPort != 0 { + jaegerAgentEndpointOptions := getAgentEndpointOptions(config) + + exporter, err = otel_jaeger.New(otel_jaeger.WithAgentEndpoint(jaegerAgentEndpointOptions...)) + if err != nil { + return nil, err + } } else { - level.Info(logger).Log("msg", "loading Jaeger tracing configuration from ENV") - cfg, err = config.FromEnv() + exporter, err = otel_jaeger.New(otel_jaeger.WithAgentEndpoint()) + if err != nil { + return nil, err + } } - if err != nil { - return nil, nil, err + + var 
tags []attribute.KeyValue + if config.Tags != "" { + tags = getAttributesFromTags(config) + } + + sampler := getSampler(config) + var processorOptions []tracesdk.BatchSpanProcessorOption + var processor tracesdk.SpanProcessor + if config.ReporterMaxQueueSize != 0 { + processorOptions = append(processorOptions, tracesdk.WithMaxQueueSize(config.ReporterMaxQueueSize)) } - cfg.Headers = &jaeger.HeadersConfig{ - JaegerDebugHeader: strings.ToLower(tracing.ForceTracingBaggageKey), + //Ref: https://epsagon.com/observability/opentelemetry-best-practices-overview-part-2-2/ . + if config.ReporterFlushInterval != 0 { + processorOptions = append(processorOptions, tracesdk.WithBatchTimeout(config.ReporterFlushInterval)) } - cfg.Headers.ApplyDefaults() - jaegerTracer, closer, err = cfg.NewTracer( - config.Metrics(jaeger_prometheus.New(jaeger_prometheus.WithRegisterer(metrics))), - config.Logger(&jaegerLogger{ - logger: logger, - }), + + processor = tracesdk.NewBatchSpanProcessor(exporter, processorOptions...) + + tp := newTraceProvider(ctx, config.ServiceName, logger, processor, sampler, tags) + + return tp, nil +} + +// getAttributesFromTags returns tags as OTel attributes. 
+func getAttributesFromTags(config Config) []attribute.KeyValue { + return parseTags(config.Tags) +} + +func newTraceProvider(ctx context.Context, serviceName string, logger log.Logger, processor tracesdk.SpanProcessor, + sampler tracesdk.Sampler, tags []attribute.KeyValue) *tracesdk.TracerProvider { + + resource, err := resource.New( + ctx, + resource.WithAttributes(semconv.ServiceNameKey.String(serviceName)), + resource.WithAttributes(tags...), ) - t := &Tracer{ - jaegerTracer, + if err != nil { + level.Warn(logger).Log("msg", "jaeger: detecting resources for tracing provider failed", "err", err) } - return t, closer, err + + tp := tracesdk.NewTracerProvider( + tracesdk.WithSpanProcessor(processor), + tracesdk.WithSampler( + migration.SamplerWithOverride( + sampler, migration.ForceTracingAttributeKey, + ), + ), + tracesdk.WithResource(resource), + ) + + return tp } diff --git a/pkg/tracing/jaeger/jaeger_test.go b/pkg/tracing/jaeger/jaeger_test.go new file mode 100644 index 0000000000..bc220bed0e --- /dev/null +++ b/pkg/tracing/jaeger/jaeger_test.go @@ -0,0 +1,230 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package jaeger + +import ( + "context" + "os" + "strings" + "testing" + + "github.com/opentracing/opentracing-go" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/tracing" + "github.com/thanos-io/thanos/pkg/tracing/migration" + + "github.com/go-kit/log" + "go.opentelemetry.io/otel/attribute" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +var parentConfig = ParentBasedSamplerConfig{LocalParentSampled: true} + +// This test shows that if sample factor will enable tracing on client process, even when it would be disabled on server +// it will be still enabled for all spans within this span. 
+func TestContextTracing_ClientEnablesTracing(t *testing.T) { + exp := tracetest.NewInMemoryExporter() + config := Config{ + SamplerType: "probabilistic", + SamplerParam: 1.0, + SamplerParentConfig: parentConfig, + } + sampler := getSampler(config) + + tracerOtel := newTraceProvider( + context.Background(), + "tracerOtel", + log.NewNopLogger(), + tracesdk.NewSimpleSpanProcessor(exp), + sampler, + []attribute.KeyValue{}, + ) + tracer, _ := migration.Bridge(tracerOtel, log.NewNopLogger()) + clientRoot, clientCtx := tracing.StartSpan(tracing.ContextWithTracer(context.Background(), tracer), "a") + + config.SamplerParam = 0.0 + sampler2 := getSampler(config) + // Simulate Server process with different tracer, but with client span in context. + srvTracerOtel := newTraceProvider( + context.Background(), + "srvTracerOtel", + log.NewNopLogger(), + tracesdk.NewSimpleSpanProcessor(exp), + sampler2, // never sample + []attribute.KeyValue{}, + ) + srvTracer, _ := migration.Bridge(srvTracerOtel, log.NewNopLogger()) + + srvRoot, srvCtx := tracing.StartSpan(tracing.ContextWithTracer(clientCtx, srvTracer), "b") + srvChild, _ := tracing.StartSpan(srvCtx, "bb") + + tracing.CountSpans_ClientEnablesTracing(t, exp, clientRoot, srvRoot, srvChild) +} + +// This test shows that if sample factor will disable tracing on client process, when it would be enabled on server +// it will be still disabled for all spans within this span. 
+func TestContextTracing_ClientDisablesTracing(t *testing.T) { + exp := tracetest.NewInMemoryExporter() + + config := Config{ + SamplerType: "probabilistic", + SamplerParam: 0.0, + SamplerParentConfig: parentConfig, + } + sampler := getSampler(config) + tracerOtel := newTraceProvider( + context.Background(), + "tracerOtel", + log.NewNopLogger(), + tracesdk.NewSimpleSpanProcessor(exp), + sampler, // never sample + []attribute.KeyValue{}, + ) + tracer, _ := migration.Bridge(tracerOtel, log.NewNopLogger()) + + clientRoot, clientCtx := tracing.StartSpan(tracing.ContextWithTracer(context.Background(), tracer), "a") + + config.SamplerParam = 1.0 + sampler2 := getSampler(config) + // Simulate Server process with different tracer, but with client span in context. + srvTracerOtel := newTraceProvider( + context.Background(), + "srvTracerOtel", + log.NewNopLogger(), + tracesdk.NewSimpleSpanProcessor(exp), + sampler2, // always sample (unless the parent span is not sampled) + []attribute.KeyValue{}, + ) + srvTracer, _ := migration.Bridge(srvTracerOtel, log.NewNopLogger()) + + srvRoot, srvCtx := tracing.StartSpan(tracing.ContextWithTracer(clientCtx, srvTracer), "b") + srvChild, _ := tracing.StartSpan(srvCtx, "bb") + + tracing.ContextTracing_ClientDisablesTracing(t, exp, clientRoot, srvRoot, srvChild) +} + +// This test shows that if span will contain special baggage (for example from special HTTP header), even when sample +// factor will disable client & server tracing, it will be still enabled for all spans within this span.
+func TestContextTracing_ForceTracing(t *testing.T) { + exp := tracetest.NewInMemoryExporter() + config := Config{ + SamplerType: "probabilistic", + SamplerParam: 0.0, + SamplerParentConfig: parentConfig, + } + sampler := getSampler(config) + tracerOtel := newTraceProvider( + context.Background(), + "tracerOtel", + log.NewNopLogger(), + tracesdk.NewSimpleSpanProcessor(exp), + sampler, + []attribute.KeyValue{}, + ) + tracer, _ := migration.Bridge(tracerOtel, log.NewNopLogger()) + + // Start the root span with the tag to force tracing. + clientRoot, clientCtx := tracing.StartSpan( + tracing.ContextWithTracer(context.Background(), tracer), + "a", + opentracing.Tag{Key: migration.ForceTracingAttributeKey, Value: "true"}, + ) + + // Simulate Server process with different tracer, but with client span in context. + srvTracerOtel := newTraceProvider( + context.Background(), + "srvTracerOtel", + log.NewNopLogger(), + tracesdk.NewSimpleSpanProcessor(exp), + sampler, + []attribute.KeyValue{}, + ) + srvTracer, _ := migration.Bridge(srvTracerOtel, log.NewNopLogger()) + + srvRoot, srvCtx := tracing.StartSpan(tracing.ContextWithTracer(clientCtx, srvTracer), "b") + srvChild, _ := tracing.StartSpan(srvCtx, "bb") + + tracing.ContextTracing_ForceTracing(t, exp, clientRoot, srvRoot, srvChild) +} + +func TestParseTags(t *testing.T) { + for _, tcase := range []struct { + input string + expected []attribute.KeyValue + }{ + { + input: "key=value", + expected: []attribute.KeyValue{attribute.String("key", "value")}, + }, + { + input: "key1=value1,key2=value2", + expected: []attribute.KeyValue{attribute.String("key1", "value1"), + attribute.String("key2", "value2")}, + }, + { + input: "", + expected: []attribute.KeyValue{}, + }, + { + // Incorrectly formatted string with leading comma still yields the right tags. 
+ input: ",key=value", + expected: []attribute.KeyValue{attribute.String("key", "value")}, + }, + { + // Incorrectly formatted string with trailing comma still yields the right tags. + input: "key=value,", + expected: []attribute.KeyValue{attribute.String("key", "value")}, + }, + { + // Leading and trailing spaces in tags are trimmed. + input: " key=value ", + expected: []attribute.KeyValue{attribute.String("key", "value")}, + }, + { + input: "key=${env:default_val}", + expected: []attribute.KeyValue{attribute.String("key", "default_val")}, + }, + } { + if ok := t.Run("", func(t *testing.T) { + exists := false + envVal := "" + envVar := "" + // Check if env vars are used. + if strings.Contains(tcase.input, "${") { + envVal, envVar, exists = extractValueOfEnvVar(tcase.input) + // Set a temporary value just for testing. + tempEnvVal := "temp_val" + os.Setenv(envVar, tempEnvVal) + tcase.expected = []attribute.KeyValue{attribute.String("key", tempEnvVal)} + } + attrs := parseTags(tcase.input) + testutil.Equals(t, tcase.expected, attrs) + + // Reset the env var to the old value, if needed. + if exists { + os.Setenv(envVar, envVal) + } + }); !ok { + return + } + } +} + +func extractValueOfEnvVar(input string) (string, string, bool) { + kv := strings.SplitN(input, "=", 2) + _, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1]) + + if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") { + ed := strings.SplitN(v[2:len(v)-1], ":", 2) + e, d := ed[0], ed[1] + envVal, exists := os.LookupEnv(e) + if !exists { + return d, e, exists + } + return envVal, e, exists + } + + return "", "", false +} diff --git a/pkg/tracing/jaeger/logger.go b/pkg/tracing/jaeger/logger.go deleted file mode 100644 index 35a7147c2a..0000000000 --- a/pkg/tracing/jaeger/logger.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. 
- -package jaeger - -import ( - "fmt" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" -) - -type jaegerLogger struct { - logger log.Logger -} - -func (l *jaegerLogger) Infof(format string, args ...interface{}) { - level.Info(l.logger).Log("msg", fmt.Sprintf(format, args...)) -} - -func (l *jaegerLogger) Error(msg string) { - level.Error(l.logger).Log("msg", msg) -} diff --git a/pkg/tracing/jaeger/remote.go b/pkg/tracing/jaeger/remote.go new file mode 100644 index 0000000000..9c55ccf9a8 --- /dev/null +++ b/pkg/tracing/jaeger/remote.go @@ -0,0 +1,124 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// This file contains utility functions required to implement a rate limiting sampler. +// Since these are part of the 'internal' package folder in the OpenTelemetry-go repo, I have added +// these functions here to be used by the parent 'tracing/jaeger' package. +// Ref: https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/samplers/jaegerremote/internal/utils/rate_limiter.go + +package jaeger + +import ( + "math" + "sync" + "time" + + tracesdk "go.opentelemetry.io/otel/sdk/trace" + oteltrace "go.opentelemetry.io/otel/trace" +) + +type RateLimiter struct { + lock sync.Mutex + + creditsPerSecond float64 + balance float64 + maxBalance float64 + lastTick time.Time + + timeNow func() time.Time +} + +type rateLimitingSampler struct { + rateLimiter *RateLimiter + maxTracesPerSecond float64 +} + +// NewRateLimiter creates a new RateLimiter. +func NewRateLimiter(creditsPerSecond, maxBalance float64) *RateLimiter { + return &RateLimiter{ + creditsPerSecond: creditsPerSecond, + balance: maxBalance, + maxBalance: maxBalance, + lastTick: time.Now(), + timeNow: time.Now, + } +} + +// CheckCredit tries to reduce the current balance by itemCost provided that the current balance +// is not less than itemCost.
+func (rl *RateLimiter) CheckCredit(itemCost float64) bool { + rl.lock.Lock() + defer rl.lock.Unlock() + + // if we have enough credits to pay for current item, then reduce balance and allow + if rl.balance >= itemCost { + rl.balance -= itemCost + return true + } + // otherwise check if balance can be increased due to time elapsed, and try again + rl.updateBalance() + if rl.balance >= itemCost { + rl.balance -= itemCost + return true + } + return false +} + +// updateBalance recalculates current balance based on time elapsed. Must be called while holding a lock. +func (rl *RateLimiter) updateBalance() { + // calculate how much time passed since the last tick, and update current tick + currentTime := rl.timeNow() + elapsedTime := currentTime.Sub(rl.lastTick) + rl.lastTick = currentTime + // calculate how much credit have we accumulated since the last tick + rl.balance += elapsedTime.Seconds() * rl.creditsPerSecond + if rl.balance > rl.maxBalance { + rl.balance = rl.maxBalance + } +} + +// Update changes the main parameters of the rate limiter in-place, while retaining +// the current accumulated balance (pro-rated to the new maxBalance value). Using this method +// instead of creating a new rate limiter helps to avoid thundering herd when sampling +// strategies are updated. 
+func (rl *RateLimiter) Update(creditsPerSecond, maxBalance float64) { + rl.lock.Lock() + defer rl.lock.Unlock() + + rl.updateBalance() // get up to date balance + rl.balance = rl.balance * maxBalance / rl.maxBalance + rl.creditsPerSecond = creditsPerSecond + rl.maxBalance = maxBalance +} + +func (r *rateLimitingSampler) Description() string { + return "rateLimitingSampler{}" +} + +func (r *rateLimitingSampler) ShouldSample(p tracesdk.SamplingParameters) tracesdk.SamplingResult { + psc := oteltrace.SpanContextFromContext(p.ParentContext) + if r.rateLimiter.CheckCredit(1.0) { + return tracesdk.SamplingResult{ + Decision: tracesdk.RecordAndSample, + Tracestate: psc.TraceState(), + } + } + return tracesdk.SamplingResult{ + Decision: tracesdk.Drop, + Tracestate: psc.TraceState(), + } +} + +func (r *rateLimitingSampler) init(rateLimit float64) { + if r.rateLimiter == nil { + r.rateLimiter = NewRateLimiter(rateLimit, math.Max(rateLimit, 1.0)) + } else { + r.rateLimiter.Update(rateLimit, math.Max(rateLimit, 1.0)) + } +} + +func (r *rateLimitingSampler) Update(maxTracesPerSecond float64) { + if r.maxTracesPerSecond != maxTracesPerSecond { + r.init(maxTracesPerSecond) + } +} diff --git a/pkg/tracing/migration/bridge.go b/pkg/tracing/migration/bridge.go index 4ace2e85c5..626f004c62 100644 --- a/pkg/tracing/migration/bridge.go +++ b/pkg/tracing/migration/bridge.go @@ -11,10 +11,9 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" - ot_propagator "go.opentelemetry.io/contrib/propagators/ot" + "go.opentelemetry.io/contrib/propagators/autoprop" "go.opentelemetry.io/otel" bridge "go.opentelemetry.io/otel/bridge/opentracing" - "go.opentelemetry.io/otel/propagation" tracesdk "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" ) @@ -27,18 +26,17 @@ import ( // NOTE: After instrumentation migration is finished, this bridge should be // removed. 
func Bridge(tp *tracesdk.TracerProvider, l log.Logger) (opentracing.Tracer, io.Closer) { - compositePropagator := propagation.NewCompositeTextMapPropagator(ot_propagator.OT{}, propagation.TraceContext{}, propagation.Baggage{}) otel.SetErrorHandler(otelErrHandler(func(err error) { level.Error(l).Log("msg", "OpenTelemetry ErrorHandler", "err", err) })) - otel.SetTextMapPropagator(compositePropagator) + otel.SetTextMapPropagator(autoprop.NewTextMapPropagator()) otel.SetTracerProvider(tp) bridgeTracer, _ := bridge.NewTracerPair(tp.Tracer("")) bridgeTracer.SetWarningHandler(func(warn string) { level.Warn(l).Log("msg", "OpenTelemetry BridgeWarningHandler", "warn", warn) }) - bridgeTracer.SetTextMapPropagator(propagation.TraceContext{}) + bridgeTracer.SetTextMapPropagator(autoprop.NewTextMapPropagator()) tpShutdownFunc := func() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/pkg/tracing/otlp/config_yaml.go b/pkg/tracing/otlp/config_yaml.go new file mode 100644 index 0000000000..ed471612f0 --- /dev/null +++ b/pkg/tracing/otlp/config_yaml.go @@ -0,0 +1,143 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package otlp + +import ( + "time" + + "github.com/thanos-io/thanos/pkg/exthttp" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" +) + +type retryConfig struct { + RetryEnabled bool `yaml:"retry_enabled"` + RetryInitialInterval time.Duration `yaml:"retry_initial_interval"` + RetryMaxInterval time.Duration `yaml:"retry_max_interval"` + RetryMaxElapsedTime time.Duration `yaml:"retry_max_elapsed_time"` +} + +type Config struct { + ClientType string `yaml:"client_type"` + ReconnectionPeriod time.Duration `yaml:"reconnection_period"` + Compression string `yaml:"compression"` + Insecure bool `yaml:"insecure"` + Endpoint string `yaml:"endpoint"` + URLPath string `yaml:"url_path"` + Timeout time.Duration `yaml:"timeout"` + RetryConfig retryConfig `yaml:"retry_config"` + Headers map[string]string `yaml:"headers"` + TLSConfig exthttp.TLSConfig `yaml:"tls_config"` +} + +func traceGRPCOptions(config Config) []otlptracegrpc.Option { + var options []otlptracegrpc.Option + if config.Endpoint != "" { + options = append(options, otlptracegrpc.WithEndpoint(config.Endpoint)) + } + + if config.Insecure { + options = append(options, otlptracegrpc.WithInsecure()) + } + + if config.ReconnectionPeriod != 0 { + options = append(options, otlptracegrpc.WithReconnectionPeriod(config.ReconnectionPeriod)) + } + + if config.Timeout != 0 { + options = append(options, otlptracegrpc.WithTimeout(config.Timeout)) + } + + if config.Compression != "" { + if config.Compression == "gzip" { + options = append(options, otlptracegrpc.WithCompressor(config.Compression)) + } + } + + if config.RetryConfig.RetryEnabled { + options = append(options, otlptracegrpc.WithRetry(createGRPCRetryConfig(config))) + } + + if config.Headers != nil { + options = append(options, otlptracegrpc.WithHeaders(config.Headers)) + } + + return options +} + +func traceHTTPOptions(config Config) []otlptracehttp.Option { + var options 
[]otlptracehttp.Option + if config.Endpoint != "" { + options = append(options, otlptracehttp.WithEndpoint(config.Endpoint)) + } + + if config.Insecure { + options = append(options, otlptracehttp.WithInsecure()) + } else { + tlsConfig, _ := exthttp.NewTLSConfig(&config.TLSConfig) + options = append(options, otlptracehttp.WithTLSClientConfig(tlsConfig)) + } + + if config.URLPath != "" { + options = append(options, otlptracehttp.WithURLPath(config.URLPath)) + } + + if config.Compression != "" { + if config.Compression == "gzip" { + options = append(options, otlptracehttp.WithCompression(otlptracehttp.GzipCompression)) + } + } + + if config.Timeout != 0 { + options = append(options, otlptracehttp.WithTimeout(config.Timeout)) + } + + if config.RetryConfig.RetryEnabled { + options = append(options, otlptracehttp.WithRetry(createHTTPRetryConfig(config))) + } + + if config.Headers != nil { + options = append(options, otlptracehttp.WithHeaders(config.Headers)) + } + // how to specify JSON/binary format here? 
+ + return options +} + +func createHTTPRetryConfig(config Config) otlptracehttp.RetryConfig { + + var retryConfig otlptracehttp.RetryConfig + if config.RetryConfig.RetryInitialInterval != 0 { + retryConfig.InitialInterval = config.RetryConfig.RetryInitialInterval + } + + if config.RetryConfig.RetryMaxInterval != 0 { + retryConfig.MaxInterval = config.RetryConfig.RetryMaxInterval + } + + if config.RetryConfig.RetryMaxElapsedTime != 0 { + retryConfig.MaxElapsedTime = config.RetryConfig.RetryMaxElapsedTime + } + + return retryConfig +} + +func createGRPCRetryConfig(config Config) otlptracegrpc.RetryConfig { + + var retryConfig otlptracegrpc.RetryConfig + if config.RetryConfig.RetryInitialInterval != 0 { + retryConfig.InitialInterval = config.RetryConfig.RetryInitialInterval + } + + if config.RetryConfig.RetryMaxInterval != 0 { + retryConfig.MaxInterval = config.RetryConfig.RetryMaxInterval + } + + if config.RetryConfig.RetryMaxElapsedTime != 0 { + retryConfig.MaxElapsedTime = config.RetryConfig.RetryMaxElapsedTime + } + + return retryConfig +} diff --git a/pkg/tracing/otlp/otlp.go b/pkg/tracing/otlp/otlp.go new file mode 100644 index 0000000000..58c27f2bdd --- /dev/null +++ b/pkg/tracing/otlp/otlp.go @@ -0,0 +1,84 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package otlp + +import ( + "context" + "strings" + + "github.com/thanos-io/thanos/pkg/tracing/migration" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + _ "google.golang.org/grpc/encoding/gzip" + "gopkg.in/yaml.v2" +) + +const ( + TracingClientGRPC string = "grpc" + TracingClientHTTP string = "http" +) + +// NewOTELTracer returns an OTLP exporter based tracer. +func NewTracerProvider(ctx context.Context, logger log.Logger, conf []byte) (*tracesdk.TracerProvider, error) { + config := Config{} + if err := yaml.Unmarshal(conf, &config); err != nil { + return nil, err + } + + var exporter *otlptrace.Exporter + var err error + switch strings.ToLower(config.ClientType) { + case TracingClientHTTP: + options := traceHTTPOptions(config) + + client := otlptracehttp.NewClient(options...) + exporter, err = otlptrace.New(ctx, client) + if err != nil { + return nil, err + } + + case TracingClientGRPC: + options := traceGRPCOptions(config) + client := otlptracegrpc.NewClient(options...) + exporter, err = otlptrace.New(ctx, client) + if err != nil { + return nil, err + } + + default: + return nil, errors.New("otlp: invalid client type. Only 'http' and 'grpc' are accepted. 
") + } + + processor := tracesdk.NewBatchSpanProcessor(exporter) + tp := newTraceProvider(ctx, processor, logger) + + return tp, nil +} + +func newTraceProvider(ctx context.Context, processor tracesdk.SpanProcessor, logger log.Logger) *tracesdk.TracerProvider { + resource, err := resource.New(ctx) + if err != nil { + level.Warn(logger).Log("msg", "jaeger: detecting resources for tracing provider failed", "err", err) + } + + sampler := tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0)) + + tp := tracesdk.NewTracerProvider( + tracesdk.WithSpanProcessor(processor), + tracesdk.WithResource(resource), + tracesdk.WithSampler( + migration.SamplerWithOverride( + sampler, migration.ForceTracingAttributeKey, + ), + ), + ) + return tp +} diff --git a/pkg/tracing/otlp/otlp_test.go b/pkg/tracing/otlp/otlp_test.go new file mode 100644 index 0000000000..c3ee36ddc7 --- /dev/null +++ b/pkg/tracing/otlp/otlp_test.go @@ -0,0 +1,35 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package otlp + +import ( + "context" + "testing" + + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/tracing" + "github.com/thanos-io/thanos/pkg/tracing/migration" + + "github.com/go-kit/log" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +// This test creates an OTLP tracer, starts a span and checks whether it is logged in the exporter. 
+func TestContextTracing_ClientEnablesTracing(t *testing.T) { + exp := tracetest.NewInMemoryExporter() + + tracerOtel := newTraceProvider( + context.Background(), + tracesdk.NewSimpleSpanProcessor(exp), + log.NewNopLogger()) + tracer, _ := migration.Bridge(tracerOtel, log.NewNopLogger()) + clientRoot, _ := tracing.StartSpan(tracing.ContextWithTracer(context.Background(), tracer), "a") + + testutil.Equals(t, 0, len(exp.GetSpans())) + + clientRoot.Finish() + testutil.Equals(t, 1, len(exp.GetSpans())) + testutil.Equals(t, 1, tracing.CountSampledSpans(exp.GetSpans())) +} diff --git a/pkg/tracing/testutil.go b/pkg/tracing/testutil.go new file mode 100644 index 0000000000..5f4c081b54 --- /dev/null +++ b/pkg/tracing/testutil.go @@ -0,0 +1,71 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package tracing + +import ( + "testing" + + "github.com/thanos-io/thanos/pkg/testutil" + + opentracing "github.com/opentracing/opentracing-go" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +func CountSpans_ClientEnablesTracing(t *testing.T, exp *tracetest.InMemoryExporter, clientRoot, srvRoot, srvChild opentracing.Span) { + testutil.Equals(t, 0, len(exp.GetSpans())) + + srvChild.Finish() + testutil.Equals(t, 1, len(exp.GetSpans())) + testutil.Equals(t, 1, CountSampledSpans(exp.GetSpans())) + + srvRoot.Finish() + testutil.Equals(t, 2, len(exp.GetSpans())) + testutil.Equals(t, 2, CountSampledSpans(exp.GetSpans())) + + clientRoot.Finish() + testutil.Equals(t, 3, len(exp.GetSpans())) + testutil.Equals(t, 3, CountSampledSpans(exp.GetSpans())) +} + +func ContextTracing_ClientDisablesTracing(t *testing.T, exp *tracetest.InMemoryExporter, clientRoot, srvRoot, srvChild opentracing.Span) { + testutil.Equals(t, 0, len(exp.GetSpans())) + + // Since we are neither recording nor sampling, no spans should show up.
+ srvChild.Finish() + testutil.Equals(t, 0, len(exp.GetSpans())) + + srvRoot.Finish() + testutil.Equals(t, 0, len(exp.GetSpans())) + + clientRoot.Finish() + testutil.Equals(t, 0, len(exp.GetSpans())) +} + +func ContextTracing_ForceTracing(t *testing.T, exp *tracetest.InMemoryExporter, clientRoot, srvRoot, srvChild opentracing.Span) { + testutil.Equals(t, 0, len(exp.GetSpans())) + + srvChild.Finish() + testutil.Equals(t, 1, len(exp.GetSpans())) + testutil.Equals(t, 1, CountSampledSpans(exp.GetSpans())) + + srvRoot.Finish() + testutil.Equals(t, 2, len(exp.GetSpans())) + testutil.Equals(t, 2, CountSampledSpans(exp.GetSpans())) + + clientRoot.Finish() + testutil.Equals(t, 3, len(exp.GetSpans())) + testutil.Equals(t, 3, CountSampledSpans(exp.GetSpans())) +} + +// Utility function for use with tests in pkg/tracing. +func CountSampledSpans(ss tracetest.SpanStubs) int { + var count int + for _, s := range ss { + if s.SpanContext.IsSampled() { + count++ + } + } + + return count +} diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 070bb84fbd..d275d29cc4 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -39,6 +39,7 @@ import ( "github.com/thanos-io/thanos/pkg/tracing/google_cloud" "github.com/thanos-io/thanos/pkg/tracing/jaeger" "github.com/thanos-io/thanos/pkg/tracing/lightstep" + "github.com/thanos-io/thanos/pkg/tracing/otlp" ) var ( @@ -57,10 +58,11 @@ var ( } tracingConfigs = map[trclient.TracingProvider]interface{}{ - trclient.Jaeger: jaeger.Config{}, - trclient.GoogleCloud: google_cloud.Config{}, - trclient.ElasticAPM: elasticapm.Config{}, - trclient.Lightstep: lightstep.Config{}, + trclient.OpenTelemetryProtocol: otlp.Config{}, + trclient.Jaeger: jaeger.Config{}, + trclient.GoogleCloud: google_cloud.Config{}, + trclient.ElasticAPM: elasticapm.Config{}, + trclient.Lightstep: lightstep.Config{}, } indexCacheConfigs = map[storecache.IndexCacheProvider]interface{}{ storecache.INMEMORY: storecache.InMemoryIndexCacheConfig{}, diff --git 
a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index bddc07afd0..c9f0403dec 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -158,11 +158,13 @@ func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, we if minTime != "" { args["--min-time"] = minTime } - sidecarRunnable := e.Runnable(fmt.Sprintf("sidecar-%s", name)).WithPorts(map[string]int{"http": 8080, "grpc": 9091}).Init(wrapWithDefaults(e2e.StartOptions{ - Image: sidecarImage, - Command: e2e.NewCommand("sidecar", e2e.BuildArgs(args)...), - Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), - })) + sidecarRunnable := e.Runnable(fmt.Sprintf("sidecar-%s", name)). + WithPorts(map[string]int{"http": 8080, "grpc": 9091}). + Init(wrapWithDefaults(e2e.StartOptions{ + Image: sidecarImage, + Command: e2e.NewCommand("sidecar", e2e.BuildArgs(args)...), + Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), + })) sidecar := e2emon.AsInstrumented(sidecarRunnable, "http") return prom, sidecar } @@ -205,6 +207,40 @@ func NewAvalanche(e e2e.Environment, name string, o AvalancheOptions) *e2emon.In })), "http") } +// NewPrometheusWithJaegerTracingSidecarCustomImage creates a Prometheus runnable via +// NewPrometheus and a sidecar runnable pointed at it, handing jaegerConfig to the sidecar +// through its --tracing.config flag. --prometheus.http-client is set only when webConfig +// is non-empty and --min-time only when minTime is non-empty. It returns the Prometheus +// and the sidecar, in that order. +func NewPrometheusWithJaegerTracingSidecarCustomImage(e e2e.Environment, name, promConfig, webConfig, + promImage, minTime, sidecarImage, jaegerConfig string, enableFeatures ...string) ( + *e2emon.InstrumentedRunnable, *e2emon.InstrumentedRunnable) { + prom := NewPrometheus(e, name, promConfig, webConfig, promImage, enableFeatures...)
+ + args := map[string]string{ + "--debug.name": fmt.Sprintf("sidecar-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--prometheus.url": "http://" + prom.InternalEndpoint("http"), + "--tsdb.path": prom.InternalDir(), + "--log.level": "debug", + "--tracing.config": jaegerConfig, + } + if len(webConfig) > 0 { + args["--prometheus.http-client"] = defaultPromHttpConfig() + } + if minTime != "" { + args["--min-time"] = minTime + } + + sidecarRunnable := e.Runnable(fmt.Sprintf("sidecar-%s", name)). + WithPorts(map[string]int{"http": 8080, "grpc": 9091}). + Init(wrapWithDefaults(e2e.StartOptions{ + Image: sidecarImage, + Command: e2e.NewCommand("sidecar", e2e.BuildArgs(args)...), + Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), + })) + sidecar := e2emon.AsInstrumented(sidecarRunnable, "http") + + return prom, sidecar +} + type QuerierBuilder struct { name string routePrefix string diff --git a/test/e2e/tracing_test.go b/test/e2e/tracing_test.go new file mode 100644 index 0000000000..0788452efc --- /dev/null +++ b/test/e2e/tracing_test.go @@ -0,0 +1,116 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package e2e_test + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/tracing/client" + "github.com/thanos-io/thanos/pkg/tracing/jaeger" + "github.com/thanos-io/thanos/test/e2e/e2ethanos" + + "github.com/efficientgo/e2e" + e2emon "github.com/efficientgo/e2e/monitoring" + "github.com/efficientgo/tools/core/pkg/testutil" + "gopkg.in/yaml.v2" +) + +// TestJaegerTracing checks end to end that traces queried back from a Jaeger all-in-one instance mention both the thanos-query and the thanos-sidecar service.
+func TestJaegerTracing(t *testing.T) {
+	env, err := e2e.NewDockerEnvironment("e2e-tracing-test")
+	testutil.Ok(t, err)
+	t.Cleanup(env.Close)
+	name := "testing"
+	// Jaeger all-in-one: readiness is probed on the admin port, spans are posted to
+	// the jaeger.thrift port and traces are read back from the http port.
+	newJaegerRunnable := env.Runnable(fmt.Sprintf("jaeger-%s", name)).
+		WithPorts(
+			map[string]int{
+				"http":                      16686,
+				"http.admin":                14269,
+				"jaeger.thrift-model.proto": 14250,
+				"jaeger.thrift":             14268,
+			}).
+		Init(e2e.StartOptions{
+			Image:     "jaegertracing/all-in-one:1.33",
+			Readiness: e2e.NewHTTPReadinessProbe("http.admin", "/", 200, 200),
+		})
+	newJaeger := e2emon.AsInstrumented(newJaegerRunnable, "http.admin")
+	testutil.Ok(t, e2e.StartAndWaitReady(newJaeger))
+
+	// The sidecar reports as thanos-sidecar with a constant sampler (param 1).
+	jaegerConfig, err := yaml.Marshal(client.TracingConfig{
+		Type: client.Jaeger,
+		Config: jaeger.Config{
+			ServiceName:  "thanos-sidecar",
+			SamplerType:  "const",
+			SamplerParam: 1,
+			Endpoint:     "http://" + newJaeger.InternalEndpoint("jaeger.thrift") + "/api/traces",
+		},
+	})
+	testutil.Ok(t, err)
+
+	prom1, sidecar1 := e2ethanos.NewPrometheusWithJaegerTracingSidecarCustomImage(env, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget), "",
+		e2ethanos.DefaultPrometheusImage(), "", e2ethanos.DefaultImage(), string(jaegerConfig), "")
+	testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))
+
+	// The querier reports as thanos-query to the same collector endpoint.
+	qb := e2ethanos.NewQuerierBuilder(env, "1", sidecar1.InternalEndpoint("grpc"))
+	q := qb.WithTracingConfig(fmt.Sprintf(`type: JAEGER
+config:
+  sampler_type: const
+  sampler_param: 1
+  service_name: thanos-query
+  endpoint: %s`, "http://"+newJaeger.InternalEndpoint("jaeger.thrift")+"/api/traces")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(q))
+
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	t.Cleanup(cancel)
+
+	// Issue one query through the querier; the trace looked up below is filtered on
+	// service thanos-query and operation proxy.series.
+	queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
+		Deduplicate: false,
+	}, []model.Metric{
+		{
+			"job":        "myself",
+			"prometheus": "prom-alone",
+			"replica":    "0",
+		},
+	})
+
+	url := "http://" + strings.TrimSpace(newJaeger.Endpoint("http")+"/api/traces?service=thanos-query&operation=proxy.series")
+	// Bind the request to ctx so a stalled Jaeger API call cannot outlive the test deadline.
+	request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	testutil.Ok(t, err)
+	// Named httpClient so it does not shadow the imported tracing client package.
+	httpClient := &http.Client{}
+
+	// Poll every 5s until Jaeger returns a non-empty result or ctx is done.
+	testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() error {
+		response, err := httpClient.Do(request)
+		if err != nil {
+			return err
+		}
+		// Close on every path, including non-200 responses, so each retry releases
+		// its body and connection.
+		defer response.Body.Close()
+
+		if response.StatusCode != http.StatusOK {
+			return errors.New("status code not OK")
+		}
+
+		body, err := io.ReadAll(response.Body)
+		testutil.Ok(t, err)
+
+		resp := string(body)
+		// An empty data array means no trace has been stored yet; retry.
+		if strings.Contains(resp, `"data":[]`) {
+			return errors.New("no data returned")
+		}
+
+		// Once a trace exists it must mention both services.
+		testutil.Assert(t, strings.Contains(resp, `"serviceName":"thanos-query"`))
+		testutil.Assert(t, strings.Contains(resp, `"serviceName":"thanos-sidecar"`))
+		return nil
+	}))
+}