diff --git a/.gitignore b/.gitignore index 86d805d9d28..1fba4de9569 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,5 @@ # Fossa .fossa.yml +# direnv +.envrc diff --git a/common/config/config.go b/common/config/config.go index adb38371f28..28eab689542 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -38,6 +38,7 @@ import ( "go.temporal.io/server/common/masker" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" + "go.temporal.io/server/common/telemetry" ) type ( @@ -64,6 +65,8 @@ type ( DynamicConfigClient *dynamicconfig.FileBasedClientConfig `yaml:"dynamicConfigClient"` // NamespaceDefaults is the default config for every namespace NamespaceDefaults NamespaceDefaults `yaml:"namespaceDefaults"` + // ExporterConfig allows the specification of process-wide OTEL exporters + ExporterConfig telemetry.ExportConfig `yaml:"otel"` } // Service contains the service specific config items diff --git a/common/resource/fx.go b/common/resource/fx.go index 645e59640a3..ffdc778542f 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -32,6 +32,7 @@ import ( "time" "go.uber.org/fx" + "google.golang.org/grpc" "go.temporal.io/api/workflowservice/v1" @@ -62,6 +63,7 @@ import ( "go.temporal.io/server/common/rpc/encryption" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/common/telemetry" ) type ( @@ -416,7 +418,18 @@ func RPCFactoryProvider( tlsConfigProvider encryption.TLSConfigProvider, dc *dynamicconfig.Collection, clusterMetadata *cluster.Config, + traceInterceptor telemetry.ClientTraceInterceptor, ) common.RPCFactory { svcCfg := cfg.Services[string(svcName)] - return rpc.NewFactory(&svcCfg.RPC, string(svcName), logger, tlsConfigProvider, dc, clusterMetadata) + return rpc.NewFactory( + &svcCfg.RPC, + string(svcName), + logger, + tlsConfigProvider, + dc, + clusterMetadata, + []grpc.UnaryClientInterceptor{ + grpc.UnaryClientInterceptor(traceInterceptor), + }, + ) } diff --git a/common/rpc/grpc.go b/common/rpc/grpc.go index 01cab7890a8..fabdbc5c3c6 100644 --- a/common/rpc/grpc.go +++ b/common/rpc/grpc.go @@ -61,7 +61,7 @@ const ( // The hostName syntax is defined in // https://github.com/grpc/grpc/blob/master/doc/naming.md. // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. -func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger) (*grpc.ClientConn, error) { +func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger, interceptors ...grpc.UnaryClientInterceptor) (*grpc.ClientConn, error) { // Default to insecure grpcSecureOpt := grpc.WithInsecure() if tlsConfig != nil { @@ -83,9 +83,12 @@ func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger) (*grpc.Clie grpcSecureOpt, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxInternodeRecvPayloadSize)), grpc.WithChainUnaryInterceptor( - versionHeadersInterceptor, - metrics.NewClientMetricsTrailerPropagatorInterceptor(logger), - errorInterceptor, + append( + interceptors, + versionHeadersInterceptor, + metrics.NewClientMetricsTrailerPropagatorInterceptor(logger), + errorInterceptor, + )..., ), grpc.WithDefaultServiceConfig(DefaultServiceConfig), grpc.WithDisableServiceConfig(), diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 868bf5e4411..38ac6b677d1 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -50,8 +50,9 @@ type RPCFactory struct { clusterMetadata *cluster.Config sync.Mutex - grpcListener net.Listener - tlsFactory encryption.TLSConfigProvider + grpcListener net.Listener + tlsFactory encryption.TLSConfigProvider + clientInterceptors []grpc.UnaryClientInterceptor } // NewFactory builds a new RPCFactory @@ -63,14 +64,16 @@ func NewFactory( tlsProvider encryption.TLSConfigProvider, dc *dynamicconfig.Collection, clusterMetadata *cluster.Config, + clientInterceptors []grpc.UnaryClientInterceptor, ) *RPCFactory { return &RPCFactory{ - config: cfg, - serviceName: sName, - logger: logger, - dc: dc, - tlsFactory: tlsProvider, - clusterMetadata: clusterMetadata, + config: cfg, + serviceName: sName, + logger: logger, + dc: dc, + tlsFactory: tlsProvider, + clusterMetadata: clusterMetadata, + clientInterceptors: clientInterceptors, } } @@ -225,7 +228,7 @@ func (d *RPCFactory) CreateInternodeGRPCConnection(hostName string) *grpc.Client } func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config) *grpc.ClientConn { - connection, err := Dial(hostName, tlsClientConfig, d.logger) + connection, err := Dial(hostName, tlsClientConfig, d.logger, d.clientInterceptors...) if err != nil { d.logger.Fatal("Failed to create gRPC connection", tag.Error(err)) return nil diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index 355a6754987..e913c8d99d3 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -34,6 +34,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "google.golang.org/grpc" "google.golang.org/grpc/credentials" "go.temporal.io/server/common/config" @@ -50,6 +51,8 @@ const ( frontendServerCertSerialNumber = 150 ) +var noExtraInterceptors = []grpc.UnaryClientInterceptor{} + type localStoreRPCSuite struct { *require.Assertions suite.Suite @@ -128,7 +131,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -336,22 +339,22 @@ func (s *localStoreRPCSuite) setupFrontend() { provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -365,7 +368,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.frontendRollingCerts, s.dynamicCACertPool, s.wrongCACertPool) - dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection(), clusterMetadata) + dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -373,13 +376,13 @@ func (s *localStoreRPCSuite) setupFrontend() { provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreRootCAForceTLS.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSRemoteCluster.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(remoteClusterMutualTLSRPCFactory) s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } @@ -415,22 +418,22 @@ func (s *localStoreRPCSuite) setupInternode() { provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopClient, s.logger, nil) s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) diff --git a/common/telemetry/config.go b/common/telemetry/config.go new file mode 100644 index 00000000000..36bee9ad1ea --- /dev/null +++ b/common/telemetry/config.go @@ -0,0 +1,427 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package telemetry + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + otelsdkmetricexp "go.opentelemetry.io/otel/sdk/metric/export" + otelsdktrace "go.opentelemetry.io/otel/sdk/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials/insecure" + "gopkg.in/yaml.v3" +) + +const ( + // the following defaults were taken from the grpc docs as of grpc v1.46. + // they are not available programatically + + defaultReadBufferSize = 32 * 1024 + defaultWriteBufferSize = 32 * 1024 + defaultMinConnectTimeout = 10 * time.Second + + // the following defaults were taken from the otel library as of v1.7. + // they are not available programatically + + retryDefaultEnabled = true + retryDefaultInitialInterval = 5 * time.Second + retryDefaultMaxInterval = 30 * time.Second + retryDefaultMaxElapsedTime = 1 * time.Minute +) + +type ( + metadata struct { + Name string + Labels map[string]string + } + + connection struct { + Kind string + Metadata metadata + Spec interface{} `yaml:"-"` + } + + grpcconn struct { + Endpoint string + Block bool + ConnectParams struct { + MinConnectTimeout time.Duration `yaml:"min_connect_timeout"` + Backoff struct { + BaseDelay time.Duration `yaml:"base_delay"` + Multiplier float64 + Jitter float64 + MaxDelay time.Duration `yaml:"max_delay"` + } + } `yaml:"connect_params"` + UserAgent string `yaml:"user_agent"` + ReadBufferSize int `yaml:"read_buffer_size"` + WriteBufferSize int `yaml:"write_buffer_size"` + Authority string + Insecure bool + + cc *grpc.ClientConn + } + + exporter struct { + Kind struct { + Signal string + Model string + Protocol string + } + Metadata metadata + Spec interface{} `yaml:"-"` + } + + otlpGrpcExporter struct { + ConnectionName string `yaml:"connection_name"` + Connection grpcconn + Headers map[string]string + Timeout time.Duration + Retry struct { + Enabled bool + InitialInterval time.Duration `yaml:"initial_interval"` + MaxInterval time.Duration `yaml:"max_interval"` + MaxElapsedTime time.Duration `yaml:"max_elapsed_time"` + } + } + + otlpGrpcSpanExporter struct { + otlpGrpcExporter `yaml:",inline"` + } + otlpGrpcMetricExporter struct { + otlpGrpcExporter `yaml:",inline"` + } + + exportConfig struct { + Connections []connection + Exporters []exporter + } + + // sharedConnSpanExporter and sharedConnMetricExporter exist to wrap a span + // exporter that uses a shared *grpc.ClientConn so that the grpc.Dial call + // doesn't happen until Start() is called. Without this wrapper the + // grpc.ClientConn (which can only be created via grpc.Dial or + // grpc.DialContext) would need to exist at _construction_ time, meaning + // that we would need to dial at construction rather then during the start + // phase. + + sharedConnSpanExporter struct { + baseOpts []otlptracegrpc.Option + dialer interface { + Dial(context.Context) (*grpc.ClientConn, error) + } + startOnce sync.Once + otelsdktrace.SpanExporter + } + + sharedConnMetricExporter struct { + baseOpts []otlpmetricgrpc.Option + dialer interface { + Dial(context.Context) (*grpc.ClientConn, error) + } + startOnce sync.Once + otelsdkmetricexp.Exporter + } + + // ExportConfig represents YAML structured configuration for a set of OTEL + // trace/span/log exporters. + ExportConfig struct { + inner exportConfig `yaml:",inline"` + } +) + +// UnmarshalYAML loads the state of an ExportConfig from parsed YAML +func (ec *ExportConfig) UnmarshalYAML(n *yaml.Node) error { + return n.Decode(&ec.inner) +} + +func (ec *ExportConfig) SpanExporters() ([]otelsdktrace.SpanExporter, error) { + return ec.inner.SpanExporters() +} + +func (ec *ExportConfig) MetricExporters() ([]otelsdkmetricexp.Exporter, error) { + return ec.inner.MetricExporters() +} + +// Dial returns the cached *grpc.ClientConn instance or creates a new one, +// caches and then returns it. This function is not threadsafe. +func (g *grpcconn) Dial(ctx context.Context) (*grpc.ClientConn, error) { + var err error + if g.cc == nil { + g.cc, err = grpc.DialContext(ctx, g.Endpoint, g.dialOpts()...) + } + return g.cc, err +} + +func (g *grpcconn) dialOpts() []grpc.DialOption { + out := []grpc.DialOption{ + grpc.WithReadBufferSize(coalesce(g.ReadBufferSize, defaultReadBufferSize)), + grpc.WithWriteBufferSize(coalesce(g.WriteBufferSize, defaultWriteBufferSize)), + grpc.WithUserAgent(g.UserAgent), + grpc.WithConnectParams(grpc.ConnectParams{ + MinConnectTimeout: coalesce(g.ConnectParams.MinConnectTimeout, defaultMinConnectTimeout), + Backoff: backoff.Config{ + BaseDelay: coalesce(g.ConnectParams.Backoff.BaseDelay, backoff.DefaultConfig.BaseDelay), + MaxDelay: coalesce(g.ConnectParams.Backoff.MaxDelay, backoff.DefaultConfig.MaxDelay), + Jitter: coalesce(g.ConnectParams.Backoff.Jitter, backoff.DefaultConfig.Jitter), + Multiplier: coalesce(g.ConnectParams.Backoff.Multiplier, backoff.DefaultConfig.Multiplier), + }, + }), + } + if g.Insecure { + out = append(out, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + if g.Block { + out = append(out, grpc.WithBlock()) + } + if g.Authority != "" { + out = append(out, grpc.WithAuthority(g.Authority)) + } + return out +} + +// SpanExporters builds the set of OTEL SpanExporter objects defined by the YAML +// unmarshaled into this ExportConfig object. The returned SpanExporters have +// not been started. +func (ec *exportConfig) SpanExporters() ([]otelsdktrace.SpanExporter, error) { + out := make([]otelsdktrace.SpanExporter, 0, len(ec.Exporters)) + for _, expcfg := range ec.Exporters { + if !strings.HasPrefix(expcfg.Kind.Signal, "trace") { + continue + } + switch spec := expcfg.Spec.(type) { + case *otlpGrpcSpanExporter: + spanexp, err := ec.buildOtlpGrpcSpanExporter(spec) + if err != nil { + return nil, err + } + out = append(out, spanexp) + default: + return nil, fmt.Errorf("unsupported span exporter type: %T", spec) + } + } + return out, nil +} + +func (ec *exportConfig) MetricExporters() ([]otelsdkmetricexp.Exporter, error) { + out := make([]otelsdkmetricexp.Exporter, 0, len(ec.Exporters)) + for _, expcfg := range ec.Exporters { + if !strings.HasPrefix(expcfg.Kind.Signal, "metric") { + continue + } + switch spec := expcfg.Spec.(type) { + case *otlpGrpcMetricExporter: + metricexp, err := ec.buildOtlpGrpcMetricExporter(spec) + if err != nil { + return nil, err + } + out = append(out, metricexp) + default: + return nil, fmt.Errorf("unsupported metric exporter type: %T", spec) + } + } + return out, nil + +} + +func (ec *exportConfig) buildOtlpGrpcMetricExporter( + cfg *otlpGrpcMetricExporter, +) (otelsdkmetricexp.Exporter, error) { + dopts := cfg.Connection.dialOpts() + opts := []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithEndpoint(cfg.Connection.Endpoint), + otlpmetricgrpc.WithHeaders(cfg.Headers), + otlpmetricgrpc.WithTimeout(coalesce(cfg.Timeout, 10*time.Second)), + otlpmetricgrpc.WithDialOption(dopts...), + otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{ + Enabled: coalesce(cfg.Retry.Enabled, retryDefaultEnabled), + InitialInterval: coalesce(cfg.Retry.InitialInterval, retryDefaultInitialInterval), + MaxInterval: coalesce(cfg.Retry.MaxInterval, retryDefaultMaxInterval), + MaxElapsedTime: coalesce(cfg.Retry.MaxElapsedTime, retryDefaultMaxElapsedTime), + }), + } + + // work around https://github.com/open-telemetry/opentelemetry-go/issues/2940 + if cfg.Connection.Insecure { + opts = append(opts, otlpmetricgrpc.WithInsecure()) + } + + if cfg.ConnectionName == "" { + return otlpmetricgrpc.NewUnstarted(opts...), nil + } + + conncfg, ok := ec.findNamedGrpcConnCfg(cfg.ConnectionName) + if !ok { + return nil, fmt.Errorf("OTEL exporter connection %q not found", cfg.ConnectionName) + } + return &sharedConnMetricExporter{ + baseOpts: opts, + dialer: conncfg, + }, nil +} + +func (ec *exportConfig) buildOtlpGrpcSpanExporter( + cfg *otlpGrpcSpanExporter, +) (otelsdktrace.SpanExporter, error) { + opts := []otlptracegrpc.Option{ + otlptracegrpc.WithEndpoint(cfg.Connection.Endpoint), + otlptracegrpc.WithHeaders(cfg.Headers), + otlptracegrpc.WithTimeout(coalesce(cfg.Timeout, 10*time.Second)), + otlptracegrpc.WithDialOption(cfg.Connection.dialOpts()...), + otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{ + Enabled: coalesce(cfg.Retry.Enabled, retryDefaultEnabled), + InitialInterval: coalesce(cfg.Retry.InitialInterval, retryDefaultInitialInterval), + MaxInterval: coalesce(cfg.Retry.MaxInterval, retryDefaultMaxInterval), + MaxElapsedTime: coalesce(cfg.Retry.MaxElapsedTime, retryDefaultMaxElapsedTime), + }), + } + + // work around https://github.com/open-telemetry/opentelemetry-go/issues/2940 + if cfg.Connection.Insecure { + opts = append(opts, otlptracegrpc.WithInsecure()) + } + + if cfg.ConnectionName == "" { + return otlptracegrpc.NewUnstarted(opts...), nil + } + + conncfg, ok := ec.findNamedGrpcConnCfg(cfg.ConnectionName) + if !ok { + return nil, fmt.Errorf("OTEL exporter connection %q not found", cfg.ConnectionName) + } + return &sharedConnSpanExporter{ + baseOpts: opts, + dialer: conncfg, + }, nil +} + +// Start initiates the connection to an upstream grpc OTLP server +func (scse *sharedConnSpanExporter) Start(ctx context.Context) error { + var err error + scse.startOnce.Do(func() { + var cc *grpc.ClientConn + cc, err = scse.dialer.Dial(ctx) + if err != nil { + return + } + opts := append(scse.baseOpts, otlptracegrpc.WithGRPCConn(cc)) + scse.SpanExporter, err = otlptracegrpc.New(ctx, opts...) + }) + return err +} + +// Start initiates the connection to an upstream grpc OTLP server +func (scme *sharedConnMetricExporter) Start(ctx context.Context) error { + var err error + scme.startOnce.Do(func() { + var cc *grpc.ClientConn + cc, err = scme.dialer.Dial(ctx) + if err != nil { + return + } + opts := append(scme.baseOpts, otlpmetricgrpc.WithGRPCConn(cc)) + scme.Exporter, err = otlpmetricgrpc.New(ctx, opts...) + }) + return err +} + +func (ec *exportConfig) findNamedGrpcConnCfg(name string) (*grpcconn, bool) { + if name == "" { + return nil, false + } + for _, conn := range ec.Connections { + if gconn, ok := conn.Spec.(*grpcconn); ok && conn.Metadata.Name == name { + return gconn, true + } + } + return nil, false +} + +// UnmarshalYAML loads the state of a generic connection from parsed YAML +func (c *connection) UnmarshalYAML(n *yaml.Node) error { + type conn connection + type overlay struct { + *conn `yaml:",inline"` + Spec yaml.Node `yaml:"spec"` + } + obj := overlay{conn: (*conn)(c)} + err := n.Decode(&obj) + if err != nil { + return err + } + switch c.Kind { + case "grpc": + c.Spec = &grpcconn{} + default: + return fmt.Errorf("unsupported connection kind: %q", c.Kind) + } + return obj.Spec.Decode(c.Spec) +} + +// UnmarshalYAML loads the state of a generic exporter from parsed YAML +func (e *exporter) UnmarshalYAML(n *yaml.Node) error { + type exp exporter + type overlay struct { + *exp `yaml:",inline"` + Spec yaml.Node `yaml:"spec"` + } + obj := overlay{exp: (*exp)(e)} + err := n.Decode(&obj) + if err != nil { + return err + } + descriptor := fmt.Sprintf("%v+%v+%v", e.Kind.Signal, e.Kind.Model, e.Kind.Protocol) + switch descriptor { + case "traces+otlp+grpc", "trace+otlp+grpc": + e.Spec = new(otlpGrpcSpanExporter) + case "metrics+otlp+grpc", "metric+otlp+grpc": + e.Spec = new(otlpGrpcMetricExporter) + default: + return fmt.Errorf( + "unsupported exporter kind: signal=%q; model=%q; protocol=%q", + e.Kind.Signal, + e.Kind.Model, + e.Kind.Protocol, + ) + } + return obj.Spec.Decode(e.Spec) +} + +func coalesce[T comparable](vals ...T) T { + var zero T + for _, v := range vals { + if v != zero { + return v + } + } + return zero +} diff --git a/common/telemetry/config_test.go b/common/telemetry/config_test.go new file mode 100644 index 00000000000..eb5a45a114f --- /dev/null +++ b/common/telemetry/config_test.go @@ -0,0 +1,156 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package telemetry_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.temporal.io/server/common/telemetry" + "gopkg.in/yaml.v3" +) + +var basicOTLPTraceOnlyConfig = ` +exporters: + - kind: + signal: traces + model: otlp + protocol: grpc + spec: + headers: + a: b + c: d + timeout: 10s + retry: + enabled: true + initial_interval: 1s + max_interval: 1s + max_elapsed_time: 1s + connection: + block: false + insecure: true + endpoint: localhost:4317 +` + +var sharedConnOTLPConfig = ` +otel: + connections: + - kind: grpc + metadata: + name: conn1 + spec: + endpoint: localhost:4317 + exporters: + - kind: + signal: traces + model: otlp + protocol: grpc + spec: + connection_name: conn1 + - kind: + signal: metrics + model: otlp + protocol: grpc + spec: + connection_name: conn1 +` + +func TestEmptyConfig(t *testing.T) { + cfg := telemetry.ExportConfig{} + exporters, err := cfg.SpanExporters() + require.NoError(t, err) + require.Len(t, exporters, 0) +} + +func TestExportersWithSharedConn(t *testing.T) { + root := struct{ Otel telemetry.PrivateExportConfig }{} + err := yaml.Unmarshal([]byte(sharedConnOTLPConfig), &root) + require.NoError(t, err) + cfg := &root.Otel + + spanExporters, err := cfg.SpanExporters() + require.NoError(t, err) + require.Len(t, spanExporters, 1) + + metricExporters, err := cfg.MetricExporters() + require.NoError(t, err) + require.Len(t, metricExporters, 1) +} + +func TestSharedConn(t *testing.T) { + root := struct{ Otel telemetry.PrivateExportConfig }{} + err := yaml.Unmarshal([]byte(sharedConnOTLPConfig), &root) + require.NoError(t, err) + cfg := &root.Otel + require.Len(t, cfg.Connections, 1) + require.Len(t, cfg.Exporters, 2) + + exp := cfg.Exporters[0] + require.Equal(t, exp.Kind.Signal, "traces") + require.Equal(t, exp.Kind.Model, "otlp") + require.Equal(t, exp.Kind.Protocol, "grpc") + require.NotNil(t, exp.Spec) + sspec, ok := exp.Spec.(*telemetry.OTLPGRPCSpanExporter) + require.True(t, ok) + require.Equal(t, "conn1", sspec.ConnectionName) + + exp = cfg.Exporters[1] + require.Equal(t, exp.Kind.Signal, "metrics") + require.Equal(t, exp.Kind.Model, "otlp") + require.Equal(t, exp.Kind.Protocol, "grpc") + require.NotNil(t, exp.Spec) + mspec, ok := exp.Spec.(*telemetry.OTLPGRPCMetricExporter) + require.True(t, ok) + require.Equal(t, "conn1", mspec.ConnectionName) +} + +func TestOTLPTraceGRPC(t *testing.T) { + cfg := telemetry.PrivateExportConfig{} + err := yaml.Unmarshal([]byte(basicOTLPTraceOnlyConfig), &cfg) + require.NoError(t, err) + require.Len(t, cfg.Connections, 0) + require.Len(t, cfg.Exporters, 1) + + exp := cfg.Exporters[0] + require.Equal(t, exp.Kind.Signal, "traces") + require.Equal(t, exp.Kind.Model, "otlp") + require.Equal(t, exp.Kind.Protocol, "grpc") + require.NotNil(t, exp.Spec) + + spec, ok := exp.Spec.(*telemetry.OTLPGRPCSpanExporter) + require.True(t, ok) + require.Equal(t, map[string]string{"a": "b", "c": "d"}, spec.Headers) + require.Equal(t, 10*time.Second, spec.Timeout) + require.True(t, spec.Retry.Enabled) + require.Equal(t, time.Second, spec.Retry.InitialInterval) + require.Equal(t, time.Second, spec.Retry.MaxInterval) + require.Equal(t, time.Second, spec.Retry.MaxElapsedTime) + + conn := spec.Connection + require.True(t, conn.Insecure) + require.Equal(t, "localhost:4317", conn.Endpoint) + require.False(t, conn.Block) +} diff --git a/common/telemetry/export_test.go b/common/telemetry/export_test.go new file mode 100644 index 00000000000..64c3bc80946 --- /dev/null +++ b/common/telemetry/export_test.go @@ -0,0 +1,31 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package telemetry + +type ( + OTLPGRPCSpanExporter = otlpGrpcSpanExporter + OTLPGRPCMetricExporter = otlpGrpcMetricExporter + PrivateExportConfig = exportConfig +) diff --git a/common/telemetry/grpc.go b/common/telemetry/grpc.go new file mode 100644 index 00000000000..c0456c074b1 --- /dev/null +++ b/common/telemetry/grpc.go @@ -0,0 +1,72 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package telemetry + +import ( + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" +) + +type ( + // ServerTraceInterceptor gives a named type to the + // grpc.UnaryServerInterceptor implementation provided by otelgrpc + ServerTraceInterceptor grpc.UnaryServerInterceptor + + // ClientTraceInterceptor gives a named type to the + // grpc.UnaryClientInterceptor implementation provided by otelgrpc + ClientTraceInterceptor grpc.UnaryClientInterceptor +) + +// NewServerTraceInterceptor creates a new gRPC server interceptor that tracks +// each request with an encapsulating span using the provided TracerProvider and +// TextMapPropagator. +func NewServerTraceInterceptor( + tp trace.TracerProvider, + tmp propagation.TextMapPropagator, +) ServerTraceInterceptor { + return ServerTraceInterceptor( + otelgrpc.UnaryServerInterceptor( + otelgrpc.WithPropagators(tmp), + otelgrpc.WithTracerProvider(tp), + ), + ) +} + +// NewClientTraceInterceptor creates a new gRPC client interceptor that tracks +// each request with an encapsulating span using the provided TracerProvider and +// TextMapPropagator. +func NewClientTraceInterceptor( + tp trace.TracerProvider, + tmp propagation.TextMapPropagator, +) ClientTraceInterceptor { + return ClientTraceInterceptor( + otelgrpc.UnaryClientInterceptor( + otelgrpc.WithPropagators(tmp), + otelgrpc.WithTracerProvider(tp), + ), + ) +} diff --git a/go.mod b/go.mod index 37c68d9f1e8..9361c4c95a1 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,10 @@ require ( github.com/urfave/cli v1.22.5 github.com/urfave/cli/v2 v2.4.0 github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 go.opentelemetry.io/otel v1.7.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.30.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0 go.opentelemetry.io/otel/exporters/prometheus v0.30.0 go.opentelemetry.io/otel/metric v0.30.0 go.opentelemetry.io/otel/sdk v1.7.0 @@ -77,6 +80,7 @@ require ( github.com/aws/smithy-go v1.11.2 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -90,6 +94,7 @@ require ( github.com/googleapis/gax-go/v2 v2.4.0 // indirect github.com/googleapis/go-type-adapters v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -112,7 +117,11 @@ require ( github.com/twmb/murmur3 v1.1.6 // indirect github.com/uber-common/bark v1.3.0 // indirect go.opencensus.io v0.23.0 // indirect - go.opentelemetry.io/otel/trace v1.7.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.30.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect + go.opentelemetry.io/otel/trace v1.7.0 + go.opentelemetry.io/proto/otlp v0.16.0 // indirect go.uber.org/dig v1.14.1 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect diff --git a/go.sum b/go.sum index d8200a3e217..451be4dcf16 100644 --- a/go.sum +++ b/go.sum @@ -116,6 +116,8 @@ github.com/brianvoe/gofakeit/v6 v6.15.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2 github.com/cactus/go-statsd-client/statsd v0.0.0-20191106001114-12b4e2b38748/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c h1:HIGF0r/56+7fuIZw2V4isE22MK6xpxWx7BbV8dJ290w= github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -196,6 +198,8 @@ github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY github.com/golang-jwt/jwt/v4 v4.4.1 h1:pC5DB52sCeK48Wlb9oPcdhnjkz1TKt1D/P7WKJ0kUcQ= github.com/golang-jwt/jwt/v4 v4.4.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -288,6 +292,8 @@ github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -466,8 +472,20 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 h1:WenoaOMNP71oq3KkMZ/jnxI9xU/JSCLw8yZILSI2lfU= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0/go.mod h1:J0dBVrt7dPS/lKJyQoW0xzQiUr4r2Ik1VwPjAUWnofI= go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM= go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 h1:7Yxsak1q4XrJ5y7XBnNwqWx9amMZvoidCctv62XOQ6Y= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0/go.mod h1:M1hVZHNxcbkAlcvrOMlpQ4YOO3Awf+4N2dxkZL3xm04= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.30.0 h1:Os0ds8fJp2AUa9DNraFWIycgUzevz47i6UvnSh+8LQ0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.30.0/go.mod h1:8Lz1GGcrx1kPGE3zqDrK7ZcPzABEfIQqBjq7roQa5ZA= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.30.0 h1:7E8znQuiqnaFDDl1zJYUpoqHteZI6u2rrcxH3Gwoiis= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.30.0/go.mod h1:RejW0QAFotPIixlFZKZka4/70S5UaFOqDO9DYOgScIs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 h1:cMDtmgJ5FpRvqx9x2Aq+Mm0O6K/zcUkH73SFz20TuBw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0/go.mod h1:ceUgdyfNv4h4gLxHR0WNfDiiVmZFodZhZSbOLhpxqXE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0 h1:MFAyzUPrTwLOwCi+cltN0ZVyy4phU41lwH+lyMyQTS4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0/go.mod h1:E+/KKhwOSw8yoPxSSuUHG6vKppkvhN+S1Jc7Nib3k3o= go.opentelemetry.io/otel/exporters/prometheus v0.30.0 h1:YXo5ZY5nofaEYMCMTTMaRH2cLDZB8+0UGuk5RwMfIo0= go.opentelemetry.io/otel/exporters/prometheus v0.30.0/go.mod h1:qN5feW+0/d661KDtJuATEmHtw5bKBK7NSvNEP927zSs= go.opentelemetry.io/otel/metric v0.30.0 h1:Hs8eQZ8aQgs0U49diZoaS6Uaxw3+bBE3lcMUKBFIk3c= @@ -479,6 +497,8 @@ go.opentelemetry.io/otel/sdk/metric v0.30.0/go.mod h1:8AKFRi5HyvTR0RRty3paN1aMC9 go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o= go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.16.0 h1:WHzDWdXUvbc5bG2ObdrGfaNpQz7ft7QN9HHmJlbiB1E= +go.opentelemetry.io/proto/otlp v0.16.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.temporal.io/api v1.8.0/go.mod h1:7m1ZOVUFi/54a5IMzMeELnvDy5sJwRfz11zi3Jrww8w= go.temporal.io/api v1.8.1-0.20220603192404-e65836719706 h1:9zrW4CMQUgBMx9IUZ0qE/HhRxZEugmgvFTXBZhIdlsw= go.temporal.io/api v1.8.1-0.20220603192404-e65836719706/go.mod h1:7m1ZOVUFi/54a5IMzMeELnvDy5sJwRfz11zi3Jrww8w= @@ -496,8 +516,9 @@ go.uber.org/dig v1.14.1 h1:fyakRgZDdi2F8FgwJJoRGangMSPTIxPSLGzR3Oh0/54= go.uber.org/dig v1.14.1/go.mod h1:52EKx/Vjdpz9EzeNcweC4YMsTrDdFn9mS/+Uw5ZnVTI= go.uber.org/fx v1.17.1 h1:S42dZ6Pok8hQ3jxKwo6ZMYcCgHQA/wAS/gnpRa1Pksg= go.uber.org/fx v1.17.1/go.mod h1:yO7KN5rhlARljyo4LR047AjaV6J+KFzd/Z7rnTbEn0A= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= @@ -951,6 +972,7 @@ google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= diff --git a/host/onebox.go b/host/onebox.go index e5fe943b6b7..d53d94842cf 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" + otelsdktrace "go.opentelemetry.io/otel/sdk/trace" "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/client" @@ -71,6 +72,7 @@ import ( "go.temporal.io/server/service/worker" "go.temporal.io/server/service/worker/archiver" "go.temporal.io/server/service/worker/replicator" + "go.temporal.io/server/temporal" ) // Temporal hosts all of temporal services in one process @@ -127,6 +129,7 @@ type ( workerConfig *WorkerConfig mockAdminClient map[string]adminservice.AdminServiceClient namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor + spanExporters []otelsdktrace.SpanExporter } // HistoryConfig contains configs for history service @@ -158,6 +161,7 @@ type ( WorkerConfig *WorkerConfig MockAdminClient map[string]adminservice.AdminServiceClient NamespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor + SpanExporters []otelsdktrace.SpanExporter } ) @@ -183,6 +187,7 @@ func NewTemporal(params *TemporalParams) *temporalImpl { workerConfig: params.WorkerConfig, mockAdminClient: params.MockAdminClient, namespaceReplicationTaskExecutor: params.NamespaceReplicationTaskExecutor, + spanExporters: params.SpanExporters, } } @@ -432,6 +437,8 @@ func (c *temporalImpl) startFrontend(hosts map[string][]string, startWG *sync.Wa fx.Provide(func() log.Logger { return c.logger }), fx.Provide(func() *esclient.Config { return c.esConfig }), fx.Provide(func() esclient.Client { return c.esClient }), + fx.Supply(c.spanExporters), + temporal.ServiceTracingModule, frontend.Module, fx.Populate(&frontendService, &clientBean, &namespaceRegistry), fx.NopLogger, @@ -528,6 +535,8 @@ func (c *temporalImpl) startHistory( fx.Provide(func() *esclient.Config { return c.esConfig }), fx.Provide(func() esclient.Client { return c.esClient }), fx.Provide(workflow.NewTaskGeneratorProvider), + fx.Supply(c.spanExporters), + temporal.ServiceTracingModule, history.QueueProcessorModule, history.Module, replication.Module, @@ -603,6 +612,8 @@ func (c *temporalImpl) startMatching(hosts map[string][]string, startWG *sync.Wa fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), fx.Provide(func() dynamicconfig.Client { return newIntegrationConfigClient(dynamicconfig.NewNoopClient()) }), fx.Provide(func() log.Logger { return c.logger }), + fx.Supply(c.spanExporters), + temporal.ServiceTracingModule, matching.Module, fx.Populate(&matchingService, &clientBean, &namespaceRegistry), fx.NopLogger, @@ -697,7 +708,8 @@ func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.Wait fx.Provide(func() log.Logger { return c.logger }), fx.Provide(func() esclient.Client { return c.esClient }), fx.Provide(func() *esclient.Config { return c.esConfig }), - + fx.Supply(c.spanExporters), + temporal.ServiceTracingModule, worker.Module, fx.Populate(&workerService, &clientBean, &namespaceRegistry), fx.NopLogger, diff --git a/service/frontend/fx.go b/service/frontend/fx.go index ed4885d4852..a7ea0f841eb 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -61,6 +61,7 @@ import ( "go.temporal.io/server/common/rpc/interceptor" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/common/telemetry" "go.temporal.io/server/service" "go.temporal.io/server/service/frontend/configs" ) @@ -134,6 +135,7 @@ func GrpcServerOptionsProvider( namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor, telemetryInterceptor *interceptor.TelemetryInterceptor, rateLimitInterceptor *interceptor.RateLimitInterceptor, + traceInterceptor telemetry.ServerTraceInterceptor, sdkVersionInterceptor *interceptor.SDKVersionInterceptor, authorizer authorization.Authorizer, claimMapper authorization.ClaimMapper, @@ -159,6 +161,7 @@ func GrpcServerOptionsProvider( interceptors := []grpc.UnaryServerInterceptor{ namespaceLogInterceptor.Intercept, rpc.ServiceErrorInterceptor, + grpc.UnaryServerInterceptor(traceInterceptor), metrics.NewServerMetricsContextInjectorInterceptor(), telemetryInterceptor.Intercept, namespaceValidatorInterceptor.Intercept, diff --git a/service/fx.go b/service/fx.go index 5e895f0d1aa..dbc27ebafd4 100644 --- a/service/fx.go +++ b/service/fx.go @@ -35,6 +35,7 @@ import ( persistenceClient "go.temporal.io/server/common/persistence/client" "go.temporal.io/server/common/rpc" "go.temporal.io/server/common/rpc/interceptor" + "go.temporal.io/server/common/telemetry" ) func PersistenceMaxQpsFn( @@ -61,6 +62,7 @@ func GrpcServerOptionsProvider( rpcFactory common.RPCFactory, telemetryInterceptor *interceptor.TelemetryInterceptor, rateLimitInterceptor *interceptor.RateLimitInterceptor, + tracingInterceptor telemetry.ServerTraceInterceptor, ) []grpc.ServerOption { grpcServerOptions, err := rpcFactory.GetInternodeGRPCServerOptions() @@ -72,6 +74,7 @@ func GrpcServerOptionsProvider( grpcServerOptions, grpc.ChainUnaryInterceptor( rpc.ServiceErrorInterceptor, + grpc.UnaryServerInterceptor(tracingInterceptor), metrics.NewServerMetricsContextInjectorInterceptor(), metrics.NewServerMetricsTrailerPropagatorInterceptor(logger), telemetryInterceptor.Intercept, diff --git a/service/history/consts/const.go b/service/history/consts/const.go index 28823340b14..5031f6002ba 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -36,6 +36,7 @@ import ( const ( IdentityHistoryService = "history-service" IdentityResetter = "history-resetter" + LibraryName = "go.temporal.io/service/history" ) var ( diff --git a/service/history/fx.go b/service/history/fx.go index f8b9b207434..c345286eb95 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -52,6 +52,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" @@ -131,6 +132,7 @@ func HandlerProvider(args NewHandlerArgs) *Handler { controller: args.ShardController, eventNotifier: args.EventNotifier, replicationTaskFetcherFactory: args.ReplicationTaskFetcherFactory, + tracer: args.TracerProvider.Tracer(consts.LibraryName), } // prevent us from trying to serve requests before shard controller is started and ready diff --git a/service/history/handler.go b/service/history/handler.go index 71a815fef18..f1b0ac6d166 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -33,6 +33,7 @@ import ( "sync/atomic" "github.com/pborman/uuid" + "go.opentelemetry.io/otel/trace" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -98,6 +99,7 @@ type ( archivalMetadata archiver.ArchivalMetadata hostInfoProvider membership.HostInfoProvider controller *shard.ControllerImpl + tracer trace.Tracer } NewHandlerArgs struct { @@ -122,6 +124,7 @@ type ( ShardController *shard.ControllerImpl EventNotifier events.Notifier ReplicationTaskFetcherFactory replication.TaskFetcherFactory + TracerProvider trace.TracerProvider } ) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 93f9f0149e6..fe6c1c00b15 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -32,6 +32,7 @@ import ( "time" "github.com/pborman/uuid" + "go.opentelemetry.io/otel/trace" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" @@ -114,6 +115,7 @@ type ( workflowDeleteManager workflow.DeleteManager eventSerializer serialization.Serializer workflowConsistencyChecker api.WorkflowConsistencyChecker + tracer trace.Tracer } ) @@ -132,6 +134,7 @@ func NewEngineWithShardContext( queueProcessorFactories []queues.ProcessorFactory, replicationTaskFetcherFactory replication.TaskFetcherFactory, replicationTaskExecutorProvider replication.TaskExecutorProvider, + tracerProvider trace.TracerProvider, ) shard.Engine { currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() @@ -166,6 +169,7 @@ func NewEngineWithShardContext( workflowDeleteManager: workflowDeleteManager, eventSerializer: eventSerializer, workflowConsistencyChecker: api.NewWorkflowConsistencyChecker(shard, historyCache), + tracer: tracerProvider.Tracer(consts.LibraryName), } historyEngImpl.queueProcessors = make(map[tasks.Category]queues.Processor) diff --git a/service/history/historyEngineFactory.go b/service/history/historyEngineFactory.go index 0a6dd6d4ccd..2eb3b636a6d 100644 --- a/service/history/historyEngineFactory.go +++ b/service/history/historyEngineFactory.go @@ -25,6 +25,7 @@ package history import ( + "go.opentelemetry.io/otel/trace" "go.uber.org/fx" "go.temporal.io/server/client" @@ -56,6 +57,7 @@ type ( QueueProcessorFactories []queues.ProcessorFactory `group:"queueProcessorFactory"` ReplicationTaskFetcherFactory replication.TaskFetcherFactory ReplicationTaskExecutorProvider replication.TaskExecutorProvider + TracerProvider trace.TracerProvider } historyEngineFactory struct { @@ -80,5 +82,6 @@ func (f *historyEngineFactory) CreateEngine( f.QueueProcessorFactories, f.ReplicationTaskFetcherFactory, f.ReplicationTaskExecutorProvider, + f.TracerProvider, ) } diff --git a/service/history/shard/controller_impl.go b/service/history/shard/controller_impl.go index 9adf75b4a53..9cfb05f5633 100644 --- a/service/history/shard/controller_impl.go +++ b/service/history/shard/controller_impl.go @@ -31,6 +31,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/otel/trace" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/client" "go.temporal.io/server/common" @@ -84,6 +85,7 @@ type ( clusterMetadata cluster.Metadata archivalMetadata archiver.ArchivalMetadata hostInfoProvider membership.HostInfoProvider + tracer trace.Tracer } ) diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go index 495b83c2d49..0223057f5cd 100644 --- a/service/history/shard/fx.go +++ b/service/history/shard/fx.go @@ -25,6 +25,7 @@ package shard import ( + "go.opentelemetry.io/otel/trace" "go.uber.org/fx" "go.temporal.io/server/api/historyservice/v1" @@ -42,6 +43,7 @@ import ( "go.temporal.io/server/common/resource" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/consts" ) var Module = fx.Options( @@ -68,6 +70,7 @@ func ShardControllerProvider( archivalMetadata archiver.ArchivalMetadata, hostInfoProvider membership.HostInfoProvider, engineFactory EngineFactory, + tracerProvider trace.TracerProvider, ) *ControllerImpl { return &ControllerImpl{ status: common.DaemonStatusInitialized, @@ -95,5 +98,6 @@ func ShardControllerProvider( archivalMetadata: archivalMetadata, hostInfoProvider: hostInfoProvider, engineFactory: engineFactory, + tracer: tracerProvider.Tracer(consts.LibraryName), } } diff --git a/temporal/fx.go b/temporal/fx.go index 84f77233260..cc2e72a2445 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -27,10 +27,18 @@ package temporal import ( "context" "fmt" + "strings" "time" "go.temporal.io/server/service/history/replication" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + otelresource "go.opentelemetry.io/otel/sdk/resource" + otelsdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.10.0" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "go.uber.org/fx" @@ -38,7 +46,9 @@ import ( "github.com/pborman/uuid" "go.temporal.io/server/common/collection" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/telemetry" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/client" @@ -132,6 +142,7 @@ func NewServerFx(opts ...ServerOption) *ServerFx { ServerFxImplModule, fx.Supply(opts), fx.Provide(ServerOptionsProvider), + TraceExportModule, fx.Provide(PersistenceFactoryProvider), fx.Provide(HistoryServiceProvider), @@ -328,6 +339,8 @@ type ( Authorizer authorization.Authorizer ClaimMapper authorization.ClaimMapper DataStoreFactory persistenceClient.AbstractDataStoreFactory + SpanExporters []otelsdktrace.SpanExporter + InstanceID resource.InstanceID `optional:"true"` } ) @@ -373,6 +386,8 @@ func HistoryServiceProvider( fx.Provide(func() esclient.Client { return params.EsClient }), fx.Provide(params.PersistenceFactoryProvider), fx.Provide(workflow.NewTaskGeneratorProvider), + fx.Supply(params.SpanExporters), + ServiceTracingModule, resource.DefaultOptions, history.QueueProcessorModule, history.Module, @@ -430,6 +445,8 @@ func MatchingServiceProvider( fx.Provide(func() metrics.Reporter { return params.ServerReporter }), fx.Provide(func() esclient.Client { return params.EsClient }), fx.Provide(params.PersistenceFactoryProvider), + fx.Supply(params.SpanExporters), + ServiceTracingModule, resource.DefaultOptions, matching.Module, fx.NopLogger, @@ -486,6 +503,8 @@ func FrontendServiceProvider( fx.Provide(func() resource.NamespaceLogger { return params.NamespaceLogger }), fx.Provide(func() esclient.Client { return params.EsClient }), fx.Provide(params.PersistenceFactoryProvider), + fx.Supply(params.SpanExporters), + ServiceTracingModule, resource.DefaultOptions, frontend.Module, fx.NopLogger, @@ -541,6 +560,8 @@ func WorkerServiceProvider( fx.Provide(func() metrics.Reporter { return params.ServerReporter }), fx.Provide(func() esclient.Client { return params.EsClient }), fx.Provide(params.PersistenceFactoryProvider), + fx.Supply(params.SpanExporters), + ServiceTracingModule, resource.DefaultOptions, worker.Module, fx.NopLogger, @@ -762,3 +783,134 @@ func verifyPersistenceCompatibleVersion(config config.Persistence, persistenceSe } return nil } + +// TraceExportModule holds process-global telemetry fx state defining the set of +// OTEL trace/span exporters used by tracing instrumentation. The following +// types can be overriden/augmented with fx.Replace/fx.Decorate: +// +// - []go.opentelemetry.io/otel/sdk/trace.SpanExporter +var TraceExportModule = fx.Options( + fx.Invoke(func(log log.Logger) { + otel.SetErrorHandler(otel.ErrorHandlerFunc( + func(err error) { + log.Warn("OTEL error", tag.Error(err), tag.ErrorType(err)) + }), + ) + }), + + fx.Provide(func(lc fx.Lifecycle, c *config.Config) ([]otelsdktrace.SpanExporter, error) { + exporters, err := c.ExporterConfig.SpanExporters() + if err != nil { + return nil, err + } + lc.Append(fx.Hook{ + OnStart: startAll(exporters), + OnStop: shutdownAll(exporters), + }) + return exporters, nil + }), +) + +// ServiceTracingModule holds per-service (i.e. frontend/history/matching/worker) fx +// state. The following types can be overriden with fx.Replace/fx.Decorate: +// +// - []go.opentelemetry.io/otel/sdk/trace.BatchSpanProcessorOption +// default: empty slice +// - []go.opentelemetry.io/otel/sdk/trace.SpanProcessor +// default: wrap each otelsdktrace.SpanExporter with otelsdktrace.NewBatchSpanProcessor +// - *go.opentelemetry.io/otel/sdk/resource.Resource +// default: resource.Default() augmented with the supplied serviceName +// - []go.opentelemetry.io/otel/sdk/trace.TracerProviderOption +// default: the provided resource.Resource and each of the otelsdktrace.SpanExporter +// - go.opentelemetry.io/otel/trace.TracerProvider +// default: otelsdktrace.NewTracerProvider with each of the otelsdktrace.TracerProviderOption +// - go.opentelemetry.io/otel/ppropagation.TextMapPropagator +// default: propagation.TraceContext{} +// - telemetry.ServerTraceInterceptor +// - telemetry.ClientTraceInterceptor +var ServiceTracingModule = fx.Options( + fx.Supply([]otelsdktrace.BatchSpanProcessorOption{}), + fx.Provide( + fx.Annotate( + func(exps []otelsdktrace.SpanExporter, opts []otelsdktrace.BatchSpanProcessorOption) []otelsdktrace.SpanProcessor { + sps := make([]otelsdktrace.SpanProcessor, 0, len(exps)) + for _, exp := range exps { + sps = append(sps, otelsdktrace.NewBatchSpanProcessor(exp, opts...)) + } + return sps + }, + fx.ParamTags(`optional:"true"`, ``), + ), + ), + fx.Provide( + fx.Annotate( + func(rsn resource.ServiceName, rsi resource.InstanceID) (*otelresource.Resource, error) { + serviceName := string(rsn) + if !strings.HasPrefix(serviceName, "io.temporal.") { + serviceName = fmt.Sprintf("io.temporal.%s", serviceName) + } + attrs := []attribute.KeyValue{ + semconv.ServiceNameKey.String(serviceName), + semconv.ServiceVersionKey.String(headers.ServerVersion), + } + if rsi != "" { + attrs = append(attrs, semconv.ServiceInstanceIDKey.String(string(rsi))) + } + return otelresource.New(context.Background(), + otelresource.WithProcess(), + otelresource.WithOS(), + otelresource.WithHost(), + otelresource.WithContainer(), + otelresource.WithAttributes(attrs...), + ) + }, + fx.ParamTags(``, `optional:"true"`), + ), + ), + fx.Provide( + func(r *otelresource.Resource, sps []otelsdktrace.SpanProcessor) []otelsdktrace.TracerProviderOption { + opts := make([]otelsdktrace.TracerProviderOption, 0, len(sps)+1) + opts = append(opts, otelsdktrace.WithResource(r)) + for _, sp := range sps { + opts = append(opts, otelsdktrace.WithSpanProcessor(sp)) + } + return opts + }, + ), + fx.Provide(func(lc fx.Lifecycle, opts []otelsdktrace.TracerProviderOption) trace.TracerProvider { + tp := otelsdktrace.NewTracerProvider(opts...) + lc.Append(fx.Hook{OnStop: tp.Shutdown}) + return tp + }), + // Haven't had use for baggage propagation yet + fx.Provide(func() propagation.TextMapPropagator { return propagation.TraceContext{} }), + fx.Provide(telemetry.NewServerTraceInterceptor), + fx.Provide(telemetry.NewClientTraceInterceptor), +) + +func startAll(exporters []otelsdktrace.SpanExporter) func(ctx context.Context) error { + type starter interface{ Start(context.Context) error } + return func(ctx context.Context) error { + for _, e := range exporters { + if starter, ok := e.(starter); ok { + err := starter.Start(ctx) + if err != nil { + return err + } + } + } + return nil + } +} + +func shutdownAll(exporters []otelsdktrace.SpanExporter) func(ctx context.Context) error { + return func(ctx context.Context) error { + for _, e := range exporters { + err := e.Shutdown(ctx) + if err != nil { + return err + } + } + return nil + } +}