Skip to content

Commit

Permalink
Check SDK versions via version check call (#2365)
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored Jan 21, 2022
1 parent aff2a82 commit 65e4958
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 11 deletions.
8 changes: 8 additions & 0 deletions common/headers/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string)
}
}

// GetClientNameAndVersion extracts SDK name and version from context headers
func GetClientNameAndVersion(ctx context.Context) (string, string) {
headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName)
clientName := headers[0]
clientVersion := headers[1]
return clientName, clientVersion
}

// ClientSupported returns an error if client is unsupported, nil otherwise.
func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersionCheck bool) error {
if !enableClientVersionCheck {
Expand Down
99 changes: 99 additions & 0 deletions common/rpc/interceptor/sdk_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package interceptor

import (
"context"
"sync"

"go.temporal.io/server/common/headers"
"go.temporal.io/version/check"
"google.golang.org/grpc"
)

type sdkNameVersion struct {
name string
version string
}

type SDKVersionInterceptor struct {
sdkInfoSet map[sdkNameVersion]struct{}
lock sync.RWMutex
maxSetSize int
}

const defaultMaxSetSize = 100

// NewSDKVersionInterceptor creates a new SDKVersionInterceptor with default max set size
func NewSDKVersionInterceptor() *SDKVersionInterceptor {
return &SDKVersionInterceptor{
sdkInfoSet: make(map[sdkNameVersion]struct{}),
maxSetSize: defaultMaxSetSize,
}
}

// Intercept a grpc request
func (vi *SDKVersionInterceptor) Intercept(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
sdkName, sdkVersion := headers.GetClientNameAndVersion(ctx)
if sdkName != "" && sdkVersion != "" {
vi.RecordSDKInfo(sdkName, sdkVersion)
}
return handler(ctx, req)
}

// RecordSDKInfo records name and version tuple in memory
func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) {
info := sdkNameVersion{name, version}

vi.lock.RLock()
overCap := len(vi.sdkInfoSet) >= vi.maxSetSize
_, found := vi.sdkInfoSet[info]
vi.lock.RUnlock()

if !overCap && !found {
vi.lock.Lock()
vi.sdkInfoSet[info] = struct{}{}
vi.lock.Unlock()
}
}

// GetAndResetSDKInfo gets all recorded name, version tuples and resets internal records
func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo {
vi.lock.Lock()
currSet := vi.sdkInfoSet
vi.sdkInfoSet = make(map[sdkNameVersion]struct{})
vi.lock.Unlock()

sdkInfo := make([]check.SDKInfo, 0, len(currSet))
for k := range currSet {
sdkInfo = append(sdkInfo, check.SDKInfo{Name: k.name, Version: k.version})
}
return sdkInfo
}
84 changes: 84 additions & 0 deletions common/rpc/interceptor/sdk_version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package interceptor

import (
"context"
"sort"
"testing"

"github.com/stretchr/testify/assert"

"go.temporal.io/server/common/headers"
)

func TestSDKVersionRecorder(t *testing.T) {
interceptor := &SDKVersionInterceptor{
sdkInfoSet: make(map[sdkNameVersion]struct{}),
maxSetSize: 2,
}

sdkVersion := "1.10.1"

// Record first tuple
ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Record second tuple
ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameTypeScriptSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Do not record when over capacity
ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameJavaSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Empty SDK version should not be recorded
ctx = headers.SetVersionsForTests(context.Background(), "", headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Empty SDK name should not be recorded
ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, "", headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

info := interceptor.GetAndResetSDKInfo()
sort.SliceStable(info, func(i, j int) bool {
return info[i].Name < info[j].Name
})
assert.Equal(t, 2, len(info))
assert.Equal(t, headers.ClientNameGoSDK, info[0].Name)
assert.Equal(t, sdkVersion, info[0].Version)
assert.Equal(t, headers.ClientNameTypeScriptSDK, info[1].Name)
assert.Equal(t, sdkVersion, info[1].Version)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.25.0
go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a
go.temporal.io/sdk v1.12.0
go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
go.uber.org/fx v1.14.2
go.uber.org/multierr v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a h1:FYS1fRCzAbcek09VA3wJb
go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a/go.mod h1:EMC/8OQVvVUeTSfVQ/OpAaIHmVadlzaKwHVjUmgz3tk=
go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193 h1:jhIqHkAE74DnEXipymFTzmTxyboMYmv6iVkkCFC1pas=
go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
8 changes: 8 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var Module = fx.Options(
fx.Provide(NamespaceCountLimitInterceptorProvider),
fx.Provide(NamespaceValidatorInterceptorProvider),
fx.Provide(NamespaceRateLimitInterceptorProvider),
fx.Provide(SDKVersionInterceptorProvider),
fx.Provide(GrpcServerOptionsProvider),
fx.Provide(VisibilityManagerProvider),
fx.Provide(ThrottledLoggerRpsFnProvider),
Expand Down Expand Up @@ -131,6 +132,7 @@ func GrpcServerOptionsProvider(
namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor,
telemetryInterceptor *interceptor.TelemetryInterceptor,
rateLimitInterceptor *interceptor.RateLimitInterceptor,
sdkVersionInterceptor *interceptor.SDKVersionInterceptor,
authorizer authorization.Authorizer,
claimMapper authorization.ClaimMapper,
audienceGetter authorization.JWTAudienceMapper,
Expand Down Expand Up @@ -168,6 +170,7 @@ func GrpcServerOptionsProvider(
logger,
audienceGetter,
),
sdkVersionInterceptor.Intercept,
}
if len(customInterceptors) > 0 {
interceptors = append(interceptors, customInterceptors...)
Expand Down Expand Up @@ -283,6 +286,10 @@ func NamespaceValidatorInterceptorProvider(
)
}

func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor {
return interceptor.NewSDKVersionInterceptor()
}

func PersistenceMaxQpsProvider(
serviceConfig *Config,
) persistenceClient.PersistenceMaxQps {
Expand Down Expand Up @@ -394,6 +401,7 @@ func AdminHandlerProvider(
func HandlerProvider(
params *resource.BootstrapParams,
serviceConfig *Config,
versionChecker *VersionChecker,
namespaceReplicationQueue FEReplicatorNamespaceReplicationQueue,
visibilityMgr manager.VisibilityManager,
logger resource.SnTaggedLogger,
Expand Down
31 changes: 23 additions & 8 deletions service/frontend/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/rpc/interceptor"
)

const VersionCheckInterval = 24 * time.Hour
Expand All @@ -49,18 +50,21 @@ type VersionChecker struct {
clusterMetadataManager persistence.ClusterMetadataManager
startOnce sync.Once
stopOnce sync.Once
sdkVersionRecorder *interceptor.SDKVersionInterceptor
}

func NewVersionChecker(
config *Config,
metricsClient metrics.Client,
clusterMetadataManager persistence.ClusterMetadataManager,
sdkVersionRecorder *interceptor.SDKVersionInterceptor,
) *VersionChecker {
return &VersionChecker{
config: config,
shutdownChan: make(chan struct{}),
metricsScope: metricsClient.Scope(metrics.VersionCheckScope),
clusterMetadataManager: clusterMetadataManager,
sdkVersionRecorder: sdkVersionRecorder,
}
}

Expand Down Expand Up @@ -140,6 +144,7 @@ func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClu
DB: vc.clusterMetadataManager.GetName(),
ClusterID: metadata.ClusterId,
Timestamp: time.Now().UnixNano(),
SDKInfo: vc.sdkVersionRecorder.GetAndResetSDKInfo(),
}, nil
}

Expand All @@ -152,7 +157,12 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro
if err != nil {
return err
}
metadata.VersionInfo = toVersionInfo(resp)
// TODO(bergundy): Extract and save version info per SDK
versionInfo, err := toVersionInfo(resp)
if err != nil {
return err
}
metadata.VersionInfo = versionInfo
saved, err := vc.clusterMetadataManager.SaveClusterMetadata(&persistence.SaveClusterMetadataRequest{
ClusterMetadata: metadata.ClusterMetadata, Version: metadata.Version})
if err != nil {
Expand All @@ -164,14 +174,19 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro
return nil
}

func toVersionInfo(resp *check.VersionCheckResponse) *versionpb.VersionInfo {
return &versionpb.VersionInfo{
Current: convertReleaseInfo(resp.Current),
Recommended: convertReleaseInfo(resp.Recommended),
Instructions: resp.Instructions,
Alerts: convertAlerts(resp.Alerts),
LastUpdateTime: timestamp.TimePtr(time.Now().UTC()),
func toVersionInfo(resp *check.VersionCheckResponse) (*versionpb.VersionInfo, error) {
for _, product := range resp.Products {
if product.Product == headers.ClientNameServer {
return &versionpb.VersionInfo{
Current: convertReleaseInfo(product.Current),
Recommended: convertReleaseInfo(product.Recommended),
Instructions: product.Instructions,
Alerts: convertAlerts(product.Alerts),
LastUpdateTime: timestamp.TimePtr(time.Now().UTC()),
}, nil
}
}
return nil, serviceerror.NewNotFound("version info update was not found in response")
}

func convertAlerts(alerts []check.Alert) []*versionpb.Alert {
Expand Down

0 comments on commit 65e4958

Please sign in to comment.