Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler, processor(ticdc): replace some panic with error (#9123) #9140

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,14 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
resolvedTs = m.sourceManager.GetTableResolvedTs(span)
}

if resolvedTs < checkpointTs.ResolvedMark() {
log.Error("sinkManager: resolved ts should not less than checkpoint ts",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("resolvedTs", resolvedTs),
zap.Any("checkpointTs", checkpointTs))
}
return TableStats{
CheckpointTs: checkpointTs.ResolvedMark(),
ResolvedTs: resolvedTs,
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func addTableAndAddEventsToSortEngine(
engine engine.SortEngine,
span tablepb.Span,
) {
engine.AddTable(span)
engine.AddTable(span, 0)
events := []*model.PolymorphicEvent{
{
StartTs: 1,
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (suite *redoLogWorkerSuite) addEventsToSortEngine(
events []*model.PolymorphicEvent,
sortEngine engine.SortEngine,
) {
sortEngine.AddTable(suite.testSpan)
sortEngine.AddTable(suite.testSpan, 0)
for _, event := range events {
sortEngine.Add(suite.testSpan, event)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (suite *tableSinkWorkerSuite) addEventsToSortEngine(
events []*model.PolymorphicEvent,
sortEngine engine.SortEngine,
) {
sortEngine.AddTable(suite.testSpan)
sortEngine.AddTable(suite.testSpan, 0)
for _, event := range events {
sortEngine.Add(suite.testSpan, event)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type SortEngine interface {
IsTableBased() bool

// AddTable adds the table into the engine.
AddTable(span tablepb.Span)
AddTable(span tablepb.Span, startTs model.Ts)

// RemoveTable removes the table from the engine.
RemoveTable(span tablepb.Span)
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/sourcemanager/engine/memory/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (s *EventSorter) IsTableBased() bool {
}

// AddTable implements engine.SortEngine.
func (s *EventSorter) AddTable(span tablepb.Span) {
resolvedTs := model.Ts(0)
func (s *EventSorter) AddTable(span tablepb.Span, startTs model.Ts) {
resolvedTs := startTs
if _, exists := s.tables.LoadOrStore(span, &tableSorter{resolvedTs: &resolvedTs}); exists {
log.Panic("add an exist table", zap.Stringer("span", &span))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestEventSorter(t *testing.T) {

span := spanz.TableIDToComparableSpan(1)
es := New(context.Background())
es.AddTable(span)
es.AddTable(span, 0)
var nextToFetch engine.Position
for _, tc := range testCases {
for _, entry := range tc.input {
Expand Down
8 changes: 4 additions & 4 deletions cdc/processor/sourcemanager/engine/mock/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions cdc/processor/sourcemanager/engine/pebble/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *EventSorter) IsTableBased() bool {
}

// AddTable implements engine.SortEngine.
func (s *EventSorter) AddTable(span tablepb.Span) {
func (s *EventSorter) AddTable(span tablepb.Span, startTs model.Ts) {
s.mu.Lock()
if _, exists := s.tables.Get(span); exists {
s.mu.Unlock()
Expand All @@ -123,10 +123,12 @@ func (s *EventSorter) AddTable(span tablepb.Span) {
zap.Stringer("span", &span))
return
}
s.tables.ReplaceOrInsert(span, &tableState{
state := &tableState{
uniqueID: genUniqueID(),
ch: s.channs[getDB(span, len(s.dbs))],
})
}
state.maxReceivedResolvedTs.Store(startTs)
s.tables.ReplaceOrInsert(span, state)
s.mu.Unlock()
}

Expand Down
13 changes: 7 additions & 6 deletions cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func TestTableOperations(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span)
s.AddTable(span, 2)
s.AddTable(span, 1)

require.Equal(t, model.Ts(0), s.GetResolvedTs(span))
require.Equal(t, model.Ts(2), s.GetResolvedTs(span))

s.RemoveTable(span)
s.RemoveTable(span)
Expand All @@ -64,7 +64,7 @@ func TestNoResolvedTs(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span, 0)
resolvedTs := make(chan model.Ts)
s.OnResolve(func(_ tablepb.Span, ts model.Ts) { resolvedTs <- ts })

Expand Down Expand Up @@ -95,7 +95,7 @@ func TestEventFetch(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span, 1)
resolvedTs := make(chan model.Ts)
s.OnResolve(func(_ tablepb.Span, ts model.Ts) { resolvedTs <- ts })

Expand Down Expand Up @@ -128,6 +128,7 @@ func TestEventFetch(t *testing.T) {

s.Add(span, inputEvents...)
s.Add(span, model.NewResolvedPolymorphicEvent(0, 4))
require.Equal(t, model.Ts(4), s.GetResolvedTs(span))

sortedEvents := make([]*model.PolymorphicEvent, 0, len(inputEvents))
sortedPositions := make([]engine.Position, 0, len(inputEvents))
Expand Down Expand Up @@ -177,7 +178,7 @@ func TestCleanData(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span, 0)
require.Panics(t, func() {
s.CleanByTable(spanz.TableIDToComparableSpan(2), engine.Position{})
})
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewForTest(
// AddTable adds a table to the source manager. Start puller and register table to the engine.
func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) {
// Add table to the engine first, so that the engine can receive the events from the puller.
m.engine.AddTable(span)
m.engine.AddTable(span, startTs)
p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode)
p.Start(m.ctx, m.up, m.engine, m.errChan)
m.pullers.Store(span, p)
Expand Down
36 changes: 24 additions & 12 deletions cdc/scheduler/internal/v3/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
Expand All @@ -27,7 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport"
"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/version"
Expand Down Expand Up @@ -148,7 +147,7 @@ func newAgent(

revision, err := etcdClient.GetOwnerRevision(etcdCliCtx, ownerCaptureID)
if err != nil {
if cerror.ErrOwnerNotFound.Equal(err) || cerror.ErrNotOwner.Equal(err) {
if errors.ErrOwnerNotFound.Equal(err) || errors.ErrNotOwner.Equal(err) {
// These are expected errors when no owner has been elected
log.Info("schedulerv3: no owner found when querying for the owner revision",
zap.String("ownerCaptureID", ownerCaptureID),
Expand Down Expand Up @@ -207,7 +206,10 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) {
return nil, errors.Trace(err)
}

outboundMessages, barrier := a.handleMessage(inboundMessages)
outboundMessages, barrier, err := a.handleMessage(inboundMessages)
if err != nil {
return nil, errors.Trace(err)
}

responses, err := a.tableM.poll(ctx)
if err != nil {
Expand Down Expand Up @@ -236,10 +238,8 @@ func (a *agent) handleLivenessUpdate(liveness model.Liveness) {
}

func (a *agent) handleMessage(msg []*schedulepb.Message) (
[]*schedulepb.Message, *schedulepb.Barrier,
result []*schedulepb.Message, barrier *schedulepb.Barrier, err error,
) {
result := make([]*schedulepb.Message, 0)
var barrier *schedulepb.Barrier
for _, message := range msg {
ownerCaptureID := message.GetFrom()
header := message.GetHeader()
Expand All @@ -254,7 +254,10 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
switch message.GetMsgType() {
case schedulepb.MsgHeartbeat:
var reMsg *schedulepb.Message
reMsg, barrier = a.handleMessageHeartbeat(message.GetHeartbeat())
reMsg, barrier, err = a.handleMessageHeartbeat(message.GetHeartbeat())
if err != nil {
return
}
result = append(result, reMsg)
case schedulepb.MsgDispatchTableRequest:
a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch)
Expand All @@ -266,22 +269,31 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
zap.Any("message", message))
}
}
return result, barrier
return
}

func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
*schedulepb.Message, *schedulepb.Barrier,
*schedulepb.Message, *schedulepb.Barrier, error,
) {
allTables := a.tableM.getAllTableSpans()
result := make([]tablepb.TableStatus, 0, allTables.Len())

isValidCheckpointTs := true
allTables.Ascend(func(span tablepb.Span, table *tableSpan) bool {
status := table.getTableSpanStatus(request.CollectStats)
isValidCheckpointTs = status.Checkpoint.CheckpointTs <= status.Checkpoint.ResolvedTs
if table.task != nil && table.task.IsRemove {
status.State = tablepb.TableStateStopping
}
result = append(result, status)
return true
return isValidCheckpointTs
})
if !isValidCheckpointTs {
status := result[len(result)-1]
checkpointTs := status.Checkpoint.CheckpointTs
resolvedTs := status.Checkpoint.ResolvedTs
return nil, nil, errors.ErrInvalidCheckpointTs.GenWithStackByArgs(checkpointTs, resolvedTs)
}
for _, span := range request.GetSpans() {
if _, ok := allTables.Get(span); !ok {
status := a.tableM.getTableSpanStatus(span, request.CollectStats)
Expand All @@ -308,7 +320,7 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
zap.String("changefeed", a.ChangeFeedID.ID),
zap.Any("message", message))

return message, request.GetBarrier()
return message, request.GetBarrier(), nil
}

type dispatchTableTaskStatus int32
Expand Down
20 changes: 10 additions & 10 deletions cdc/scheduler/internal/v3/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
},
}

response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.Equal(t, model.LivenessCaptureAlive, response[0].GetHeartbeatResponse().Liveness)

Expand All @@ -376,21 +376,21 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
}

a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).task = &dispatchTableTask{IsRemove: true}
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
result = response[0].GetHeartbeatResponse().Tables
sort.Slice(result, func(i, j int) bool {
return result[i].Span.TableID < result[j].Span.TableID
})
require.Equal(t, tablepb.TableStateStopping, result[1].State)

a.handleLivenessUpdate(model.LivenessCaptureStopping)
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)

a.handleLivenessUpdate(model.LivenessCaptureAlive)
heartbeat.Heartbeat.IsStopping = true
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)
require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load())
}
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestAgentHandleMessage(t *testing.T) {
}

// handle the first heartbeat, from the known owner.
response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)

addTableRequest := &schedulepb.Message{
Expand All @@ -600,17 +600,17 @@ func TestAgentHandleMessage(t *testing.T) {
},
}
// wrong epoch, ignored
responses, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
responses, _, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
require.False(t, tableM.tables.Has(spanz.TableIDToComparableSpan(1)))
require.Len(t, responses, 0)

// correct epoch, processing.
addTableRequest.Header.ProcessorEpoch = a.Epoch
_, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
_, _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1)))

heartbeat.Header.OwnerRevision.Revision = 2
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)

// this should never happen in the real world
Expand All @@ -624,12 +624,12 @@ func TestAgentHandleMessage(t *testing.T) {
From: a.ownerInfo.ID,
}

response, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
response, _, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
require.Len(t, response, 0)

// stale message
heartbeat.Header.OwnerRevision.Revision = 1
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 0)
}

Expand Down
Loading