Skip to content

Commit

Permalink
Per-service fx-ified OTEL tracing (#2896)
Browse files Browse the repository at this point in the history
Build trace exporters from YAML and make TracerProvider instances available to the history/matching/worker/frontend services. Also puts gRPC instrumentation interceptors in place so that gRPC requests and responses are given their own spans.
  • Loading branch information
Matt McShane authored Jun 22, 2022
1 parent 73881a3 commit f86b8d2
Show file tree
Hide file tree
Showing 23 changed files with 962 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@
# Fossa
.fossa.yml

# direnv
.envrc
3 changes: 3 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/masker"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
"go.temporal.io/server/common/telemetry"
)

type (
Expand All @@ -64,6 +65,8 @@ type (
DynamicConfigClient *dynamicconfig.FileBasedClientConfig `yaml:"dynamicConfigClient"`
// NamespaceDefaults is the default config for every namespace
NamespaceDefaults NamespaceDefaults `yaml:"namespaceDefaults"`
// ExporterConfig allows the specification of process-wide OTEL exporters
ExporterConfig telemetry.ExportConfig `yaml:"otel"`
}

// Service contains the service specific config items
Expand Down
15 changes: 14 additions & 1 deletion common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"go.uber.org/fx"
"google.golang.org/grpc"

"go.temporal.io/api/workflowservice/v1"

Expand Down Expand Up @@ -62,6 +63,7 @@ import (
"go.temporal.io/server/common/rpc/encryption"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/telemetry"
)

type (
Expand Down Expand Up @@ -416,7 +418,18 @@ func RPCFactoryProvider(
tlsConfigProvider encryption.TLSConfigProvider,
dc *dynamicconfig.Collection,
clusterMetadata *cluster.Config,
traceInterceptor telemetry.ClientTraceInterceptor,
) common.RPCFactory {
svcCfg := cfg.Services[string(svcName)]
return rpc.NewFactory(&svcCfg.RPC, string(svcName), logger, tlsConfigProvider, dc, clusterMetadata)
return rpc.NewFactory(
&svcCfg.RPC,
string(svcName),
logger,
tlsConfigProvider,
dc,
clusterMetadata,
[]grpc.UnaryClientInterceptor{
grpc.UnaryClientInterceptor(traceInterceptor),
},
)
}
11 changes: 7 additions & 4 deletions common/rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const (
// The hostName syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger) (*grpc.ClientConn, error) {
func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger, interceptors ...grpc.UnaryClientInterceptor) (*grpc.ClientConn, error) {
// Default to insecure
grpcSecureOpt := grpc.WithInsecure()
if tlsConfig != nil {
Expand All @@ -83,9 +83,12 @@ func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger) (*grpc.Clie
grpcSecureOpt,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxInternodeRecvPayloadSize)),
grpc.WithChainUnaryInterceptor(
versionHeadersInterceptor,
metrics.NewClientMetricsTrailerPropagatorInterceptor(logger),
errorInterceptor,
append(
interceptors,
versionHeadersInterceptor,
metrics.NewClientMetricsTrailerPropagatorInterceptor(logger),
errorInterceptor,
)...,
),
grpc.WithDefaultServiceConfig(DefaultServiceConfig),
grpc.WithDisableServiceConfig(),
Expand Down
21 changes: 12 additions & 9 deletions common/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type RPCFactory struct {
clusterMetadata *cluster.Config

sync.Mutex
grpcListener net.Listener
tlsFactory encryption.TLSConfigProvider
grpcListener net.Listener
tlsFactory encryption.TLSConfigProvider
clientInterceptors []grpc.UnaryClientInterceptor
}

// NewFactory builds a new RPCFactory
Expand All @@ -63,14 +64,16 @@ func NewFactory(
tlsProvider encryption.TLSConfigProvider,
dc *dynamicconfig.Collection,
clusterMetadata *cluster.Config,
clientInterceptors []grpc.UnaryClientInterceptor,
) *RPCFactory {
return &RPCFactory{
config: cfg,
serviceName: sName,
logger: logger,
dc: dc,
tlsFactory: tlsProvider,
clusterMetadata: clusterMetadata,
config: cfg,
serviceName: sName,
logger: logger,
dc: dc,
tlsFactory: tlsProvider,
clusterMetadata: clusterMetadata,
clientInterceptors: clientInterceptors,
}
}

Expand Down Expand Up @@ -225,7 +228,7 @@ func (d *RPCFactory) CreateInternodeGRPCConnection(hostName string) *grpc.Client
}

func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config) *grpc.ClientConn {
connection, err := Dial(hostName, tlsClientConfig, d.logger)
connection, err := Dial(hostName, tlsClientConfig, d.logger, d.clientInterceptors...)
if err != nil {
d.logger.Fatal("Failed to create gRPC connection", tag.Error(err))
return nil
Expand Down
27 changes: 15 additions & 12 deletions common/rpc/test/rpc_localstore_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"go.temporal.io/server/common/config"
Expand All @@ -50,6 +51,8 @@ const (
frontendServerCertSerialNumber = 150
)

var noExtraInterceptors = []grpc.UnaryClientInterceptor{}

type localStoreRPCSuite struct {
*require.Assertions
suite.Suite
Expand Down Expand Up @@ -128,7 +131,7 @@ func (s *localStoreRPCSuite) SetupSuite() {

provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(insecureFactory)
s.insecureRPCFactory = i(insecureFactory)

Expand Down Expand Up @@ -336,22 +339,22 @@ func (s *localStoreRPCSuite) setupFrontend() {

provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(frontendMutualTLSFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(frontendServerTLSFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(frontendSystemWorkerMutualTLSFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(frontendMutualTLSRefreshFactory)

s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory)
Expand All @@ -365,21 +368,21 @@ func (s *localStoreRPCSuite) setupFrontend() {
s.frontendRollingCerts,
s.dynamicCACertPool,
s.wrongCACertPool)
dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection(), clusterMetadata)
dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory)
s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory)

s.frontendMutualTLSRPCRefreshFactory = f(frontendMutualTLSRefreshFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreRootCAForceTLS.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(frontendServerTLSFactory)
s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSRemoteCluster.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(remoteClusterMutualTLSRPCFactory)
s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory)
}
Expand Down Expand Up @@ -415,22 +418,22 @@ func (s *localStoreRPCSuite) setupInternode() {

provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(internodeMutualTLSFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(internodeServerTLSFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(internodeMutualAltTLSFactory)

provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopClient, s.logger, nil)
s.NoError(err)
internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata)
internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata, noExtraInterceptors)
s.NotNil(internodeMutualTLSRefreshFactory)

s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory)
Expand Down
Loading

0 comments on commit f86b8d2

Please sign in to comment.