Skip to content

Commit

Permalink
scheduler(ticdc): fix incorrect scheduling task counter (pingcap#9840)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Oct 9, 2023
1 parent 4e8db94 commit f98ae2b
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 72 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str
retry.WithIsRetryableErr(cerror.IsRetryableError))

if tableName == nil {
log.Warn("failed to get table name for metric")
log.Warn("failed to get table name for metric", zap.Any("tableID", tableID))
return strconv.Itoa(int(tableID))
}

Expand Down
49 changes: 47 additions & 2 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package replication
import (
"bytes"
"container/heap"
"fmt"
"math"
"time"

Expand Down Expand Up @@ -51,25 +52,53 @@ type BurstBalance struct {
MoveTables []MoveTable
}

// String returns a one-line, human-readable description of the burst balance
// task for logging.
//
// Every non-empty table list (add, remove, move) is included, in that order,
// so that no pending work is hidden when more than one list is populated.
// When all lists are empty the result is "BurstBalance, no tables".
func (b BurstBalance) String() string {
	const prefix = "BurstBalance"

	out := prefix
	if len(b.AddTables) != 0 {
		out += fmt.Sprintf(", add tables: %v", b.AddTables)
	}
	if len(b.RemoveTables) != 0 {
		out += fmt.Sprintf(", remove tables: %v", b.RemoveTables)
	}
	if len(b.MoveTables) != 0 {
		out += fmt.Sprintf(", move tables: %v", b.MoveTables)
	}
	// Nothing was appended, so every list is empty.
	if len(out) == len(prefix) {
		return prefix + ", no tables"
	}
	return out
}

// MoveTable is a schedule task for moving a table.
// It names the table span to move and the capture it should be moved to.
type MoveTable struct {
// Span is the table span to be moved.
Span tablepb.Span
// DestCapture is the ID of the destination capture.
DestCapture model.CaptureID
}

// String renders the task as a one-line, human-readable description
// containing the span and the destination capture.
func (t MoveTable) String() string {
	span, dest := t.Span.String(), t.DestCapture
	return fmt.Sprintf("MoveTable, span: %s, dest: %s", span, dest)
}

// AddTable is a schedule task for adding a table.
type AddTable struct {
// Span is the table span to be added.
Span tablepb.Span
// CaptureID is the ID of the capture this task refers to
// (presumably the capture that will replicate the span — confirm against the scheduler).
CaptureID model.CaptureID
// CheckpointTs is the checkpoint timestamp carried with the task
// (presumably the point replication starts from — TODO confirm).
CheckpointTs model.Ts
}

// String renders the task as a one-line, human-readable description
// containing the span, the capture ID and the checkpoint timestamp.
func (t AddTable) String() string {
	span := t.Span.String()
	capture, checkpoint := t.CaptureID, t.CheckpointTs
	return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d",
		span, capture, checkpoint)
}

// RemoveTable is a schedule task for removing a table.
type RemoveTable struct {
// Span is the table span to be removed.
Span tablepb.Span
// CaptureID is the ID of the capture this task refers to
// (presumably the capture the span is removed from — confirm against the scheduler).
CaptureID model.CaptureID
}

// String renders the task as a one-line, human-readable description
// containing the span and the capture ID.
func (t RemoveTable) String() string {
	span, capture := t.Span.String(), t.CaptureID
	return fmt.Sprintf("RemoveTable, span: %s, capture: %s", span, capture)
}

// ScheduleTask is a schedule task that wraps add/move/remove table tasks.
type ScheduleTask struct { //nolint:revive
MoveTable *MoveTable
Expand All @@ -94,6 +123,22 @@ func (s *ScheduleTask) Name() string {
return "unknown"
}

// String returns the description of whichever wrapped task is set.
//
// The fields are checked in the order MoveTable, AddTable, RemoveTable,
// BurstBalance and the first non-nil one wins. An empty string is returned
// when no task is set.
func (s *ScheduleTask) String() string {
	switch {
	case s.MoveTable != nil:
		return s.MoveTable.String()
	case s.AddTable != nil:
		return s.AddTable.String()
	case s.RemoveTable != nil:
		return s.RemoveTable.String()
	case s.BurstBalance != nil:
		return s.BurstBalance.String()
	default:
		return ""
	}
}

// Manager manages replications and running scheduling tasks.
type Manager struct { //nolint:revive
spans *spanz.BtreeMap[*ReplicationSet]
Expand Down Expand Up @@ -295,7 +340,7 @@ func (r *Manager) HandleTasks(
tasks []*ScheduleTask,
) ([]*schedulepb.Message, error) {
// Check if a running task is finished.
toBeDeleted := []tablepb.Span{}
var toBeDeleted []tablepb.Span
r.runningTasks.Ascend(func(span tablepb.Span, task *ScheduleTask) bool {
if table, ok := r.spans.Get(span); ok {
// If table is back to Replicating or Removed,
Expand Down Expand Up @@ -687,7 +732,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) {
zap.String("namespace", r.changefeedID.Namespace),
zap.String("changefeed", r.changefeedID.ID),
zap.Int64("tableID", table.Span.TableID),
zap.String("tableStatus", table.Stats.String()),
zap.String("tableStatus", table.State.String()),
zap.Uint64("checkpointTs", table.Checkpoint.CheckpointTs),
zap.Uint64("resolvedTs", table.Checkpoint.ResolvedTs),
zap.Duration("checkpointLag", currentPDTime.
Expand Down
Loading

0 comments on commit f98ae2b

Please sign in to comment.