Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporary fix for ErrSnapshotSchemaNotFound and ErrSchemaStorageGCed #1069

Merged
merged 31 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4bc970c
WIP
liuzix Nov 11, 2020
75af669
fix integration test
liuzix Nov 11, 2020
54d58c9
fix integration test
liuzix Nov 11, 2020
1dabb0b
fix integration test
liuzix Nov 12, 2020
fe03dcb
fix integration test
liuzix Nov 12, 2020
e736022
fix integration test
liuzix Nov 12, 2020
093856f
fix integration test
liuzix Nov 12, 2020
db2734e
fix startTs
liuzix Nov 13, 2020
b22104a
add fatal for startTs < checkpoint
liuzix Nov 13, 2020
1015ff1
Test
liuzix Nov 13, 2020
c9e1733
debug log
liuzix Nov 13, 2020
f837e10
move fatal
liuzix Nov 13, 2020
414dd94
Merge branch 'master' of github.com:pingcap/ticdc into zixiong-fix-mo…
liuzix Nov 20, 2020
69671c0
delay SchemaStorage GC
liuzix Nov 20, 2020
2f7b9ed
fix errordoc
liuzix Nov 20, 2020
c81896f
fix errordoc
liuzix Nov 20, 2020
0ff9f62
fix errordoc
liuzix Nov 20, 2020
330c9c5
Merge branch 'master' of github.com:pingcap/ticdc into zixiong-fix-mo…
liuzix Nov 23, 2020
ef4da3c
fix leak test
liuzix Nov 23, 2020
e35c0da
reduce move_table workload
liuzix Nov 23, 2020
672f5c8
fix leak test
liuzix Nov 23, 2020
52111a9
modify integration test
liuzix Nov 23, 2020
d8d4a9c
modify integration test
liuzix Nov 23, 2020
d3dcfb2
modify integration test
liuzix Nov 23, 2020
faca2a6
modify table mover
liuzix Nov 23, 2020
237f93c
modify integration test script
liuzix Nov 23, 2020
017c319
add retry to table mover
liuzix Nov 23, 2020
ec02d7b
fix table mover
liuzix Nov 23, 2020
c378df5
fix table mover
liuzix Nov 23, 2020
9006316
Merge branch 'master' into zixiong-fix-move-table-startts
Nov 24, 2020
15c16d7
Merge branch 'master' into zixiong-fix-move-table-startts
ti-srebot Nov 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,15 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
case model.MoveTableStatusDeleted:
// add table to target capture
status, exist := cloneStatus(job.To)
replicaInfo := job.TableReplicaInfo.Clone()
replicaInfo.StartTs = c.status.CheckpointTs
if !exist {
// the target capture is not exist, add table to orphanTables.
c.orphanTables[tableID] = job.TableReplicaInfo.StartTs
c.orphanTables[tableID] = replicaInfo.StartTs
log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job))
continue
}
status.AddTable(tableID, job.TableReplicaInfo, job.TableReplicaInfo.StartTs)
status.AddTable(tableID, replicaInfo, c.status.CheckpointTs)
job.Status = model.MoveTableStatusFinished
delete(c.moveTableJobs, tableID)
log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job))
Expand Down
5 changes: 5 additions & 0 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,11 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context, captures []*model.Capture
captureIDs[captureID] = struct{}{}
}

log.Debug("cleanUpStaleTasks",
zap.Reflect("statuses", statuses),
zap.Reflect("positions", positions),
zap.Reflect("workloads", workloads))

for captureID := range captureIDs {
if _, ok := active[captureID]; !ok {
status, ok1 := statuses[captureID]
Expand Down
20 changes: 19 additions & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const (
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G

defaultSyncResolvedBatch = 1024

schemaStorageGCLag = time.Minute * 20
)

var (
Expand All @@ -86,6 +88,7 @@ type processor struct {
globalResolvedTs uint64
localResolvedTs uint64
checkpointTs uint64
globalcheckpointTs uint64
flushCheckpointInterval time.Duration

ddlPuller puller.Puller
Expand Down Expand Up @@ -664,12 +667,17 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
defer globalResolvedTsNotifier.Close()

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
if lastResolvedTs == changefeedStatus.ResolvedTs &&
lastCheckPointTs == changefeedStatus.CheckpointTs {
return
}
if lastCheckPointTs < changefeedStatus.CheckpointTs {
p.schemaStorage.DoGC(changefeedStatus.CheckpointTs)
// Delay GC to accommodate pullers starting from a startTs that's too small
// TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically
gcTime := oracle.GetTimeFromTS(changefeedStatus.CheckpointTs).Add(-schemaStorageGCLag)
gcTs := oracle.ComposeTS(gcTime.Unix(), 0)
p.schemaStorage.DoGC(gcTs)
lastCheckPointTs = changefeedStatus.CheckpointTs
}
if lastResolvedTs < changefeedStatus.ResolvedTs {
Expand Down Expand Up @@ -937,6 +945,16 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
return
}
}

globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)

if replicaInfo.StartTs < globalcheckpointTs {
log.Warn("addTable: startTs < checkpoint",
zap.Int64("tableID", tableID),
zap.Uint64("checkpoint", globalcheckpointTs),
zap.Uint64("startTs", replicaInfo.StartTs))
}

globalResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs)
log.Debug("Add table", zap.Int64("tableID", tableID),
zap.String("name", tableName),
Expand Down
4 changes: 3 additions & 1 deletion pkg/notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r1 := notifier.NewReceiver(-1)
r2 := notifier.NewReceiver(-1)
r3 := notifier.NewReceiver(-1)
finishedCh := make(chan struct{})
go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
notifier.Notify()
}
close(finishedCh)
}()
<-r1.C
r1.Stop()
Expand All @@ -50,7 +52,6 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r2.Stop()
r3.Stop()
c.Assert(len(notifier.receivers), check.Equals, 0)
time.Sleep(time.Second)
r4 := notifier.NewReceiver(-1)
<-r4.C
r4.Stop()
Expand All @@ -59,6 +60,7 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r5 := notifier2.NewReceiver(10 * time.Millisecond)
<-r5.C
r5.Stop()
<-finishedCh // To make the leak checker happy
}

func (s *notifySuite) TestContinusStop(c *check.C) {
Expand Down
27 changes: 27 additions & 0 deletions tests/move_table/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# diff Configuration.
# Configuration for sync-diff-inspector: verifies that the downstream (target)
# data matches the upstream (source) after the move_table integration test runs.

log-level = "info"
# Number of rows checked per chunk.
chunk-size = 10
# Number of concurrent checking threads.
check-thread-count = 4
# Percentage of rows to sample; 100 checks every row.
sample-percent = 100
use-rowid = false
# Compare tables by checksum rather than row-by-row.
use-checksum = true
# SQL statements that would repair any found differences are written here.
fix-sql-file = "fix.sql"

# Tables that need to be checked.
[[check-tables]]
schema = "move_table"
# "~" prefix marks a regular expression: match all YCSB usertable shards.
tables = ["~usertable.*"]

# Upstream TiDB instance (the cluster CDC captures from).
[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

# Downstream MySQL instance (the CDC sink to verify against).
[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
13 changes: 13 additions & 0 deletions tests/move_table/conf/workload
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# YCSB workload for the move_table integration test.
# Pure-insert load: 60k records written by 10 threads, no read/update phase,
# sized to keep the test lightweight (see "reduce move_table workload" commit).
threadcount=10
recordcount=60000
# 0 operations: only the load phase runs; no transaction phase follows.
operationcount=0
workload=core

readallfields=true

# All operation proportions are 0 — the transaction mix is unused here.
readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
Loading