Skip to content

Commit

Permalink
*: replace capture ID with address in metrics (#686)
Browse files Browse the repository at this point in the history
* metrics: refine grafana dashboard
* *: replace capture ID with address in metrics
* sink: sanitize kafka name
* Update scripts/check-copyright.sh
* use 127.0.0.1 in tests

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored Jun 24, 2020
1 parent 48d3b37 commit 5d74cae
Show file tree
Hide file tree
Showing 25 changed files with 4,134 additions and 4,000 deletions.
3 changes: 2 additions & 1 deletion cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,12 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error
zap.String("changefeedid", task.ChangeFeedID))

p, err := runProcessor(ctx, c.session, *cf, task.ChangeFeedID,
c.info.ID, task.CheckpointTS)
*c.info, task.CheckpointTS)
if err != nil {
log.Error("run processor failed",
zap.String("changefeedid", task.ChangeFeedID),
zap.String("captureid", c.info.ID),
zap.String("captureaddr", c.info.AdvertiseAddr),
zap.Error(err))
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ func (m *mounterImpl) Run(ctx context.Context) error {
}

func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMountDuration := mountDuration.WithLabelValues(captureID, changefeedID)
metricMountDuration := mountDuration.WithLabelValues(captureAddr, changefeedID)

for {
var pEvent *model.PolymorphicEvent
Expand Down Expand Up @@ -193,9 +193,9 @@ func (m *mounterImpl) Input() chan<- *model.PolymorphicEvent {
}

func (m *mounterImpl) collectMetrics(ctx context.Context) {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureID, changefeedID)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID)

for {
select {
Expand Down
24 changes: 12 additions & 12 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
limit := 20

nextSpan := span
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)

for {
var (
Expand All @@ -802,7 +802,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
scanT0 := time.Now()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
regions, err = s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit)
scanRegionsDuration.WithLabelValues(captureID).Observe(time.Since(scanT0).Seconds())
scanRegionsDuration.WithLabelValues(captureAddr).Observe(time.Since(scanT0).Seconds())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1038,17 +1038,17 @@ func (s *eventFeedSession) singleEventFeed(
startTs uint64,
receiverCh <-chan *cdcpb.Event,
) (uint64, error) {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricEventSize := eventSize.WithLabelValues(captureID)
metricPullEventInitializedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_INITIALIZED.String(), captureID, changefeedID)
metricPullEventCommittedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMITTED.String(), captureID, changefeedID)
metricPullEventCommitCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMIT.String(), captureID, changefeedID)
metricPullEventPrewriteCounter := pullEventCounter.WithLabelValues(cdcpb.Event_PREWRITE.String(), captureID, changefeedID)
metricPullEventRollbackCounter := pullEventCounter.WithLabelValues(cdcpb.Event_ROLLBACK.String(), captureID, changefeedID)
metricSendEventResolvedCounter := sendEventCounter.WithLabelValues("native resolved", captureID, changefeedID)
metricSendEventCommitCounter := sendEventCounter.WithLabelValues("commit", captureID, changefeedID)
metricSendEventCommittedCounter := sendEventCounter.WithLabelValues("committed", captureID, changefeedID)
metricEventSize := eventSize.WithLabelValues(captureAddr)
metricPullEventInitializedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_INITIALIZED.String(), captureAddr, changefeedID)
metricPullEventCommittedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMITTED.String(), captureAddr, changefeedID)
metricPullEventCommitCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMIT.String(), captureAddr, changefeedID)
metricPullEventPrewriteCounter := pullEventCounter.WithLabelValues(cdcpb.Event_PREWRITE.String(), captureAddr, changefeedID)
metricPullEventRollbackCounter := pullEventCounter.WithLabelValues(cdcpb.Event_ROLLBACK.String(), captureAddr, changefeedID)
metricSendEventResolvedCounter := sendEventCounter.WithLabelValues("native resolved", captureAddr, changefeedID)
metricSendEventCommitCounter := sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID)
metricSendEventCommittedCounter := sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID)

