Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: replace capture ID with address in metrics #686

Merged
merged 9 commits into from
Jun 24, 2020
3 changes: 2 additions & 1 deletion cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,12 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error
zap.String("changefeedid", task.ChangeFeedID))

p, err := runProcessor(ctx, c.session, *cf, task.ChangeFeedID,
c.info.ID, task.CheckpointTS)
*c.info, task.CheckpointTS)
if err != nil {
log.Error("run processor failed",
zap.String("changefeedid", task.ChangeFeedID),
zap.String("captureid", c.info.ID),
zap.String("captureaddr", c.info.AdvertiseAddr),
zap.Error(err))
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ func (m *mounterImpl) Run(ctx context.Context) error {
}

func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMountDuration := mountDuration.WithLabelValues(captureID, changefeedID)
metricMountDuration := mountDuration.WithLabelValues(captureAddr, changefeedID)

for {
var pEvent *model.PolymorphicEvent
Expand Down Expand Up @@ -193,9 +193,9 @@ func (m *mounterImpl) Input() chan<- *model.PolymorphicEvent {
}

func (m *mounterImpl) collectMetrics(ctx context.Context) {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureID, changefeedID)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID)

for {
select {
Expand Down
24 changes: 12 additions & 12 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
limit := 20

nextSpan := span
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)

for {
var (
Expand All @@ -802,7 +802,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
scanT0 := time.Now()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
regions, err = s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit)
scanRegionsDuration.WithLabelValues(captureID).Observe(time.Since(scanT0).Seconds())
scanRegionsDuration.WithLabelValues(captureAddr).Observe(time.Since(scanT0).Seconds())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1038,17 +1038,17 @@ func (s *eventFeedSession) singleEventFeed(
startTs uint64,
receiverCh <-chan *cdcpb.Event,
) (uint64, error) {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricEventSize := eventSize.WithLabelValues(captureID)
metricPullEventInitializedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_INITIALIZED.String(), captureID, changefeedID)
metricPullEventCommittedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMITTED.String(), captureID, changefeedID)
metricPullEventCommitCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMIT.String(), captureID, changefeedID)
metricPullEventPrewriteCounter := pullEventCounter.WithLabelValues(cdcpb.Event_PREWRITE.String(), captureID, changefeedID)
metricPullEventRollbackCounter := pullEventCounter.WithLabelValues(cdcpb.Event_ROLLBACK.String(), captureID, changefeedID)
metricSendEventResolvedCounter := sendEventCounter.WithLabelValues("native resolved", captureID, changefeedID)
metricSendEventCommitCounter := sendEventCounter.WithLabelValues("commit", captureID, changefeedID)
metricSendEventCommittedCounter := sendEventCounter.WithLabelValues("committed", captureID, changefeedID)
metricEventSize := eventSize.WithLabelValues(captureAddr)
metricPullEventInitializedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_INITIALIZED.String(), captureAddr, changefeedID)
metricPullEventCommittedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMITTED.String(), captureAddr, changefeedID)
metricPullEventCommitCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMIT.String(), captureAddr, changefeedID)
metricPullEventPrewriteCounter := pullEventCounter.WithLabelValues(cdcpb.Event_PREWRITE.String(), captureAddr, changefeedID)
metricPullEventRollbackCounter := pullEventCounter.WithLabelValues(cdcpb.Event_ROLLBACK.String(), captureAddr, changefeedID)
metricSendEventResolvedCounter := sendEventCounter.WithLabelValues("native resolved", captureAddr, changefeedID)
metricSendEventCommitCounter := sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID)
metricSendEventCommittedCounter := sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID)

