Skip to content

Commit

Permalink
Use metrics.Client for sdk.metrics.Handler (#2302)
Browse files Browse the repository at this point in the history
This change makes it possible to switch temporal/server entirely to OpenTelemetry (otel) via config. A backup tally instance for the SDK is no longer required, and the relevant config is marked as deprecated.
  • Loading branch information
Ardagan authored Dec 15, 2021
1 parent 8750b1c commit 8e9a8d4
Show file tree
Hide file tree
Showing 21 changed files with 194 additions and 141 deletions.
2 changes: 1 addition & 1 deletion common/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type (
Statsd *StatsdConfig `yaml:"statsd"`
// Prometheus is the configuration for prometheus reporter
Prometheus *PrometheusConfig `yaml:"prometheus"`
// {optional} Config for Prometheus metrics reporter for SDK reported metrics.
// Deprecated: {optional} Config for Prometheus metrics reporter for SDK reported metrics.
PrometheusSDK *PrometheusConfig `yaml:"prometheusSDK"`
}

Expand Down
8 changes: 6 additions & 2 deletions common/metrics/opentelemetry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
serviceIdx ServiceIdx
scopeWrapper func(impl internalScope) internalScope
gaugeCache OtelGaugeCache
userScope *opentelemetryUserScope
}
)

Expand All @@ -52,6 +53,8 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
return NewTagFilteringScope(tagsFilterConfig, impl)
}

userScope := newOpentelemetryUserScope(reporter, clientConfig.Tags, gaugeCache)

globalRootScope := newOpentelemetryScope(serviceIdx, reporter, nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache, false)

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
Expand All @@ -62,6 +65,7 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
serviceIdx: serviceIdx,
scopeWrapper: scopeWrapper,
gaugeCache: gaugeCache,
userScope: userScope,
}

