Skip to content

Commit

Permalink
Use interceptor to record sdk versions
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed Jan 12, 2022
1 parent ab38a73 commit 8da885f
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 128 deletions.
3 changes: 1 addition & 2 deletions common/headers/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ var (
type (
// VersionChecker is used to check client/server compatibility and client's capabilities
VersionChecker interface {
GetClientNameAndVersion(ctx context.Context) (string, string)
ClientSupported(ctx context.Context, enableClientVersionCheck bool) error
ClientSupportsFeature(ctx context.Context, feature string) bool
}
Expand All @@ -99,7 +98,7 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string)
}
}

func (vc *versionChecker) GetClientNameAndVersion(ctx context.Context) (string, string) {
func GetClientNameAndVersion(ctx context.Context) (string, string) {
headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName)
clientName := headers[0]
clientVersion := headers[1]
Expand Down
97 changes: 97 additions & 0 deletions common/rpc/interceptor/sdk_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package interceptor

import (
"context"
"sync"
"sync/atomic"

"go.temporal.io/server/common/headers"
"go.temporal.io/version/check"
"google.golang.org/grpc"
)

type SDKInfoRecorder interface {
RecordSDKInfo(name, version string)
GetAndResetSDKInfo() []check.SDKInfo
}

type SDKVersionInterceptor struct {
// Using a sync map to support effienct concurrent updates.
// Key type is sdkNameVersion and value type is sdkCount.
// Note that we never delete keys from this map as the cardinality of
// client versions should be fairly low.
sdkInfoCounter sync.Map
}

type sdkNameVersion struct {
name string
version string
}

type sdkCount struct {
count int64
}

var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept

func NewSDKVersionInterceptor() *SDKVersionInterceptor {
return &SDKVersionInterceptor{}
}

func (vi *SDKVersionInterceptor) Intercept(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
sdkName, sdkVersion := headers.GetClientNameAndVersion(ctx)
if sdkName != "" && sdkVersion != "" {
vi.RecordSDKInfo(sdkName, sdkVersion)
}
return handler(ctx, req)
}

func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) {
info := sdkNameVersion{name, version}
valIface, ok := vi.sdkInfoCounter.Load(info)
if !ok {
// Store if wasn't added racy
valIface, _ = vi.sdkInfoCounter.LoadOrStore(info, &sdkCount{})
}
atomic.AddInt64(&valIface.(*sdkCount).count, 1)
}

func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo {
sdkInfo := make([]check.SDKInfo, 0)
vi.sdkInfoCounter.Range(func(key, value interface{}) bool {
timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0)
nameVersion := key.(sdkNameVersion)
sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen})
return true
})
return sdkInfo
}
55 changes: 55 additions & 0 deletions common/rpc/interceptor/sdk_version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package interceptor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"go.temporal.io/server/common/headers"
)