initialized := false

Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (c CDCEtcdClient) GetTaskWorkload(
func (c CDCEtcdClient) PutTaskWorkload(
ctx context.Context,
changefeedID string,
captureID string,
captureID model.CaptureID,
info *model.TaskWorkload,
) error {
data, err := info.Marshal()
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ var (
Name: "scan_regions_duration_seconds",
Help: "The time it took to finish a scanRegions call.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18),
}, []string{"captureID"})
}, []string{"capture"})
eventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "event_size_bytes",
Help: "Size of KV events.",
Buckets: prometheus.ExponentialBuckets(16, 2, 25),
}, []string{"captureID"})
}, []string{"capture"})
pullEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Expand Down
4 changes: 2 additions & 2 deletions cdc/metrics_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ var (
Namespace: "ticdc",
Subsystem: "processor",
Name: "update_info_duration_seconds",
Help: "The time it took to update sub change feed info.",
Help: "The time it took to update sub changefeed info.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18),
}, []string{"captureID"})
}, []string{"capture"})
waitEventPrepareDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down
64 changes: 36 additions & 28 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (

type processor struct {
id string
captureID string
captureInfo model.CaptureInfo
changefeedID string
changefeed model.ChangeFeedInfo
limitter *puller.BlurResourceLimitter
Expand Down Expand Up @@ -134,7 +134,8 @@ func newProcessor(
session *concurrency.Session,
changefeed model.ChangeFeedInfo,
sink sink.Sink,
changefeedID, captureID string,
changefeedID string,
captureInfo model.CaptureInfo,
checkpointTs uint64,
errCh chan error,
) (*processor, error) {
Expand All @@ -150,7 +151,7 @@ func newProcessor(
}
cdcEtcdCli := kv.NewCDCEtcdClient(etcdCli)

tsRWriter, err := fNewTsRWriter(cdcEtcdCli, changefeedID, captureID)
tsRWriter, err := fNewTsRWriter(cdcEtcdCli, changefeedID, captureInfo.ID)
if err != nil {
return nil, errors.Annotate(err, "failed to create ts RWriter")
}
Expand All @@ -176,7 +177,7 @@ func newProcessor(
p := &processor{
id: uuid.New().String(),
limitter: limitter,
captureID: captureID,
captureInfo: captureInfo,
changefeedID: changefeedID,
changefeed: changefeed,
pdCli: pdCli,
Expand Down Expand Up @@ -265,7 +266,8 @@ func (p *processor) wait() {
err := p.wg.Wait()
if err != nil && errors.Cause(err) != context.Canceled {
log.Error("processor wait error",
zap.String("captureID", p.captureID),
zap.String("captureid", p.captureInfo.ID),
zap.String("captureaddr", p.captureInfo.AdvertiseAddr),
zap.String("changefeedID", p.changefeedID),
zap.Error(err),
)
Expand Down Expand Up @@ -303,7 +305,9 @@ func (p *processor) positionWorker(ctx context.Context) error {
}
return inErr
})
updateInfoDuration.WithLabelValues(p.captureID).Observe(time.Since(t0Update).Seconds())
updateInfoDuration.
WithLabelValues(p.captureInfo.AdvertiseAddr).
Observe(time.Since(t0Update).Seconds())
if err != nil {
return errors.Annotate(err, "failed to update info")
}
Expand All @@ -324,8 +328,8 @@ func (p *processor) positionWorker(ctx context.Context) error {
log.Info("Local resolved worker exited")
}()

resolvedTsGauge := resolvedTsGauge.WithLabelValues(p.changefeedID, p.captureID)
checkpointTsGauge := checkpointTsGauge.WithLabelValues(p.changefeedID, p.captureID)
resolvedTsGauge := resolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge := checkpointTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)

for {
select {
Expand Down Expand Up @@ -398,7 +402,7 @@ func (p *processor) ddlPullWorker(ctx context.Context) error {

func (p *processor) workloadWorker(ctx context.Context) error {
t := time.NewTicker(10 * time.Second)
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureID, nil)
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID, nil)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -414,7 +418,7 @@ func (p *processor) workloadWorker(ctx context.Context) error {
workload[table.id] = table.workload
}
p.stateMu.Unlock()
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureID, &workload)
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID, &workload)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -450,7 +454,9 @@ func (p *processor) updateInfo(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.Tables)))
syncTableNumGauge.
WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).
Set(float64(len(p.status.Tables)))
err = updatePosition()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -488,9 +494,9 @@ func (p *processor) removeTable(tableID int64) {
table.cancel()
delete(p.tables, tableID)
tableIDStr := strconv.FormatInt(tableID, 10)
tableInputChanSizeGauge.DeleteLabelValues(p.changefeedID, p.captureID, tableIDStr)
tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureID, tableIDStr)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Dec()
tableInputChanSizeGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr)
tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Dec()
}

// handleTables handles table scheduler on this processor, add or remove table puller
Expand Down Expand Up @@ -710,7 +716,7 @@ func (p *processor) collectMetrics(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-time.After(defaultMetricInterval):
tableOutputChanSizeGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.output)))
tableOutputChanSizeGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Set(float64(len(p.output)))
}
}
}
Expand Down Expand Up @@ -784,7 +790,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
}
}()
go func() {
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureID, strconv.FormatInt(table.id, 10))
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, strconv.FormatInt(table.id, 10))
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -847,25 +853,25 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
if p.position.ResolvedTs > replicaInfo.StartTs {
p.position.ResolvedTs = replicaInfo.StartTs
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Inc()
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Inc()
}