for idx, def := range ScopeDefs[Common] {
Expand Down Expand Up @@ -126,7 +130,7 @@ func (m *opentelemetryClient) Scope(scopeIdx int, tags ...Tag) Scope {
}

// UserScope returns a new metrics scope that can be used to add additional
// information to the metrics emitted by user code
// information to the metrics emitted by user code.
func (m *opentelemetryClient) UserScope() UserScope {
return m.rootScope.userScope()
return m.userScope
}
32 changes: 14 additions & 18 deletions common/metrics/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"sync/atomic"
"time"

"github.com/uber-go/tally/v4"

"go.temporal.io/server/build"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand All @@ -46,8 +44,8 @@ const (

// RuntimeMetricsReporter A struct containing the state of the RuntimeMetricsReporter.
type RuntimeMetricsReporter struct {
scope tally.Scope
buildInfoScope tally.Scope
scope UserScope
buildInfoScope UserScope
reportInterval time.Duration
started int32
quit chan struct{}
Expand All @@ -58,7 +56,7 @@ type RuntimeMetricsReporter struct {

// NewRuntimeMetricsReporter Creates a new RuntimeMetricsReporter.
func NewRuntimeMetricsReporter(
scope tally.Scope,
scope UserScope,
reportInterval time.Duration,
logger log.Logger,
instanceID string,
Expand Down Expand Up @@ -93,34 +91,32 @@ func (r *RuntimeMetricsReporter) report() {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)

r.scope.Gauge(NumGoRoutinesGauge).Update(float64(runtime.NumGoroutine()))
r.scope.Gauge(GoMaxProcsGauge).Update(float64(runtime.GOMAXPROCS(0)))
r.scope.Gauge(MemoryAllocatedGauge).Update(float64(memStats.Alloc))
r.scope.Gauge(MemoryHeapGauge).Update(float64(memStats.HeapAlloc))
r.scope.Gauge(MemoryHeapIdleGauge).Update(float64(memStats.HeapIdle))
r.scope.Gauge(MemoryHeapInuseGauge).Update(float64(memStats.HeapInuse))
r.scope.Gauge(MemoryStackGauge).Update(float64(memStats.StackInuse))
r.scope.UpdateGauge(NumGoRoutinesGauge, float64(runtime.NumGoroutine()))
r.scope.UpdateGauge(GoMaxProcsGauge, float64(runtime.GOMAXPROCS(0)))
r.scope.UpdateGauge(MemoryAllocatedGauge, float64(memStats.Alloc))
r.scope.UpdateGauge(MemoryHeapGauge, float64(memStats.HeapAlloc))
r.scope.UpdateGauge(MemoryHeapIdleGauge, float64(memStats.HeapIdle))
r.scope.UpdateGauge(MemoryHeapInuseGauge, float64(memStats.HeapInuse))
r.scope.UpdateGauge(MemoryStackGauge, float64(memStats.StackInuse))

// memStats.NumGC is a perpetually incrementing counter (unless it wraps at 2^32)
num := memStats.NumGC
lastNum := atomic.SwapUint32(&r.lastNumGC, num) // reset for the next iteration
if delta := num - lastNum; delta > 0 {
r.scope.Counter(NumGCCounter).Inc(int64(delta))
r.scope.AddCounter(NumGCCounter, int64(delta))
if delta > 255 {
// too many GCs happened, the timestamps buffer got wrapped around. Report only the last 256
lastNum = num - 256
}
for i := lastNum; i != num; i++ {
pause := memStats.PauseNs[i%256]
r.scope.Timer(GcPauseMsTimer).Record(time.Duration(pause))
r.scope.RecordTimer(GcPauseMsTimer, time.Duration(pause))
}
}

// report build info
buildInfoGauge := r.buildInfoScope.Gauge(buildInfoMetricName)
buildAgeGauge := r.buildInfoScope.Gauge(buildAgeMetricName)
buildInfoGauge.Update(1.0)
buildAgeGauge.Update(float64(time.Since(r.buildTime)))
r.buildInfoScope.UpdateGauge(buildInfoMetricName, 1.0)
r.buildInfoScope.UpdateGauge(buildAgeMetricName, float64(time.Since(r.buildTime)))
}

// Start Starts the reporter thread that periodically emits metrics.
Expand Down
28 changes: 14 additions & 14 deletions common/metrics/tally_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (

// TallyClient is used for reporting metrics by various Temporal services
type TallyClient struct {
// parentReporter is the parent scope for the metrics
parentScope tally.Scope
childScopes map[int]tally.Scope
metricDefs map[int]metricDefinition
serviceIdx ServiceIdx
scopeWrapper func(impl internalScope) internalScope
perUnitBuckets map[MetricUnit]tally.Buckets
// This is the scope provided by user to the client. It contains no client-specific tags.
globalRootScope tally.Scope
childScopes map[int]tally.Scope
metricDefs map[int]metricDefinition
serviceIdx ServiceIdx
scopeWrapper func(impl internalScope) internalScope
perUnitBuckets map[MetricUnit]tally.Buckets
}

// NewClient creates and returns a new instance of
Expand All @@ -59,12 +59,12 @@ func NewClient(clientConfig *ClientConfig, scope tally.Scope, serviceIdx Service

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &TallyClient{
parentScope: scope,
childScopes: make(map[int]tally.Scope, totalScopes),
metricDefs: getMetricDefs(serviceIdx),
serviceIdx: serviceIdx,
scopeWrapper: scopeWrapper,
perUnitBuckets: perUnitBuckets,
globalRootScope: scope,
childScopes: make(map[int]tally.Scope, totalScopes),
metricDefs: getMetricDefs(serviceIdx),
serviceIdx: serviceIdx,
scopeWrapper: scopeWrapper,
perUnitBuckets: perUnitBuckets,
}

for idx, def := range ScopeDefs[Common] {
Expand Down Expand Up @@ -156,5 +156,5 @@ func (m *TallyClient) Scope(scopeIdx int, tags ...Tag) Scope {
// UserScope returns a new metrics scope that can be used to add additional
// information to the metrics emitted by user code
func (m *TallyClient) UserScope() UserScope {
return newTallyUserScope(m.parentScope)
return newTallyUserScope(m.globalRootScope)
}
3 changes: 0 additions & 3 deletions common/resource/bootstrapParams.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
package resource

import (
"github.com/uber-go/tally/v4"

"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand All @@ -49,7 +47,6 @@ type (
ThrottledLogger log.Logger
NamespaceLogger log.Logger

MetricsScope tally.Scope
MembershipFactoryInitializer MembershipFactoryInitializerFunc
RPCFactory common.RPCFactory
ClientFactoryProvider client.FactoryProvider
Expand Down
13 changes: 6 additions & 7 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"os"
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/tchannel-go"
"go.temporal.io/api/workflowservice/v1"
"go.uber.org/fx"
Expand Down Expand Up @@ -78,12 +77,12 @@ var Module = fx.Options(
fx.Provide(SnTaggedLoggerProvider),
fx.Provide(ThrottledLoggerProvider),
fx.Provide(PersistenceConfigProvider),
fx.Provide(MetricsScopeProvider),
fx.Provide(HostNameProvider),
fx.Provide(ServiceNameProvider),
fx.Provide(TimeSourceProvider),
fx.Provide(cluster.NewMetadataFromConfig),
fx.Provide(MetricsClientProvider),
fx.Provide(MetricsUserScopeProvider),
fx.Provide(SearchAttributeProviderProvider),
fx.Provide(SearchAttributeManagerProvider),
fx.Provide(NamespaceRegistryProvider),
Expand Down Expand Up @@ -141,10 +140,6 @@ func PersistenceConfigProvider(params *BootstrapParams) *config.Persistence {
return &params.PersistenceConfig
}

func MetricsScopeProvider(params *BootstrapParams) tally.Scope {
return params.MetricsScope
}

// ServiceNameProvider returns the Name field of the bootstrap params,
// converted to the ServiceName type.
func ServiceNameProvider(params *BootstrapParams) ServiceName {
return ServiceName(params.Name)
}
Expand Down Expand Up @@ -276,8 +271,12 @@ func InstanceIDProvider(params *BootstrapParams) InstanceID {
return InstanceID(params.InstanceID)
}

// MetricsUserScopeProvider returns the metrics.UserScope obtained by calling
// UserScope() on the supplied server metrics client.
func MetricsUserScopeProvider(serverMetricsClient metrics.Client) metrics.UserScope {
return serverMetricsClient.UserScope()
}

func RuntimeMetricsReporterProvider(
metricsScope tally.Scope,
metricsScope metrics.UserScope,
logger SnTaggedLogger,
instanceID InstanceID,
) *metrics.RuntimeMetricsReporter {
Expand Down
14 changes: 6 additions & 8 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sync/atomic"
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/tchannel-go"
"go.temporal.io/api/workflowservice/v1"

Expand Down Expand Up @@ -83,7 +82,6 @@ type (
serviceName string
hostName string
hostInfo *membership.HostInfo
metricsScope tally.Scope
clusterMetadata cluster.Metadata
saProvider searchattribute.Provider
saManager searchattribute.Manager
Expand Down Expand Up @@ -312,10 +310,10 @@ func New(

// static infos

numShards: numShards,
serviceName: params.Name,
hostName: hostName,
metricsScope: params.MetricsScope,
numShards: numShards,
serviceName: params.Name,
hostName: hostName,

clusterMetadata: clusterMetadata,
saProvider: saProvider,
saManager: saManager,
Expand Down Expand Up @@ -368,7 +366,7 @@ func New(

// internal vars
runtimeMetricsReporter: metrics.NewRuntimeMetricsReporter(
params.MetricsScope,
params.MetricsClient.UserScope(),
time.Minute,
logger,
params.InstanceID,
Expand All @@ -389,7 +387,7 @@ func (h *Impl) Start() {
return
}

h.metricsScope.Counter(metrics.RestartCount).Inc(1)
h.metricsClient.UserScope().AddCounter(metrics.RestartCount, 1)
h.runtimeMetricsReporter.Start()

h.clusterMetadata.Start()
Expand Down
18 changes: 8 additions & 10 deletions common/sdk/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
"crypto/tls"
"fmt"

"github.com/uber-go/tally/v4"
sdkclient "go.temporal.io/sdk/client"
sdktally "go.temporal.io/sdk/contrib/tally"

"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
Expand All @@ -45,27 +43,27 @@ type (
}

clientFactory struct {
hostPort string
tlsConfig *tls.Config
scope tally.Scope
hostPort string
tlsConfig *tls.Config
metricsHandler *MetricsHandler
}
)

var _ ClientFactory = (*clientFactory)(nil)

func NewClientFactory(hostPort string, tlsConfig *tls.Config, scope tally.Scope) *clientFactory {
func NewClientFactory(hostPort string, tlsConfig *tls.Config, metricsHandler *MetricsHandler) *clientFactory {
return &clientFactory{
hostPort: hostPort,
tlsConfig: tlsConfig,
scope: scope,
hostPort: hostPort,
tlsConfig: tlsConfig,
metricsHandler: metricsHandler,
}
}

func (f *clientFactory) NewClient(namespaceName string, logger log.Logger) (sdkclient.Client, error) {
sdkClient, err := sdkclient.NewClient(sdkclient.Options{
HostPort: f.hostPort,
Namespace: namespaceName,
MetricsHandler: sdktally.NewMetricsHandler(f.scope),
MetricsHandler: f.metricsHandler,
Logger: log.NewSdkLogger(logger),
ConnectionOptions: sdkclient.ConnectionOptions{
TLS: f.tlsConfig,
Expand Down
Loading

0 comments on commit 8e9a8d4

Please sign in to comment.