initialized := false

Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (c CDCEtcdClient) GetTaskWorkload(
func (c CDCEtcdClient) PutTaskWorkload(
ctx context.Context,
changefeedID string,
captureID string,
captureID model.CaptureID,
info *model.TaskWorkload,
) error {
data, err := info.Marshal()
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ var (
Name: "scan_regions_duration_seconds",
Help: "The time it took to finish a scanRegions call.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18),
}, []string{"captureID"})
}, []string{"capture"})
eventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "event_size_bytes",
Help: "Size of KV events.",
Buckets: prometheus.ExponentialBuckets(16, 2, 25),
}, []string{"captureID"})
}, []string{"capture"})
pullEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Expand Down
4 changes: 2 additions & 2 deletions cdc/metrics_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ var (
Namespace: "ticdc",
Subsystem: "processor",
Name: "update_info_duration_seconds",
Help: "The time it took to update sub change feed info.",
Help: "The time it took to update sub changefeed info.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18),
}, []string{"captureID"})
}, []string{"capture"})
waitEventPrepareDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down
64 changes: 36 additions & 28 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (

type processor struct {
id string
captureID string
captureInfo model.CaptureInfo
changefeedID string
changefeed model.ChangeFeedInfo
limitter *puller.BlurResourceLimitter
Expand Down Expand Up @@ -134,7 +134,8 @@ func newProcessor(
session *concurrency.Session,
changefeed model.ChangeFeedInfo,
sink sink.Sink,
changefeedID, captureID string,
changefeedID string,
captureInfo model.CaptureInfo,
checkpointTs uint64,
errCh chan error,
) (*processor, error) {
Expand All @@ -150,7 +151,7 @@ func newProcessor(
}
cdcEtcdCli := kv.NewCDCEtcdClient(etcdCli)

tsRWriter, err := fNewTsRWriter(cdcEtcdCli, changefeedID, captureID)
tsRWriter, err := fNewTsRWriter(cdcEtcdCli, changefeedID, captureInfo.ID)
if err != nil {
return nil, errors.Annotate(err, "failed to create ts RWriter")
}
Expand All @@ -176,7 +177,7 @@ func newProcessor(
p := &processor{
id: uuid.New().String(),
limitter: limitter,
captureID: captureID,
captureInfo: captureInfo,
changefeedID: changefeedID,
changefeed: changefeed,
pdCli: pdCli,
Expand Down Expand Up @@ -265,7 +266,8 @@ func (p *processor) wait() {
err := p.wg.Wait()
if err != nil && errors.Cause(err) != context.Canceled {
log.Error("processor wait error",
zap.String("captureID", p.captureID),
zap.Reflect("captureid", p.captureInfo.ID),
zap.Reflect("captureaddr", p.captureInfo.AdvertiseAddr),
zap.String("changefeedID", p.changefeedID),
zap.Error(err),
)
Expand Down Expand Up @@ -303,7 +305,9 @@ func (p *processor) positionWorker(ctx context.Context) error {
}
return inErr
})
updateInfoDuration.WithLabelValues(p.captureID).Observe(time.Since(t0Update).Seconds())
updateInfoDuration.
WithLabelValues(p.captureInfo.AdvertiseAddr).
Observe(time.Since(t0Update).Seconds())
if err != nil {
return errors.Annotate(err, "failed to update info")
}
Expand All @@ -324,8 +328,8 @@ func (p *processor) positionWorker(ctx context.Context) error {
log.Info("Local resolved worker exited")
}()

resolvedTsGauge := resolvedTsGauge.WithLabelValues(p.changefeedID, p.captureID)
checkpointTsGauge := checkpointTsGauge.WithLabelValues(p.changefeedID, p.captureID)
resolvedTsGauge := resolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge := checkpointTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)

for {
select {
Expand Down Expand Up @@ -398,7 +402,7 @@ func (p *processor) ddlPullWorker(ctx context.Context) error {

func (p *processor) workloadWorker(ctx context.Context) error {
t := time.NewTicker(10 * time.Second)
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureID, nil)
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID, nil)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -414,7 +418,7 @@ func (p *processor) workloadWorker(ctx context.Context) error {
workload[table.id] = table.workload
}
p.stateMu.Unlock()
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureID, &workload)
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID, &workload)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -450,7 +454,9 @@ func (p *processor) updateInfo(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.Tables)))
syncTableNumGauge.
WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).
Set(float64(len(p.status.Tables)))
err = updatePosition()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -488,9 +494,9 @@ func (p *processor) removeTable(tableID int64) {
table.cancel()
delete(p.tables, tableID)
tableIDStr := strconv.FormatInt(tableID, 10)
tableInputChanSizeGauge.DeleteLabelValues(p.changefeedID, p.captureID, tableIDStr)
tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureID, tableIDStr)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Dec()
tableInputChanSizeGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr)
tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableIDStr)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Dec()
}

// handleTables handles table scheduler on this processor, add or remove table puller
Expand Down Expand Up @@ -710,7 +716,7 @@ func (p *processor) collectMetrics(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-time.After(defaultMetricInterval):
tableOutputChanSizeGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.output)))
tableOutputChanSizeGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Set(float64(len(p.output)))
}
}
}
Expand Down Expand Up @@ -784,7 +790,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
}
}()
go func() {
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureID, strconv.FormatInt(table.id, 10))
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, strconv.FormatInt(table.id, 10))
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -847,25 +853,25 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
if p.position.ResolvedTs > replicaInfo.StartTs {
p.position.ResolvedTs = replicaInfo.StartTs
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Inc()
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Inc()
}

