diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 8878bff490c..2f05560866a 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -1171,6 +1171,8 @@ const (
 	ParentClosePolicyProcessorScope
 	// AddSearchAttributesWorkflowScope is scope used by all metrics emitted by worker.AddSearchAttributesWorkflowScope module
 	AddSearchAttributesWorkflowScope
+	// MigrationWorkflowScope is scope used by metrics emitted by migration related workflows
+	MigrationWorkflowScope
 
 	NumWorkerScopes
 )
@@ -1707,6 +1709,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		BatcherScope:                     {operation: "batcher"},
 		ParentClosePolicyProcessorScope:  {operation: "ParentClosePolicyProcessor"},
 		AddSearchAttributesWorkflowScope: {operation: "AddSearchAttributesWorkflow"},
+		MigrationWorkflowScope:           {operation: "MigrationWorkflow"},
 	},
 	Server: {
 		ServerTlsScope: {operation: "ServerTls"},
@@ -2161,6 +2164,8 @@ const (
 	ScavengerValidationRequestsCount
 	ScavengerValidationFailuresCount
 	AddSearchAttributesFailuresCount
+	CatchUpReadyShardCountGauge
+	HandoverReadyShardCountGauge
 
 	NumWorkerMetrics
 )
@@ -2607,6 +2612,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		ScavengerValidationRequestsCount: NewCounterDef("scavenger_validation_requests"),
 		ScavengerValidationFailuresCount: NewCounterDef("scavenger_validation_failures"),
 		AddSearchAttributesFailuresCount: NewCounterDef("add_search_attributes_failures"),
+		CatchUpReadyShardCountGauge:      NewGaugeDef("catchup_ready_shard_count"),
+		HandoverReadyShardCountGauge:     NewGaugeDef("handover_ready_shard_count"),
 	},
 	Server: {
 		TlsCertsExpired: NewGaugeDef("certificates_expired"),
diff --git a/service/worker/fx.go b/service/worker/fx.go
index 7f435bbb821..33d7ca03b5d 100644
--- a/service/worker/fx.go
+++ b/service/worker/fx.go
@@ -35,11 +35,11 @@ import (
 	"go.temporal.io/server/common/resource"
 	"go.temporal.io/server/service"
 	"go.temporal.io/server/service/worker/addsearchattributes"
-	"go.temporal.io/server/service/worker/scanner/replication"
+	"go.temporal.io/server/service/worker/migration"
)
 
 var Module = fx.Options(
-	replication.Module,
+	migration.Module,
 	addsearchattributes.Module,
 	resource.Module,
 	fx.Provide(ParamsExpandProvider),
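The defs.go hunks above register a new MigrationWorkflowScope for the worker service plus two gauges, catchup_ready_shard_count and handover_ready_shard_count. As a rough sketch (not part of the patch), this is how a holder of a metrics.Client would emit one of the new gauges under that scope, mirroring the activity code further down; emitCatchUpReadiness and its parameters are illustrative names only:

```go
package migration

import "go.temporal.io/server/common/metrics"

// emitCatchUpReadiness is an illustrative sketch, not part of this patch: it
// reports how many shards have caught up to the target cluster using the
// scope, tag, and gauge definitions added in common/metrics/defs.go.
func emitCatchUpReadiness(mc metrics.Client, remoteCluster string, readyShardCount int) {
	mc.Scope(
		metrics.MigrationWorkflowScope,
		metrics.TargetClusterTag(remoteCluster),
	).UpdateGauge(metrics.CatchUpReadyShardCountGauge, float64(readyShardCount))
}
```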
diff --git a/service/worker/scanner/replication/fx.go b/service/worker/migration/fx.go
similarity index 95%
rename from service/worker/scanner/replication/fx.go
rename to service/worker/migration/fx.go
index a754782e5c3..c37f659c01e 100644
--- a/service/worker/scanner/replication/fx.go
+++ b/service/worker/migration/fx.go
@@ -22,7 +22,7 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-package replication
+package migration
 
 import (
 	"go.temporal.io/api/workflowservice/v1"
@@ -31,6 +31,7 @@ import (
 	"go.temporal.io/server/api/historyservice/v1"
 	"go.temporal.io/server/common/config"
 	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/persistence"
 	workercommon "go.temporal.io/server/service/worker/common"
@@ -46,6 +47,7 @@ type (
 		HistoryClient  historyservice.HistoryServiceClient
 		FrontendClient workflowservice.WorkflowServiceClient
 		Logger         log.Logger
+		MetricsClient  metrics.Client
 	}
 
 	fxResult struct {
@@ -90,5 +92,6 @@ func (wc *replicationWorkerComponent) activities() *activities {
 		historyClient:  wc.HistoryClient,
 		frontendClient: wc.FrontendClient,
 		logger:         wc.Logger,
+		metricsClient:  wc.MetricsClient,
 	}
 }
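The only wiring change needed in the renamed fx.go is the new exported MetricsClient field, because the deps struct is populated by the fx container and then copied into the activities struct. A minimal sketch of that go.uber.org/fx pattern, assuming the parameter struct embeds fx.In as elsewhere in the worker service; exampleDeps and newExample are hypothetical names, not code from this package:

```go
package migration

import (
	"go.uber.org/fx"

	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/metrics"
)

// exampleDeps is a hypothetical parameter struct: because it embeds fx.In,
// the fx container injects every exported field that has a registered
// provider, so adding the MetricsClient field is the whole wiring change.
type exampleDeps struct {
	fx.In

	Logger        log.Logger
	MetricsClient metrics.Client
}

// newExample (hypothetical) shows how the injected client is then handed to
// the activities struct, mirroring the activities() method in this file.
func newExample(deps exampleDeps) *activities {
	return &activities{
		logger:        deps.Logger,
		metricsClient: deps.MetricsClient,
	}
}
```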
diff --git a/service/worker/scanner/replication/replication.go b/service/worker/migration/replication.go
similarity index 84%
rename from service/worker/scanner/replication/replication.go
rename to service/worker/migration/replication.go
index 31ea4b852ba..0119507ddce 100644
--- a/service/worker/scanner/replication/replication.go
+++ b/service/worker/migration/replication.go
@@ -22,7 +22,7 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-package replication
+package migration
 
 import (
 	"context"
@@ -44,6 +44,7 @@ import (
 	"go.temporal.io/server/common/definition"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
+	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/persistence"
 )
@@ -82,6 +83,7 @@ type (
 		historyClient  historyservice.HistoryServiceClient
 		frontendClient workflowservice.WorkflowServiceClient
 		logger         log.Logger
+		metricsClient  metrics.Client
 	}
 
 	genReplicationForShardRange struct {
@@ -408,18 +410,27 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR
 	}
 
 	// check that every shard has caught up
+	readyShardCount := 0
+	logged := false
 	for _, shard := range resp.Shards {
-		clusterInfo, ok := shard.RemoteClusters[waitRequest.RemoteCluster]
-		if !ok {
-			return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
-		}
-		if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId {
-			continue // already caught up, continue to check next shard.
+		clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster]
+		if hasClusterInfo {
+			if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId ||
+				(clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] &&
+					shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) <= waitRequest.AllowedLagging) {
+				readyShardCount++
+				continue
+			}
 		}
-
-		if clusterInfo.AckedTaskId < waitRequest.WaitForTaskIds[shard.ShardId] ||
-			shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) > waitRequest.AllowedLagging {
-			a.logger.Info("Wait for remote ack",
+		// shard is not ready, log first non-ready shard
+		if !logged {
+			logged = true
+			if !hasClusterInfo {
+				a.logger.Info("Wait catchup missing remote cluster info", tag.ShardID(shard.ShardId), tag.ClusterName(waitRequest.RemoteCluster))
+				// this is not expected, so fail activity to surface the error, but retryPolicy will keep retrying.
+				return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
+			}
+			a.logger.Info("Wait catchup not ready",
 				tag.NewInt32("ShardId", shard.ShardId),
 				tag.NewInt64("AckedTaskId", clusterInfo.AckedTaskId),
 				tag.NewInt64("WaitForTaskId", waitRequest.WaitForTaskIds[shard.ShardId]),
@@ -427,11 +438,16 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR
 				tag.NewDurationTag("ActualLagging", shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime)),
 				tag.NewStringTag("RemoteCluster", waitRequest.RemoteCluster),
 			)
-			return false, nil
 		}
 	}
 
-	return true, nil
+	// emit metrics about how many shards are ready
+	a.metricsClient.Scope(
+		metrics.MigrationWorkflowScope,
+		metrics.TargetClusterTag(waitRequest.RemoteCluster),
+	).UpdateGauge(metrics.CatchUpReadyShardCountGauge, float64(readyShardCount))
+
+	return readyShardCount == len(resp.Shards), nil
 }
 
 func (a *activities) WaitHandover(ctx context.Context, waitRequest waitHandoverRequest) error {
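With this change checkReplicationOnce no longer returns on the first lagging shard: every shard is classified, the ready count is emitted as a gauge, and the activity reports caught-up only when readyShardCount equals the shard count. The per-shard test can be restated as a standalone predicate; the sketch below is illustrative only, with the response fields flattened into plain parameters:

```go
package migration

import "time"

// catchUpReady restates the per-shard readiness test from checkReplicationOnce
// as a pure function. Sketch only; names are illustrative, not patch code.
func catchUpReady(
	ackedTaskID, maxTaskID, waitForTaskID int64,
	shardLocalTime, ackedVisibilityTime time.Time,
	allowedLagging time.Duration,
) bool {
	if ackedTaskID == maxTaskID {
		// the remote cluster has acked everything on this shard
		return true
	}
	// otherwise require both the recorded task watermark and the lag bound
	return ackedTaskID >= waitForTaskID &&
		shardLocalTime.Sub(ackedVisibilityTime) <= allowedLagging
}
```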
@@ -462,32 +478,50 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
 		return false, fmt.Errorf("GetReplicationStatus returns %d shards, expecting %d", len(resp.Shards), waitRequest.ShardCount)
 	}
 
+	readyShardCount := 0
+	logged := false
 	// check that every shard is ready to handover
 	for _, shard := range resp.Shards {
-		clusterInfo, ok := shard.RemoteClusters[waitRequest.RemoteCluster]
-		if !ok {
-			return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
-		}
-		handoverInfo, ok := shard.HandoverNamespaces[waitRequest.Namespace]
-		if !ok {
-			return false, fmt.Errorf("namespace %s on shard %d is not in handover state", waitRequest.Namespace, shard.ShardId)
+		clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster]
+		handoverInfo, hasHandoverInfo := shard.HandoverNamespaces[waitRequest.Namespace]
+		if hasClusterInfo && hasHandoverInfo {
+			if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId || clusterInfo.AckedTaskId >= handoverInfo.HandoverReplicationTaskId {
+				readyShardCount++
+				continue
+			}
 		}
+		// shard is not ready, log first non-ready shard
+		if !logged {
+			logged = true
+			if !hasClusterInfo {
+				a.logger.Info("Wait handover missing remote cluster info", tag.ShardID(shard.ShardId), tag.ClusterName(waitRequest.RemoteCluster))
+				// this is not expected, so fail activity to surface the error, but retryPolicy will keep retrying.
+				return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
+			}
 
-		if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId && clusterInfo.AckedTaskId >= handoverInfo.HandoverReplicationTaskId {
-			continue // already caught up, continue to check next shard.
+			if !hasHandoverInfo {
+				// this could happen before namespace cache refresh
+				a.logger.Info("Wait handover missing handover namespace info", tag.ShardID(shard.ShardId), tag.ClusterName(waitRequest.RemoteCluster), tag.WorkflowNamespace(waitRequest.Namespace))
+			} else {
+				a.logger.Info("Wait handover not ready",
+					tag.NewInt32("ShardId", shard.ShardId),
+					tag.NewInt64("AckedTaskId", clusterInfo.AckedTaskId),
+					tag.NewInt64("HandoverTaskId", handoverInfo.HandoverReplicationTaskId),
+					tag.NewStringTag("Namespace", waitRequest.Namespace),
+					tag.NewStringTag("RemoteCluster", waitRequest.RemoteCluster),
+				)
+			}
 		}
-
-		a.logger.Info("Wait for handover to be ready",
-			tag.NewInt32("ShardId", shard.ShardId),
-			tag.NewInt64("AckedTaskId", clusterInfo.AckedTaskId),
-			tag.NewInt64("HandoverTaskId", handoverInfo.HandoverReplicationTaskId),
-			tag.NewStringTag("Namespace", waitRequest.Namespace),
-			tag.NewStringTag("RemoteCluster", waitRequest.RemoteCluster),
-		)
-		return false, nil
 	}
 
-	return true, nil
+	// emit metrics about how many shards are ready
+	a.metricsClient.Scope(
+		metrics.MigrationWorkflowScope,
+		metrics.TargetClusterTag(waitRequest.RemoteCluster),
+		metrics.NamespaceTag(waitRequest.Namespace),
+	).UpdateGauge(metrics.HandoverReadyShardCountGauge, float64(readyShardCount))
+
+	return readyShardCount == len(resp.Shards), nil
 }
 
 func (a *activities) genReplicationTasks(ctx context.Context, request genReplicationForShard) error {
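checkHandoverOnce follows the same counting pattern with a simpler per-shard test and tags the handover gauge with both the target cluster and the namespace. Restated as a sketch (names are illustrative, not patch code):

```go
package migration

// handoverReady restates the per-shard handover test from checkHandoverOnce:
// the remote cluster must have acked either everything on the shard or at
// least the namespace's handover replication task id. Sketch only.
func handoverReady(ackedTaskID, maxTaskID, handoverTaskID int64) bool {
	return ackedTaskID == maxTaskID || ackedTaskID >= handoverTaskID
}
```

Because both activities now return readyShardCount == len(resp.Shards) instead of short-circuiting on the first lagging shard, each poll reports a complete ready count, which is what makes the new gauges usable as a migration progress signal.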