Skip to content

Commit

Permalink
Check SDK versions via version check call
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed Jan 12, 2022
1 parent 86605c4 commit 9c761a0
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 42 deletions.
8 changes: 8 additions & 0 deletions common/headers/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
type (
// VersionChecker is used to check client/server compatibility and client's capabilities
VersionChecker interface {
GetClientNameAndVersion(ctx context.Context) (string, string)
ClientSupported(ctx context.Context, enableClientVersionCheck bool) error
ClientSupportsFeature(ctx context.Context, feature string) bool
}
Expand All @@ -98,6 +99,13 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string)
}
}

func (vc *versionChecker) GetClientNameAndVersion(ctx context.Context) (string, string) {
headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName)
clientName := headers[0]
clientVersion := headers[1]
return clientName, clientVersion
}

// ClientSupported returns an error if client is unsupported, nil otherwise.
func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersionCheck bool) error {
if !enableClientVersionCheck {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.24.0
go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a
go.temporal.io/sdk v1.12.0
go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193
go.temporal.io/version v0.2.1
go.uber.org/atomic v1.9.0
go.uber.org/fx v1.14.2
go.uber.org/multierr v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a h1:FYS1fRCzAbcek09VA3wJb
go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a/go.mod h1:EMC/8OQVvVUeTSfVQ/OpAaIHmVadlzaKwHVjUmgz3tk=
go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193 h1:jhIqHkAE74DnEXipymFTzmTxyboMYmv6iVkkCFC1pas=
go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.temporal.io/version v0.2.1 h1:wP7ha/DvyAtwIZY0xL5TA6BcsR1svK/WsqzFmmwAPhE=
go.temporal.io/version v0.2.1/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
5 changes: 5 additions & 0 deletions service/frontend/dcRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (s *dcRedirectionHandlerSuite) SetupTest() {
s.mockResource.GetSearchAttributesProvider(),
s.mockResource.GetClusterMetadata(),
s.mockResource.GetArchivalMetadata(),
s,
)

s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceServer(s.controller)
Expand All @@ -147,6 +148,10 @@ func (s *dcRedirectionHandlerSuite) TearDownTest() {
s.controller.Finish()
}

// RecordSDKInfo is a noop in this test suite
func (s *dcRedirectionHandlerSuite) RecordSDKInfo(name, version string) {
}

func (s *dcRedirectionHandlerSuite) TestDescribeTaskQueue() {
apiName := "DescribeTaskQueue"

Expand Down
2 changes: 2 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func AdminHandlerProvider(
func HandlerProvider(
params *resource.BootstrapParams,
serviceConfig *Config,
versionChecker *VersionChecker,
namespaceReplicationQueue FEReplicatorNamespaceReplicationQueue,
visibilityMgr manager.VisibilityManager,
logger resource.SnTaggedLogger,
Expand Down Expand Up @@ -429,6 +430,7 @@ func HandlerProvider(
saProvider,
clusterMetadata,
archivalMetadata,
versionChecker,
)
handler := NewDCRedirectionHandler(wfHandler, params.DCRedirectionPolicy, logger, clientBean, metricsClient, timeSource, namespaceRegistry, clusterMetadata)
return handler
Expand Down
43 changes: 41 additions & 2 deletions service/frontend/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package frontend
import (
"runtime"
"sync"
"sync/atomic"
"time"

enumsbp "go.temporal.io/api/enums/v1"
Expand All @@ -42,9 +43,27 @@ import (

const VersionCheckInterval = 24 * time.Hour

type sdkNameVersion struct {
name string
version string
}

type sdkCount struct {
count int64
}

type SDKInfoRecorder interface {
RecordSDKInfo(name, version string)
}

type VersionChecker struct {
config *Config
shutdownChan chan struct{}
config *Config
shutdownChan chan struct{}
// Using a sync map to support effienct concurrent updates.
// Key type is sdkNameVersion and value type is sdkCount.
// Note that we never delete keys from this map as the cardinality of
// client versions should be fairly low.
sdkInfoCounter sync.Map
metricsScope metrics.Scope
clusterMetadataManager persistence.ClusterMetadataManager
startOnce sync.Once
Expand Down Expand Up @@ -80,9 +99,20 @@ func (vc *VersionChecker) Stop() {
}
}

func (vc *VersionChecker) RecordSDKInfo(name, version string) {
info := sdkNameVersion{name, version}
valIface, ok := vc.sdkInfoCounter.Load(info)
if !ok {
// Store if wasn't added racy
valIface, _ = vc.sdkInfoCounter.LoadOrStore(info, &sdkCount{})
}
atomic.AddInt64(&valIface.(*sdkCount).count, 1)
}

func (vc *VersionChecker) versionCheckLoop() {
timer := time.NewTicker(VersionCheckInterval)
defer timer.Stop()

vc.performVersionCheck()
for {
select {
Expand Down Expand Up @@ -132,6 +162,14 @@ func isUpdateNeeded(metadata *persistence.GetClusterMetadataResponse) bool {
}

func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClusterMetadataResponse) (*check.VersionCheckRequest, error) {
sdkInfo := make([]check.SDKInfo, 0)
vc.sdkInfoCounter.Range(func(key, value interface{}) bool {
timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0)
nameVersion := key.(sdkNameVersion)
sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen})
return true
})

return &check.VersionCheckRequest{
Product: headers.ClientNameServer,
Version: headers.ServerVersion,
Expand All @@ -140,6 +178,7 @@ func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClu
DB: vc.clusterMetadataManager.GetName(),
ClusterID: metadata.ClusterId,
Timestamp: time.Now().UnixNano(),
SDKInfo: sdkInfo,
}, nil
}

Expand Down
Loading

0 comments on commit 9c761a0

Please sign in to comment.