func (p *processor) stop(ctx context.Context) error {
log.Info("stop processor", zap.String("id", p.id), zap.String("capture", p.captureID), zap.String("changefeed", p.changefeedID))
log.Info("stop processor", zap.String("id", p.id), zap.String("capture", p.captureInfo.AdvertiseAddr), zap.String("changefeed", p.changefeedID))
p.stateMu.Lock()
for _, tbl := range p.tables {
tbl.cancel()
}
// mark tables share the same context with its original table, don't need to cancel
p.stateMu.Unlock()
atomic.StoreInt32(&p.stopped, 1)
if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureID); err != nil {
if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
}
if err := p.etcdCli.DeleteTaskStatus(ctx, p.changefeedID, p.captureID); err != nil {
if err := p.etcdCli.DeleteTaskStatus(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
}
if err := p.etcdCli.DeleteTaskWorkload(ctx, p.changefeedID, p.captureID); err != nil {
if err := p.etcdCli.DeleteTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
}
return p.sink.Close()
Expand All @@ -881,15 +887,15 @@ func runProcessor(
session *concurrency.Session,
info model.ChangeFeedInfo,
changefeedID string,
captureID string,
captureInfo model.CaptureInfo,
checkpointTs uint64,
) (*processor, error) {
opts := make(map[string]string, len(info.Opts)+2)
for k, v := range info.Opts {
opts[k] = v
}
opts[sink.OptChangefeedID] = changefeedID
opts[sink.OptCaptureID] = captureID
opts[sink.OptCaptureAddr] = captureInfo.AdvertiseAddr
ctx = util.PutChangefeedIDInCtx(ctx, changefeedID)
filter, err := filter.NewFilter(info.Config)
if err != nil {
Expand All @@ -902,29 +908,31 @@ func runProcessor(
cancel()
return nil, errors.Trace(err)
}
processor, err := newProcessor(ctx, session, info, sink, changefeedID, captureID, checkpointTs, errCh)
processor, err := newProcessor(ctx, session, info, sink, changefeedID, captureInfo, checkpointTs, errCh)
if err != nil {
cancel()
return nil, err
}
log.Info("start to run processor", zap.String("changefeed id", changefeedID))

processorErrorCounter.WithLabelValues(changefeedID, captureID).Add(0)
processorErrorCounter.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr).Add(0)
processor.Run(ctx)

go func() {
err := <-errCh
cause := errors.Cause(err)
if cause != nil && cause != context.Canceled && cause != model.ErrAdminStopProcessor {
processorErrorCounter.WithLabelValues(changefeedID, captureID).Inc()
processorErrorCounter.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr).Inc()
log.Error("error on running processor",
zap.String("captureid", captureID),
zap.String("captureid", captureInfo.ID),
zap.String("captureaddr", captureInfo.AdvertiseAddr),
zap.String("changefeedid", changefeedID),
zap.String("processorid", processor.id),
zap.Error(err))
} else {
log.Info("processor exited",
zap.String("captureid", captureID),
zap.String("captureid", captureInfo.ID),
zap.String("captureaddr", captureInfo.AdvertiseAddr),
zap.String("changefeedid", changefeedID),
zap.String("processorid", processor.id))
}
Expand Down
12 changes: 6 additions & 6 deletions cdc/puller/entry_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func NewEntrySorter() *EntrySorter {

// Run runs EntrySorter
func (es *EntrySorter) Run(ctx context.Context) error {
captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
tableIDStr := strconv.FormatInt(util.TableIDFromCtx(ctx), 10)
metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEntrySorterResolvedChanSizeGuage := entrySorterResolvedChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterOutputChanSizeGauge := entrySorterOutputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntryUnsortedSizeGauge := entrySorterUnsortedSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterSortDuration := entrySorterSortDuration.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEntrySorterMergeDuration := entrySorterMergeDuration.WithLabelValues(captureAddr, changefeedID, tableIDStr)

lessFunc := func(i *model.PolymorphicEvent, j *model.PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
Expand Down
18 changes: 9 additions & 9 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,18 @@ func (p *pullerImpl) Run(ctx context.Context) error {
})
}

captureID := util.CaptureIDFromCtx(ctx)
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
tableID := util.TableIDFromCtx(ctx)
tableIDStr := strconv.FormatInt(tableID, 10)
metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureID, changefeedID, tableIDStr)
metricEventCounterKv := kvEventCounter.WithLabelValues(captureID, changefeedID, "kv")
metricEventCounterResolved := kvEventCounter.WithLabelValues(captureID, changefeedID, "resolved")
metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureID, changefeedID, tableIDStr, "kv")
metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureID, changefeedID, tableIDStr, "kv")
metricOutputChanSize := outputChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEventChanSize := eventChanSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricMemBufferSize := memBufferSizeGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureAddr, changefeedID, tableIDStr)
metricEventCounterKv := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "kv")
metricEventCounterResolved := kvEventCounter.WithLabelValues(captureAddr, changefeedID, "resolved")
metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv")
metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, tableIDStr, "kv")

g.Go(func() error {
for {
Expand Down
4 changes: 2 additions & 2 deletions cdc/roles/storage/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type ProcessorTsEtcdRWriter struct {

// NewProcessorTsEtcdRWriter returns a new `*ChangeFeedRWriter` instance
func NewProcessorTsEtcdRWriter(cli kv.CDCEtcdClient, changefeedID, captureID string) (*ProcessorTsEtcdRWriter, error) {
logger := log.L().With(zap.String("changefeed id", changefeedID)).
With(zap.String("capture id", captureID))
logger := log.L().With(zap.String("changefeedid", changefeedID)).
With(zap.String("captureid", captureID))

rw := &ProcessorTsEtcdRWriter{
etcdClient: cli,
Expand Down
Loading