Skip to content

Commit

Permalink
Simplify system retry logic: Part 1 (#3172)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Aug 4, 2022
1 parent 3182dc2 commit 0b4bf47
Show file tree
Hide file tree
Showing 37 changed files with 1,726 additions and 879 deletions.
21 changes: 20 additions & 1 deletion common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package client

import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
Expand All @@ -33,6 +35,10 @@ import (
"go.temporal.io/server/common/quotas"
)

var (
retryPolicy = common.CreatePersistenceClientRetryPolicy()
)

type (
// Factory defines the interface for any implementation that can vend
// persistence layer objects backed by a datastore. The actual datastore
Expand Down Expand Up @@ -123,6 +129,7 @@ func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
if f.metricsClient != nil {
result = p.NewShardPersistenceMetricsClient(result, f.metricsClient, f.logger)
}
result = p.NewShardPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
}

Expand All @@ -140,6 +147,7 @@ func (f *factoryImpl) NewMetadataManager() (p.MetadataManager, error) {
if f.metricsClient != nil {
result = p.NewMetadataPersistenceMetricsClient(result, f.metricsClient, f.logger)
}
result = p.NewMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
}

Expand All @@ -157,6 +165,7 @@ func (f *factoryImpl) NewClusterMetadataManager() (p.ClusterMetadataManager, err
if f.metricsClient != nil {
result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsClient, f.logger)
}
result = p.NewClusterMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
}

Expand All @@ -174,6 +183,7 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) {
if f.metricsClient != nil {
result = p.NewExecutionPersistenceMetricsClient(result, f.metricsClient, f.logger)
}
result = p.NewExecutionPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
}

Expand All @@ -189,11 +199,20 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu
if f.metricsClient != nil {
result = p.NewQueuePersistenceMetricsClient(result, f.metricsClient, f.logger)
}

result = p.NewQueuePersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return p.NewNamespaceReplicationQueue(result, f.serializer, f.clusterName, f.metricsClient, f.logger)
}

// Close closes this factory
func (f *factoryImpl) Close() {
f.dataStoreFactory.Close()
}

func IsPersistenceTransientError(err error) bool {
switch err.(type) {
case *serviceerror.Unavailable:
return true
}

return false
}
10 changes: 10 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,16 @@ func (e *TransactionSizeLimitError) Error() string {
return e.Msg
}

func IsConflictErr(err error) bool {
switch err.(type) {
case *CurrentWorkflowConditionFailedError,
*WorkflowConditionFailedError,
*ConditionFailedError:
return true
}
return false
}

// UnixMilliseconds returns t as a Unix time, the number of milliseconds elapsed since January 1, 1970 UTC.
// It should be used for all CQL timestamp.
func UnixMilliseconds(t time.Time) int64 {
Expand Down
Loading

0 comments on commit 0b4bf47

Please sign in to comment.