func (p *processor) stop(ctx context.Context) error {
log.Info("stop processor", zap.String("id", p.id), zap.String("capture", p.captureID), zap.String("changefeed", p.changefeedID))
log.Info("stop processor", zap.String("id", p.id), zap.String("capture", p.captureInfo.AdvertiseAddr), zap.String("changefeed", p.changefeedID))
p.stateMu.Lock()
for _, tbl := range p.tables {
tbl.cancel()
}
// mark tables share the same context with its original table, don't need to cancel
p.stateMu.Unlock()
atomic.StoreInt32(&p.stopped, 1)
if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureID); err != nil {
if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
}
if err := p.etcdCli.DeleteTaskStatus(ctx, p.changefeedID, p.captureID); err != nil {
if err := p.etcdCli.DeleteTaskStatus(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
}
if err := p.etcdCli.DeleteTaskWorkload(ctx, p.changefeedID, p.captureID); err != nil {
if err := p.etcdCli.DeleteTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
}
return p.sink.Close()
Expand All @@ -881,15 +887,15 @@ func runProcessor(
session *concurrency.Session,
info model.ChangeFeedInfo,
changefeedID string,
captureID string,
captureInfo model.CaptureInfo,
checkpointTs uint64,
) (*processor, error) {
opts := make(map[string]string, len(info.Opts)+2)
for k, v := range info.Opts {
opts[k] = v
}
opts[sink.OptChangefeedID] = changefeedID
opts[sink.OptCaptureID] = captureID
opts[sink.OptCaptureAddr] = captureInfo.AdvertiseAddr
ctx = util.PutChangefeedIDInCtx(ctx, changefeedID)
filter, err := filter.NewFilter(info.Config)
if err != nil {
Expand All @@ -902,29 +908,31 @@ func runProcessor(
cancel()
return nil, errors.Trace(err)
}
processor, err := newProcessor(ctx, session, info, sink, changefeedID, captureID, checkpointTs, errCh)
processor, err := newProcessor(ctx, session, info, sink, changefeedID, captureInfo, checkpointTs, errCh)
if err != nil {
cancel()
return nil, err
}
log.Info("start to run processor", zap.String("changefeed id", changefeedID))

processorErrorCounter.WithLabelValues(changefeedID, captureID).Add(0)
processorErrorCounter.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr).Add(0)
processor.Run(ctx)

go func() {
err := <-errCh
cause := errors.Cause(err)
if cause != nil && cause != context.Canceled && cause != model.ErrAdminStopProcessor {
processorErrorCounter.WithLabelValues(changefeedID, captureID).Inc()
processorErrorCounter.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr).Inc()
log.Error("error on running processor",
zap.String("captureid", captureID),
zap.String("captureid", captureInfo.ID),
zap.String("captureaddr", captureInfo.AdvertiseAddr),
zap.String("changefeedid", changefeedID),
zap.String("processorid", processor.id),
zap.Error(err))
} else {
log.Info("processor exited",
zap.String("captureid", captureID),
zap.String("captureid", captureInfo.ID),
zap.String("captureaddr", captureInfo.AdvertiseAddr),
zap.String("changefeedid", changefeedID),
zap.String("processorid", processor.id))
}
Expand Down
12 changes: 6 additions & 6 deletions cdc/puller/entry_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func NewEntrySorter() *EntrySorter {

// Run runs EntrySorter
func (es *EntrySorter) Run(ctx context.Context) error {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
tableIDStr := strconv.FormatInt(util.TableIDFromCtx(ctx), 10)
metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureAddr, changefeedID, tableIDStr)

lessFunc := func(i *model.PolymorphicEvent, j *model.PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
Expand Down
18 changes: 9 additions & 9 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,18 @@ func (p *pullerImpl) Run(ctx context.Context) error {
})
}

captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
tableID := util.TableIDFromCtx(ctx)
tableIDStr := strconv.FormatInt(tableID, 10)
metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEventCounterKv := kvEventCounter.WithLabelValues(captureID, changefeedID, "kv")
metricEventCounterResolved := kvEventCounter.WithLabelValues(captureID, changefeedID, "resolved")
metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureID, changefeedID, tableIDStr, "kv")
metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureID, changefeedID, tableIDStr, "kv")
metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEventCounterKv := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "kv")
metricEventCounterResolved := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "resolved")
metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv")
metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv")

g.Go(func() error {
for {
Expand Down
4 changes: 2 additions & 2 deletions cdc/roles/storage/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type ProcessorTsEtcdRWriter struct {

// NewProcessorTsEtcdRWriter returns a new `*ChangeFeedRWriter` instance
func NewProcessorTsEtcdRWriter(cli kv.CDCEtcdClient, changefeedID, captureID string) (*ProcessorTsEtcdRWriter, error) {
logger := log.L().With(zap.String("changefeed id", changefeedID)).
With(zap.String("capture id", captureID))
logger := log.L().With(zap.String("changefeedid", changefeedID)).
With(zap.String("captureid", captureID))

rw := &ProcessorTsEtcdRWriter{
etcdClient: cli,
Expand Down
Loading

0 comments on commit 5d74cae

Please sign in to comment.