Skip to content

Commit

Permalink
scheduler, processor(ticdc): replace some panic with error (#9123) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 7, 2023
1 parent 6330ad2 commit f5ee171
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 64 deletions.
8 changes: 8 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,14 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
resolvedTs = m.sourceManager.GetTableResolvedTs(span)
}

if resolvedTs < checkpointTs.ResolvedMark() {
log.Error("sinkManager: resolved ts should not less than checkpoint ts",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("resolvedTs", resolvedTs),
zap.Any("checkpointTs", checkpointTs))
}
return TableStats{
CheckpointTs: checkpointTs.ResolvedMark(),
ResolvedTs: resolvedTs,
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func addTableAndAddEventsToSortEngine(
engine engine.SortEngine,
span tablepb.Span,
) {
engine.AddTable(span)
engine.AddTable(span, 0)
events := []*model.PolymorphicEvent{
{
StartTs: 1,
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (suite *redoLogWorkerSuite) addEventsToSortEngine(
events []*model.PolymorphicEvent,
sortEngine engine.SortEngine,
) {
sortEngine.AddTable(suite.testSpan)
sortEngine.AddTable(suite.testSpan, 0)
for _, event := range events {
sortEngine.Add(suite.testSpan, event)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (suite *tableSinkWorkerSuite) addEventsToSortEngine(
events []*model.PolymorphicEvent,
sortEngine engine.SortEngine,
) {
sortEngine.AddTable(suite.testSpan)
sortEngine.AddTable(suite.testSpan, 0)
for _, event := range events {
sortEngine.Add(suite.testSpan, event)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type SortEngine interface {
IsTableBased() bool

// AddTable adds the table into the engine.
AddTable(span tablepb.Span)
AddTable(span tablepb.Span, startTs model.Ts)

// RemoveTable removes the table from the engine.
RemoveTable(span tablepb.Span)
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/sourcemanager/engine/memory/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (s *EventSorter) IsTableBased() bool {
}

// AddTable implements engine.SortEngine.
func (s *EventSorter) AddTable(span tablepb.Span) {
resolvedTs := model.Ts(0)
func (s *EventSorter) AddTable(span tablepb.Span, startTs model.Ts) {
resolvedTs := startTs
if _, exists := s.tables.LoadOrStore(span, &tableSorter{resolvedTs: &resolvedTs}); exists {
log.Panic("add an exist table", zap.Stringer("span", &span))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestEventSorter(t *testing.T) {

span := spanz.TableIDToComparableSpan(1)
es := New(context.Background())
es.AddTable(span)
es.AddTable(span, 0)
var nextToFetch engine.Position
for _, tc := range testCases {
for _, entry := range tc.input {
Expand Down
8 changes: 4 additions & 4 deletions cdc/processor/sourcemanager/engine/mock/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions cdc/processor/sourcemanager/engine/pebble/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *EventSorter) IsTableBased() bool {
}

// AddTable implements engine.SortEngine.
func (s *EventSorter) AddTable(span tablepb.Span) {
func (s *EventSorter) AddTable(span tablepb.Span, startTs model.Ts) {
s.mu.Lock()
if _, exists := s.tables.Get(span); exists {
s.mu.Unlock()
Expand All @@ -123,10 +123,12 @@ func (s *EventSorter) AddTable(span tablepb.Span) {
zap.Stringer("span", &span))
return
}
s.tables.ReplaceOrInsert(span, &tableState{
state := &tableState{
uniqueID: genUniqueID(),
ch: s.channs[getDB(span, len(s.dbs))],
})
}
state.maxReceivedResolvedTs.Store(startTs)
s.tables.ReplaceOrInsert(span, state)
s.mu.Unlock()
}

Expand Down
13 changes: 7 additions & 6 deletions cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func TestTableOperations(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span)
s.AddTable(span, 2)
s.AddTable(span, 1)

require.Equal(t, model.Ts(0), s.GetResolvedTs(span))
require.Equal(t, model.Ts(2), s.GetResolvedTs(span))

s.RemoveTable(span)
s.RemoveTable(span)
Expand All @@ -64,7 +64,7 @@ func TestNoResolvedTs(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span, 0)
resolvedTs := make(chan model.Ts)
s.OnResolve(func(_ tablepb.Span, ts model.Ts) { resolvedTs <- ts })

Expand Down Expand Up @@ -95,7 +95,7 @@ func TestEventFetch(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span, 1)
resolvedTs := make(chan model.Ts)
s.OnResolve(func(_ tablepb.Span, ts model.Ts) { resolvedTs <- ts })

Expand Down Expand Up @@ -128,6 +128,7 @@ func TestEventFetch(t *testing.T) {

s.Add(span, inputEvents...)
s.Add(span, model.NewResolvedPolymorphicEvent(0, 4))
require.Equal(t, model.Ts(4), s.GetResolvedTs(span))

sortedEvents := make([]*model.PolymorphicEvent, 0, len(inputEvents))
sortedPositions := make([]engine.Position, 0, len(inputEvents))
Expand Down Expand Up @@ -177,7 +178,7 @@ func TestCleanData(t *testing.T) {
require.True(t, s.IsTableBased())

span := spanz.TableIDToComparableSpan(1)
s.AddTable(span)
s.AddTable(span, 0)
require.Panics(t, func() {
s.CleanByTable(spanz.TableIDToComparableSpan(2), engine.Position{})
})
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewForTest(
// AddTable adds a table to the source manager. Start puller and register table to the engine.
func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) {
// Add table to the engine first, so that the engine can receive the events from the puller.
m.engine.AddTable(span)
m.engine.AddTable(span, startTs)
p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode)
p.Start(m.ctx, m.up, m.engine, m.errChan)
m.pullers.Store(span, p)
Expand Down
36 changes: 24 additions & 12 deletions cdc/scheduler/internal/v3/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
Expand All @@ -27,7 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport"
"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/version"
Expand Down Expand Up @@ -148,7 +147,7 @@ func newAgent(

revision, err := etcdClient.GetOwnerRevision(etcdCliCtx, ownerCaptureID)
if err != nil {
if cerror.ErrOwnerNotFound.Equal(err) || cerror.ErrNotOwner.Equal(err) {
if errors.ErrOwnerNotFound.Equal(err) || errors.ErrNotOwner.Equal(err) {
// These are expected errors when no owner has been elected
log.Info("schedulerv3: no owner found when querying for the owner revision",
zap.String("ownerCaptureID", ownerCaptureID),
Expand Down Expand Up @@ -207,7 +206,10 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) {
return nil, errors.Trace(err)
}

outboundMessages, barrier := a.handleMessage(inboundMessages)
outboundMessages, barrier, err := a.handleMessage(inboundMessages)
if err != nil {
return nil, errors.Trace(err)
}

responses, err := a.tableM.poll(ctx)
if err != nil {
Expand Down Expand Up @@ -236,10 +238,8 @@ func (a *agent) handleLivenessUpdate(liveness model.Liveness) {
}

func (a *agent) handleMessage(msg []*schedulepb.Message) (
[]*schedulepb.Message, *schedulepb.Barrier,
result []*schedulepb.Message, barrier *schedulepb.Barrier, err error,
) {
result := make([]*schedulepb.Message, 0)
var barrier *schedulepb.Barrier
for _, message := range msg {
ownerCaptureID := message.GetFrom()
header := message.GetHeader()
Expand All @@ -254,7 +254,10 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
switch message.GetMsgType() {
case schedulepb.MsgHeartbeat:
var reMsg *schedulepb.Message
reMsg, barrier = a.handleMessageHeartbeat(message.GetHeartbeat())
reMsg, barrier, err = a.handleMessageHeartbeat(message.GetHeartbeat())
if err != nil {
return
}
result = append(result, reMsg)
case schedulepb.MsgDispatchTableRequest:
a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch)
Expand All @@ -266,22 +269,31 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
zap.Any("message", message))
}
}
return result, barrier
return
}

func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
*schedulepb.Message, *schedulepb.Barrier,
*schedulepb.Message, *schedulepb.Barrier, error,
) {
allTables := a.tableM.getAllTableSpans()
result := make([]tablepb.TableStatus, 0, allTables.Len())

isValidCheckpointTs := true
allTables.Ascend(func(span tablepb.Span, table *tableSpan) bool {
status := table.getTableSpanStatus(request.CollectStats)
isValidCheckpointTs = status.Checkpoint.CheckpointTs <= status.Checkpoint.ResolvedTs
if table.task != nil && table.task.IsRemove {
status.State = tablepb.TableStateStopping
}
result = append(result, status)
return true
return isValidCheckpointTs
})
if !isValidCheckpointTs {
status := result[len(result)-1]
checkpointTs := status.Checkpoint.CheckpointTs
resolvedTs := status.Checkpoint.ResolvedTs
return nil, nil, errors.ErrInvalidCheckpointTs.GenWithStackByArgs(checkpointTs, resolvedTs)
}
for _, span := range request.GetSpans() {
if _, ok := allTables.Get(span); !ok {
status := a.tableM.getTableSpanStatus(span, request.CollectStats)
Expand All @@ -308,7 +320,7 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
zap.String("changefeed", a.ChangeFeedID.ID),
zap.Any("message", message))

return message, request.GetBarrier()
return message, request.GetBarrier(), nil
}

type dispatchTableTaskStatus int32
Expand Down
20 changes: 10 additions & 10 deletions cdc/scheduler/internal/v3/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
},
}

response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.Equal(t, model.LivenessCaptureAlive, response[0].GetHeartbeatResponse().Liveness)

Expand All @@ -376,21 +376,21 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
}

a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).task = &dispatchTableTask{IsRemove: true}
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
result = response[0].GetHeartbeatResponse().Tables
sort.Slice(result, func(i, j int) bool {
return result[i].Span.TableID < result[j].Span.TableID
})
require.Equal(t, tablepb.TableStateStopping, result[1].State)

a.handleLivenessUpdate(model.LivenessCaptureStopping)
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)

a.handleLivenessUpdate(model.LivenessCaptureAlive)
heartbeat.Heartbeat.IsStopping = true
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)
require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load())
}
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestAgentHandleMessage(t *testing.T) {
}

// handle the first heartbeat, from the known owner.
response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)

addTableRequest := &schedulepb.Message{
Expand All @@ -600,17 +600,17 @@ func TestAgentHandleMessage(t *testing.T) {
},
}
// wrong epoch, ignored
responses, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
responses, _, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
require.False(t, tableM.tables.Has(spanz.TableIDToComparableSpan(1)))
require.Len(t, responses, 0)

// correct epoch, processing.
addTableRequest.Header.ProcessorEpoch = a.Epoch
_, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
_, _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1)))

heartbeat.Header.OwnerRevision.Revision = 2
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)

// this should never happen in real world
Expand All @@ -624,12 +624,12 @@ func TestAgentHandleMessage(t *testing.T) {
From: a.ownerInfo.ID,
}

response, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
response, _, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
require.Len(t, response, 0)

// staled message
heartbeat.Header.OwnerRevision.Revision = 1
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 0)
}

Expand Down
Loading

0 comments on commit f5ee171

Please sign in to comment.