diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 0c8aa4a162f..fb4d14ef797 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -18,6 +18,7 @@ import ( "context" "io" "net/http" + "strconv" "strings" "time" @@ -135,20 +136,26 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { if s.IsClosed() { return status.Errorf(codes.Unknown, "server not started") } - if request.GetHeader().GetClusterId() != s.clusterID { + header := request.GetHeader() + clusterID := header.GetClusterId() + if clusterID != s.clusterID { return status.Errorf( codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", - s.clusterID, request.GetHeader().GetClusterId()) + s.clusterID, clusterID) } + keyspaceID := header.GetKeyspaceId() + keyspaceGroupID := header.GetKeyspaceGroupId() + dcLocation := request.GetDcLocation() count := request.GetCount() ts, keyspaceGroupBelongTo, err := s.keyspaceGroupManager.HandleTSORequest( ctx, - request.Header.KeyspaceId, request.Header.KeyspaceGroupId, - request.GetDcLocation(), count) + keyspaceID, keyspaceGroupID, + dcLocation, count) if err != nil { return status.Errorf(codes.Unknown, err.Error()) } - tsoHandleDuration.Observe(time.Since(start).Seconds()) + keyspaceGroupIDStr := strconv.FormatUint(uint64(keyspaceGroupID), 10) + tsoHandleDuration.WithLabelValues(keyspaceGroupIDStr).Observe(time.Since(start).Seconds()) response := &tsopb.TsoResponse{ Header: s.header(keyspaceGroupBelongTo), Timestamp: &ts, diff --git a/pkg/mcs/tso/server/metrics.go b/pkg/mcs/tso/server/metrics.go index afd0d47ef13..288d650e1e7 100644 --- a/pkg/mcs/tso/server/metrics.go +++ b/pkg/mcs/tso/server/metrics.go @@ -16,12 +16,9 @@ package server import "github.com/prometheus/client_golang/prometheus" -const ( - namespace = "tso" -) +const namespace = "tso" var ( - // TODO: pre-allocate gauge metrics timeJumpBackCounter = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: namespace, @@ -30,7 +27,7 @@ var ( Help: "Counter of system time jumps backward.", }) - metadataGauge = prometheus.NewGaugeVec( + metaDataGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, Subsystem: "cluster", @@ -46,39 +43,19 @@ var ( Help: "Indicate the tso server info, and the value is the start timestamp (s).", }, []string{"version", "hash"}) - tsoProxyHandleDuration = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: "server", - Name: "handle_tso_proxy_duration_seconds", - Help: "Bucketed histogram of processing time (s) of handled tso proxy requests.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), - }) - - tsoProxyBatchSize = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: "server", - Name: "handle_tso_proxy_batch_size", - Help: "Bucketed histogram of the batch size of handled tso proxy requests.", - Buckets: prometheus.ExponentialBuckets(1, 2, 13), - }) - - tsoHandleDuration = prometheus.NewHistogram( + tsoHandleDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, Subsystem: "server", Name: "handle_tso_duration_seconds", Help: "Bucketed histogram of processing time (s) of handled tso requests.", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), - }) + }, []string{"group"}) ) func init() { prometheus.MustRegister(timeJumpBackCounter) - prometheus.MustRegister(metadataGauge) + prometheus.MustRegister(metaDataGauge) prometheus.MustRegister(serverInfo) - prometheus.MustRegister(tsoProxyHandleDuration) - prometheus.MustRegister(tsoProxyBatchSize) prometheus.MustRegister(tsoHandleDuration) } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 01b1d03ef03..91d5e480eaa 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -345,7 +345,7 @@ func (s *Server) startServer() (err error) { log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) // It may lose accuracy if use float64 to store uint64. So we store the cluster id in label. - metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", s.clusterID)).Set(0) + metaDataGauge.WithLabelValues(fmt.Sprintf("cluster%d", s.clusterID)).Set(0) // The independent TSO service still reuses PD version info since PD and TSO are just // different service modes provided by the same pd-server binary serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 02a046563fb..721fb77c09f 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -190,7 +190,7 @@ func (s *state) markGroupRequested(groupID uint32, checker func() error) error { return nil } -func (s *state) checkTSOSplit( +func (s *state) checkGroupSplit( targetGroupID uint32, ) (splitTargetAM, splitSourceAM *AllocatorManager, err error) { s.RLock() @@ -212,7 +212,7 @@ func (s *state) checkTSOSplit( // Reject any request if the keyspace group is in merging state, // we need to wait for the merging checker to finish the TSO merging. -func (s *state) checkTSOMerge( +func (s *state) checkGroupMerge( groupID uint32, ) error { s.RLock() @@ -1066,7 +1066,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } - err = kgm.state.checkTSOMerge(curKeyspaceGroupID) + err = kgm.state.checkGroupMerge(curKeyspaceGroupID) if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } @@ -1156,7 +1156,7 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( keyspaceGroupID uint32, dcLocation string, ) error { - splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID) + splitTargetAM, splitSourceAM, err := kgm.state.checkGroupSplit(keyspaceGroupID) if err != nil || splitTargetAM == nil { return err } @@ -1209,6 +1209,7 @@ const keyspaceGroupsAPIPrefix = "/pd/api/v2/tso/keyspace-groups" // Put the code below into the critical section to prevent from sending too many HTTP requests. func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { + start := time.Now() kgm.Lock() defer kgm.Unlock() // Check if the keyspace group is in split state. @@ -1220,6 +1221,7 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { if kgm.httpClient == nil { return nil } + startRequest := time.Now() statusCode, err := apiutil.DoDelete( kgm.httpClient, kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/split", id)) @@ -1232,6 +1234,7 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { zap.Int("status-code", statusCode)) return errs.ErrSendRequest.FastGenByArgs() } + kgm.metrics.finishSplitSendDuration.Observe(time.Since(startRequest).Seconds()) // Pre-update the split keyspace group's split state in memory. // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole. // For now, we only have scenarios to update split state/merge state, and the other fields are always @@ -1239,10 +1242,12 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { newSplitGroup := *splitGroup newSplitGroup.SplitState = nil kgm.kgs[id] = &newSplitGroup + kgm.metrics.finishSplitDuration.Observe(time.Since(start).Seconds()) return nil } func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error { + start := time.Now() kgm.Lock() defer kgm.Unlock() // Check if the keyspace group is in the merging state. @@ -1254,6 +1259,7 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error { if kgm.httpClient == nil { return nil } + startRequest := time.Now() statusCode, err := apiutil.DoDelete( kgm.httpClient, kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/merge", id)) @@ -1266,7 +1272,7 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error { zap.Int("status-code", statusCode)) return errs.ErrSendRequest.FastGenByArgs() } - + kgm.metrics.finishMergeSendDuration.Observe(time.Since(startRequest).Seconds()) // Pre-update the merge target keyspace group's merge state in memory. // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole. // For now, we only have scenarios to update split state/merge state, and the other fields are always @@ -1274,6 +1280,7 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error { newTargetGroup := *mergeTarget newTargetGroup.MergeState = nil kgm.kgs[id] = &newTargetGroup + kgm.metrics.finishMergeDuration.Observe(time.Since(start).Seconds()) return nil } diff --git a/pkg/tso/metrics.go b/pkg/tso/metrics.go index 6b4936c67c3..02e72ebb376 100644 --- a/pkg/tso/metrics.go +++ b/pkg/tso/metrics.go @@ -17,16 +17,18 @@ package tso import "github.com/prometheus/client_golang/prometheus" const ( - dcLabel = "dc" - typeLabel = "type" - groupLabel = "group" + pdNamespace = "pd" + tsoNamespace = "tso" + dcLabel = "dc" + typeLabel = "type" + groupLabel = "group" ) var ( // TSO metrics tsoCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "pd", + Namespace: pdNamespace, Subsystem: "tso", Name: "events", Help: "Counter of tso events", @@ -34,7 +36,7 @@ var ( tsoGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: "pd", + Namespace: pdNamespace, Subsystem: "cluster", Name: "tso", Help: "Record of tso metadata.", @@ -42,15 +44,24 @@ var ( tsoGap = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: "pd", + Namespace: pdNamespace, Subsystem: "cluster", Name: "tso_gap_millionseconds", Help: "The minimal (non-zero) TSO gap for each DC.", }, []string{groupLabel, dcLabel}) + tsoOpDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: pdNamespace, + Subsystem: "cluster", + Name: "tso_operation_duration_seconds", + Help: "Bucketed histogram of processing time(s) of the TSO operations.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{typeLabel, groupLabel, dcLabel}) + tsoAllocatorRole = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: "pd", + Namespace: pdNamespace, Subsystem: "tso", Name: "role", Help: "Indicate the PD server role info, whether it's a TSO allocator.", @@ -59,7 +70,7 @@ var ( // Keyspace Group metrics keyspaceGroupStateGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: "pd", + Namespace: tsoNamespace, Subsystem: "keyspace_group", Name: "state", Help: "Gauge of the Keyspace Group states.", @@ -67,7 +78,7 @@ var ( keyspaceGroupOpDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Namespace: "pd", + Namespace: tsoNamespace, Subsystem: "keyspace_group", Name: "operation_duration_seconds", Help: "Bucketed histogram of processing time(s) of the Keyspace Group operations.", @@ -79,6 +90,7 @@ func init() { prometheus.MustRegister(tsoCounter) prometheus.MustRegister(tsoGauge) prometheus.MustRegister(tsoGap) + prometheus.MustRegister(tsoOpDuration) prometheus.MustRegister(tsoAllocatorRole) prometheus.MustRegister(keyspaceGroupStateGauge) prometheus.MustRegister(keyspaceGroupOpDuration) @@ -104,6 +116,10 @@ type tsoMetrics struct { notLeaderAnymoreEvent prometheus.Counter logicalOverflowEvent prometheus.Counter exceededMaxRetryEvent prometheus.Counter + // timestampOracle operation duration + syncSaveDuration prometheus.Observer + resetSaveDuration prometheus.Observer + updateSaveDuration prometheus.Observer // allocator event counter notLeaderEvent prometheus.Counter globalTSOSyncEvent prometheus.Counter @@ -137,6 +153,9 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics { notLeaderAnymoreEvent: tsoCounter.WithLabelValues("not_leader_anymore", groupID, dcLocation), logicalOverflowEvent: tsoCounter.WithLabelValues("logical_overflow", groupID, dcLocation), exceededMaxRetryEvent: tsoCounter.WithLabelValues("exceeded_max_retry", groupID, dcLocation), + syncSaveDuration: tsoOpDuration.WithLabelValues("sync_save", groupID, dcLocation), + resetSaveDuration: tsoOpDuration.WithLabelValues("reset_save", groupID, dcLocation), + updateSaveDuration: tsoOpDuration.WithLabelValues("update_save", groupID, dcLocation), notLeaderEvent: tsoCounter.WithLabelValues("not_leader", groupID, dcLocation), globalTSOSyncEvent: tsoCounter.WithLabelValues("global_tso_sync", groupID, dcLocation), globalTSOEstimateEvent: tsoCounter.WithLabelValues("global_tso_estimate", groupID, dcLocation), @@ -150,21 +169,29 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics { } type keyspaceGroupMetrics struct { - splitSourceGauge prometheus.Gauge - splitTargetGauge prometheus.Gauge - mergeSourceGauge prometheus.Gauge - mergeTargetGauge prometheus.Gauge - splitDuration prometheus.Observer - mergeDuration prometheus.Observer + splitSourceGauge prometheus.Gauge + splitTargetGauge prometheus.Gauge + mergeSourceGauge prometheus.Gauge + mergeTargetGauge prometheus.Gauge + splitDuration prometheus.Observer + mergeDuration prometheus.Observer + finishSplitSendDuration prometheus.Observer + finishSplitDuration prometheus.Observer + finishMergeSendDuration prometheus.Observer + finishMergeDuration prometheus.Observer } func newKeyspaceGroupMetrics() *keyspaceGroupMetrics { return &keyspaceGroupMetrics{ - splitSourceGauge: keyspaceGroupStateGauge.WithLabelValues("split-source"), - splitTargetGauge: keyspaceGroupStateGauge.WithLabelValues("split-target"), - mergeSourceGauge: keyspaceGroupStateGauge.WithLabelValues("merge-source"), - mergeTargetGauge: keyspaceGroupStateGauge.WithLabelValues("merge-target"), - splitDuration: keyspaceGroupOpDuration.WithLabelValues("split"), - mergeDuration: keyspaceGroupOpDuration.WithLabelValues("merge"), + splitSourceGauge: keyspaceGroupStateGauge.WithLabelValues("split-source"), + splitTargetGauge: keyspaceGroupStateGauge.WithLabelValues("split-target"), + mergeSourceGauge: keyspaceGroupStateGauge.WithLabelValues("merge-source"), + mergeTargetGauge: keyspaceGroupStateGauge.WithLabelValues("merge-target"), + splitDuration: keyspaceGroupOpDuration.WithLabelValues("split"), + mergeDuration: keyspaceGroupOpDuration.WithLabelValues("merge"), + finishSplitSendDuration: keyspaceGroupOpDuration.WithLabelValues("finish-split-send"), + finishSplitDuration: keyspaceGroupOpDuration.WithLabelValues("finish-split"), + finishMergeSendDuration: keyspaceGroupOpDuration.WithLabelValues("finish-merge-send"), + finishMergeDuration: keyspaceGroupOpDuration.WithLabelValues("finish-merge"), } } diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index eba1b44d2ec..e7906e01c48 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -192,10 +192,12 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { failpoint.Return(errs.ErrEtcdTxnInternal) }) save := next.Add(t.saveInterval) + start := time.Now() if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { t.metrics.errSaveSyncTSEvent.Inc() return err } + t.metrics.syncSaveDuration.Observe(time.Since(start).Seconds()) t.lastSavedTime.Store(save) t.metrics.syncOKEvent.Inc() @@ -260,10 +262,12 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi // save into etcd only if nextPhysical is close to lastSavedTime if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), nextPhysical) <= UpdateTimestampGuard { save := nextPhysical.Add(t.saveInterval) + start := time.Now() if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { t.metrics.errSaveResetTSEvent.Inc() return err } + t.metrics.resetSaveDuration.Observe(time.Since(start).Seconds()) t.lastSavedTime.Store(save) } // save into memory only if nextPhysical or nextLogical is greater. @@ -336,6 +340,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error // The time window needs to be updated and saved to etcd. if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard { save := next.Add(t.saveInterval) + start := time.Now() if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { log.Warn("save timestamp failed", zap.String("dc-location", t.dcLocation), @@ -344,6 +349,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error t.metrics.errSaveUpdateTSEvent.Inc() return err } + t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds()) t.lastSavedTime.Store(save) } // save into memory