Add metrics to ns migration workflows (#2326)
* Add metrics to ns migration workflows

* goimports

* return error on missing remote cluster info
yiminc authored Dec 29, 2021
1 parent c191883 commit 26278fd
Showing 4 changed files with 79 additions and 35 deletions.
7 changes: 7 additions & 0 deletions common/metrics/defs.go
@@ -1171,6 +1171,8 @@ const (
ParentClosePolicyProcessorScope
// AddSearchAttributesWorkflowScope is scope used by all metrics emitted by worker.AddSearchAttributesWorkflowScope module
AddSearchAttributesWorkflowScope
// MigrationWorkflowScope is scope used by metrics emitted by migration related workflows
MigrationWorkflowScope

NumWorkerScopes
)
@@ -1707,6 +1709,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
BatcherScope: {operation: "batcher"},
ParentClosePolicyProcessorScope: {operation: "ParentClosePolicyProcessor"},
AddSearchAttributesWorkflowScope: {operation: "AddSearchAttributesWorkflow"},
MigrationWorkflowScope: {operation: "MigrationWorkflow"},
},
Server: {
ServerTlsScope: {operation: "ServerTls"},
@@ -2161,6 +2164,8 @@ const (
ScavengerValidationRequestsCount
ScavengerValidationFailuresCount
AddSearchAttributesFailuresCount
CatchUpReadyShardCountGauge
HandoverReadyShardCountGauge

NumWorkerMetrics
)
@@ -2607,6 +2612,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ScavengerValidationRequestsCount: NewCounterDef("scavenger_validation_requests"),
ScavengerValidationFailuresCount: NewCounterDef("scavenger_validation_failures"),
AddSearchAttributesFailuresCount: NewCounterDef("add_search_attributes_failures"),
CatchUpReadyShardCountGauge: NewGaugeDef("catchup_ready_shard_count"),
HandoverReadyShardCountGauge: NewGaugeDef("handover_ready_shard_count"),
},
Server: {
TlsCertsExpired: NewGaugeDef("certificates_expired"),
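
These definitions connect to the rest of the commit as follows: the migration activities obtain a scope from the metrics client, tag it with the target cluster, and report the number of ready shards as a gauge. A minimal sketch of that emission pattern (the helper name and placement are hypothetical; the real calls appear in the activities diff below):

package migration

import "go.temporal.io/server/common/metrics"

// emitCatchUpReadiness illustrates the emission pattern used later in this
// commit: scope the metric to MigrationWorkflowScope, tag it with the remote
// cluster, and report how many shards have caught up as a gauge value.
func emitCatchUpReadiness(client metrics.Client, remoteCluster string, readyShardCount int) {
	client.Scope(
		metrics.MigrationWorkflowScope,
		metrics.TargetClusterTag(remoteCluster),
	).UpdateGauge(metrics.CatchUpReadyShardCountGauge, float64(readyShardCount))
}
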
4 changes: 2 additions & 2 deletions service/worker/fx.go
@@ -35,11 +35,11 @@ import (
"go.temporal.io/server/common/resource"
"go.temporal.io/server/service"
"go.temporal.io/server/service/worker/addsearchattributes"
"go.temporal.io/server/service/worker/scanner/replication"
"go.temporal.io/server/service/worker/migration"
)

var Module = fx.Options(
replication.Module,
migration.Module,
addsearchattributes.Module,
resource.Module,
fx.Provide(ParamsExpandProvider),
@@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication
package migration

import (
"go.temporal.io/api/workflowservice/v1"
@@ -31,6 +31,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
workercommon "go.temporal.io/server/service/worker/common"
@@ -46,6 +47,7 @@ type (
HistoryClient historyservice.HistoryServiceClient
FrontendClient workflowservice.WorkflowServiceClient
Logger log.Logger
MetricsClient metrics.Client
}

fxResult struct {
@@ -90,5 +92,6 @@ func (wc *replicationWorkerComponent) activities() *activities {
historyClient: wc.HistoryClient,
frontendClient: wc.FrontendClient,
logger: wc.Logger,
metricsClient: wc.MetricsClient,
}
}
@@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication
package migration

import (
"context"
@@ -44,6 +44,7 @@ import (
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
)
@@ -82,6 +83,7 @@ type (
historyClient historyservice.HistoryServiceClient
frontendClient workflowservice.WorkflowServiceClient
logger log.Logger
metricsClient metrics.Client
}

genReplicationForShardRange struct {
@@ -408,30 +410,44 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR
}

// check that every shard has caught up
readyShardCount := 0
logged := false
for _, shard := range resp.Shards {
clusterInfo, ok := shard.RemoteClusters[waitRequest.RemoteCluster]
if !ok {
return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
}
if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId {
continue // already caught up, continue to check next shard.
clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster]
if hasClusterInfo {
if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId ||
(clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] &&
shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) <= waitRequest.AllowedLagging) {
readyShardCount++
continue
}
}

if clusterInfo.AckedTaskId < waitRequest.WaitForTaskIds[shard.ShardId] ||
shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) > waitRequest.AllowedLagging {
a.logger.Info("Wait for remote ack",
// shard is not ready, log first non-ready shard
if !logged {
logged = true
if !hasClusterInfo {
a.logger.Info("Wait catchup missing remote cluster info", tag.ShardID(shard.ShardId), tag.ClusterName(waitRequest.RemoteCluster))
// this is not expected, so fail activity to surface the error, but retryPolicy will keep retrying.
return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
}
a.logger.Info("Wait catchup not ready",
tag.NewInt32("ShardId", shard.ShardId),
tag.NewInt64("AckedTaskId", clusterInfo.AckedTaskId),
tag.NewInt64("WaitForTaskId", waitRequest.WaitForTaskIds[shard.ShardId]),
tag.NewDurationTag("AllowedLagging", waitRequest.AllowedLagging),
tag.NewDurationTag("ActualLagging", shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime)),
tag.NewStringTag("RemoteCluster", waitRequest.RemoteCluster),
)
return false, nil
}
}

return true, nil
// emit metrics about how many shards are ready
a.metricsClient.Scope(
metrics.MigrationWorkflowScope,
metrics.TargetClusterTag(waitRequest.RemoteCluster),
).UpdateGauge(metrics.CatchUpReadyShardCountGauge, float64(readyShardCount))

return readyShardCount == len(resp.Shards), nil
}
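
Restated in isolation, the per-shard catch-up rule above is: the remote cluster is fully acked, or it has acked past the requested watermark and its lag is within the allowed bound. A minimal sketch with plain arguments (hypothetical helper, not part of this commit):

package migration

import "time"

// shardCaughtUp mirrors the readiness check in checkReplicationOnce:
// ackedTaskID is the remote cluster's ack level, maxTaskID the shard's newest
// replication task, waitForTaskID the watermark the migration waits for, and
// lag is the shard-local time minus the acked task's visibility time.
func shardCaughtUp(ackedTaskID, maxTaskID, waitForTaskID int64, lag, allowedLagging time.Duration) bool {
	return ackedTaskID == maxTaskID ||
		(ackedTaskID >= waitForTaskID && lag <= allowedLagging)
}
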

func (a *activities) WaitHandover(ctx context.Context, waitRequest waitHandoverRequest) error {
@@ -462,32 +478,50 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
return false, fmt.Errorf("GetReplicationStatus returns %d shards, expecting %d", len(resp.Shards), waitRequest.ShardCount)
}

readyShardCount := 0
logged := false
// check that every shard is ready to handover
for _, shard := range resp.Shards {
clusterInfo, ok := shard.RemoteClusters[waitRequest.RemoteCluster]
if !ok {
return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
}
handoverInfo, ok := shard.HandoverNamespaces[waitRequest.Namespace]
if !ok {
return false, fmt.Errorf("namespace %s on shard %d is not in handover state", waitRequest.Namespace, shard.ShardId)
clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster]
handoverInfo, hasHandoverInfo := shard.HandoverNamespaces[waitRequest.Namespace]
if hasClusterInfo && hasHandoverInfo {
if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId || clusterInfo.AckedTaskId >= handoverInfo.HandoverReplicationTaskId {
readyShardCount++
continue
}
}
// shard is not ready, log first non-ready shard
if !logged {
logged = true
if !hasClusterInfo {
a.logger.Info("Wait handover missing remote cluster info", tag.ShardID(shard.ShardId), tag.ClusterName(waitRequest.RemoteCluster))
// this is not expected, so fail activity to surface the error, but retryPolicy will keep retrying.
return false, fmt.Errorf("GetReplicationStatus response for shard %d does not contains remote cluster %s", shard.ShardId, waitRequest.RemoteCluster)
}

if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId && clusterInfo.AckedTaskId >= handoverInfo.HandoverReplicationTaskId {
continue // already caught up, continue to check next shard.
if !hasHandoverInfo {
// this could happen before namespace cache refresh
a.logger.Info("Wait handover missing handover namespace info", tag.ShardID(shard.ShardId), tag.ClusterName(waitRequest.RemoteCluster), tag.WorkflowNamespace(waitRequest.Namespace))
} else {
a.logger.Info("Wait handover not ready",
tag.NewInt32("ShardId", shard.ShardId),
tag.NewInt64("AckedTaskId", clusterInfo.AckedTaskId),
tag.NewInt64("HandoverTaskId", handoverInfo.HandoverReplicationTaskId),
tag.NewStringTag("Namespace", waitRequest.Namespace),
tag.NewStringTag("RemoteCluster", waitRequest.RemoteCluster),
)
}
}

a.logger.Info("Wait for handover to be ready",
tag.NewInt32("ShardId", shard.ShardId),
tag.NewInt64("AckedTaskId", clusterInfo.AckedTaskId),
tag.NewInt64("HandoverTaskId", handoverInfo.HandoverReplicationTaskId),
tag.NewStringTag("Namespace", waitRequest.Namespace),
tag.NewStringTag("RemoteCluster", waitRequest.RemoteCluster),
)
return false, nil
}

return true, nil
// emit metrics about how many shards are ready
a.metricsClient.Scope(
metrics.MigrationWorkflowScope,
metrics.TargetClusterTag(waitRequest.RemoteCluster),
metrics.NamespaceTag(waitRequest.Namespace),
).UpdateGauge(metrics.HandoverReadyShardCountGauge, float64(readyShardCount))

return readyShardCount == len(resp.Shards), nil
}
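
The per-shard handover rule is simpler: the remote cluster must be fully acked or acked past the namespace's handover replication task. As a sketch (hypothetical helper, not part of this commit):

package migration

// shardHandoverReady mirrors the readiness check in checkHandoverOnce.
func shardHandoverReady(ackedTaskID, maxTaskID, handoverTaskID int64) bool {
	return ackedTaskID == maxTaskID || ackedTaskID >= handoverTaskID
}
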

func (a *activities) genReplicationTasks(ctx context.Context, request genReplicationForShard) error {