func TestSDKVersionRecorder(t *testing.T) {
interceptor := NewSDKVersionInterceptor()

sdkVersion := "1.10.1"
ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

ctx = headers.SetVersionsForTests(context.Background(), "", headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

info := interceptor.GetAndResetSDKInfo()
assert.Equal(t, 1, len(info))
assert.Equal(t, headers.ClientNameGoSDK, info[0].Name)
assert.Equal(t, sdkVersion, info[0].Version)
assert.Equal(t, int64(1), info[0].TimesSeen)
}
5 changes: 0 additions & 5 deletions service/frontend/dcRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func (s *dcRedirectionHandlerSuite) SetupTest() {
s.mockResource.GetSearchAttributesProvider(),
s.mockResource.GetClusterMetadata(),
s.mockResource.GetArchivalMetadata(),
s,
)

s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceServer(s.controller)
Expand All @@ -148,10 +147,6 @@ func (s *dcRedirectionHandlerSuite) TearDownTest() {
s.controller.Finish()
}

// RecordSDKInfo is a noop in this test suite
func (s *dcRedirectionHandlerSuite) RecordSDKInfo(name, version string) {
}

func (s *dcRedirectionHandlerSuite) TestDescribeTaskQueue() {
apiName := "DescribeTaskQueue"

Expand Down
8 changes: 7 additions & 1 deletion service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var Module = fx.Options(
fx.Provide(NamespaceCountLimitInterceptorProvider),
fx.Provide(NamespaceValidatorInterceptorProvider),
fx.Provide(NamespaceRateLimitInterceptorProvider),
fx.Provide(SDKVersionInterceptorProvider),
fx.Provide(GrpcServerOptionsProvider),
fx.Provide(VisibilityManagerProvider),
fx.Provide(ThrottledLoggerRpsFnProvider),
Expand Down Expand Up @@ -131,6 +132,7 @@ func GrpcServerOptionsProvider(
namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor,
telemetryInterceptor *interceptor.TelemetryInterceptor,
rateLimitInterceptor *interceptor.RateLimitInterceptor,
sdkVersionInterceptor *interceptor.SDKVersionInterceptor,
authorizer authorization.Authorizer,
claimMapper authorization.ClaimMapper,
audienceGetter authorization.JWTAudienceMapper,
Expand Down Expand Up @@ -168,6 +170,7 @@ func GrpcServerOptionsProvider(
logger,
audienceGetter,
),
sdkVersionInterceptor.Intercept,
}
if len(customInterceptors) > 0 {
interceptors = append(interceptors, customInterceptors...)
Expand Down Expand Up @@ -283,6 +286,10 @@ func NamespaceValidatorInterceptorProvider(
)
}

func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor {
return interceptor.NewSDKVersionInterceptor()
}

func PersistenceMaxQpsProvider(
serviceConfig *Config,
) persistenceClient.PersistenceMaxQps {
Expand Down Expand Up @@ -433,7 +440,6 @@ func HandlerProvider(
saProvider,
clusterMetadata,
archivalMetadata,
versionChecker,
)
handler := NewDCRedirectionHandler(wfHandler, params.DCRedirectionPolicy, logger, clientBean, metricsClient, timeSource, namespaceRegistry, clusterMetadata)
return handler
Expand Down
46 changes: 7 additions & 39 deletions service/frontend/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package frontend
import (
"runtime"
"sync"
"sync/atomic"
"time"

enumsbp "go.temporal.io/api/enums/v1"
Expand All @@ -39,47 +38,33 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/rpc/interceptor"
)

const VersionCheckInterval = 24 * time.Hour

type sdkNameVersion struct {
name string
version string
}

type sdkCount struct {
count int64
}

type SDKInfoRecorder interface {
RecordSDKInfo(name, version string)
}

type VersionChecker struct {
config *Config
shutdownChan chan struct{}
// Using a sync map to support effienct concurrent updates.
// Key type is sdkNameVersion and value type is sdkCount.
// Note that we never delete keys from this map as the cardinality of
// client versions should be fairly low.
sdkInfoCounter sync.Map
config *Config
shutdownChan chan struct{}
metricsScope metrics.Scope
clusterMetadataManager persistence.ClusterMetadataManager
startOnce sync.Once
stopOnce sync.Once
sdkVersionRecorder interceptor.SDKInfoRecorder
}

func NewVersionChecker(
config *Config,
metricsClient metrics.Client,
clusterMetadataManager persistence.ClusterMetadataManager,
sdkVersionRecorder interceptor.SDKInfoRecorder,
) *VersionChecker {
return &VersionChecker{
config: config,
shutdownChan: make(chan struct{}),
metricsScope: metricsClient.Scope(metrics.VersionCheckScope),
clusterMetadataManager: clusterMetadataManager,
sdkVersionRecorder: sdkVersionRecorder,
}
}

Expand All @@ -99,16 +84,6 @@ func (vc *VersionChecker) Stop() {
}
}

func (vc *VersionChecker) RecordSDKInfo(name, version string) {
info := sdkNameVersion{name, version}
valIface, ok := vc.sdkInfoCounter.Load(info)
if !ok {
// Store if wasn't added racy
valIface, _ = vc.sdkInfoCounter.LoadOrStore(info, &sdkCount{})
}
atomic.AddInt64(&valIface.(*sdkCount).count, 1)
}

func (vc *VersionChecker) versionCheckLoop() {
timer := time.NewTicker(VersionCheckInterval)
defer timer.Stop()
Expand Down Expand Up @@ -162,13 +137,6 @@ func isUpdateNeeded(metadata *persistence.GetClusterMetadataResponse) bool {
}

func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClusterMetadataResponse) (*check.VersionCheckRequest, error) {
sdkInfo := make([]check.SDKInfo, 0)
vc.sdkInfoCounter.Range(func(key, value interface{}) bool {
timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0)
nameVersion := key.(sdkNameVersion)
sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen})
return true
})

return &check.VersionCheckRequest{
Product: headers.ClientNameServer,
Expand All @@ -178,7 +146,7 @@ func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClu
DB: vc.clusterMetadataManager.GetName(),
ClusterID: metadata.ClusterId,
Timestamp: time.Now().UnixNano(),
SDKInfo: sdkInfo,
SDKInfo: vc.sdkVersionRecorder.GetAndResetSDKInfo(),
}, nil
}

Expand Down
Loading

0 comments on commit 8da885f

Please sign in to comment.