Skip to content

Commit

Permalink
Temporary fix for ErrSnapshotSchemaNotFound and ErrSchemaStorageGCed (#…
Browse files Browse the repository at this point in the history
…1069) (#1114)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Nov 24, 2020
1 parent 78fe6c8 commit b181f9d
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 4 deletions.
6 changes: 4 additions & 2 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,15 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
case model.MoveTableStatusDeleted:
// add table to target capture
status, exist := cloneStatus(job.To)
replicaInfo := job.TableReplicaInfo.Clone()
replicaInfo.StartTs = c.status.CheckpointTs
if !exist {
// the target capture is not exist, add table to orphanTables.
c.orphanTables[tableID] = job.TableReplicaInfo.StartTs
c.orphanTables[tableID] = replicaInfo.StartTs
log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job))
continue
}
status.AddTable(tableID, job.TableReplicaInfo, job.TableReplicaInfo.StartTs)
status.AddTable(tableID, replicaInfo, c.status.CheckpointTs)
job.Status = model.MoveTableStatusFinished
delete(c.moveTableJobs, tableID)
log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job))
Expand Down
5 changes: 5 additions & 0 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,11 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context, captures []*model.Capture
captureIDs[captureID] = struct{}{}
}

log.Debug("cleanUpStaleTasks",
zap.Reflect("statuses", statuses),
zap.Reflect("positions", positions),
zap.Reflect("workloads", workloads))

for captureID := range captureIDs {
if _, ok := active[captureID]; !ok {
status, ok1 := statuses[captureID]
Expand Down
20 changes: 19 additions & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const (
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G

defaultSyncResolvedBatch = 1024

schemaStorageGCLag = time.Minute * 20
)

var (
Expand All @@ -86,6 +88,7 @@ type processor struct {
globalResolvedTs uint64
localResolvedTs uint64
checkpointTs uint64
globalcheckpointTs uint64
flushCheckpointInterval time.Duration

ddlPuller puller.Puller
Expand Down Expand Up @@ -664,12 +667,17 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
defer globalResolvedTsNotifier.Close()

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
if lastResolvedTs == changefeedStatus.ResolvedTs &&
lastCheckPointTs == changefeedStatus.CheckpointTs {
return
}
if lastCheckPointTs < changefeedStatus.CheckpointTs {
p.schemaStorage.DoGC(changefeedStatus.CheckpointTs)
// Delay GC to accommodate pullers starting from a startTs that's too small
// TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically
gcTime := oracle.GetTimeFromTS(changefeedStatus.CheckpointTs).Add(-schemaStorageGCLag)
gcTs := oracle.ComposeTS(gcTime.Unix(), 0)
p.schemaStorage.DoGC(gcTs)
lastCheckPointTs = changefeedStatus.CheckpointTs
}
if lastResolvedTs < changefeedStatus.ResolvedTs {
Expand Down Expand Up @@ -937,6 +945,16 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
return
}
}

globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)

if replicaInfo.StartTs < globalcheckpointTs {
log.Warn("addTable: startTs < checkpoint",
zap.Int64("tableID", tableID),
zap.Uint64("checkpoint", globalcheckpointTs),
zap.Uint64("startTs", replicaInfo.StartTs))
}

globalResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs)
log.Debug("Add table", zap.Int64("tableID", tableID),
zap.String("name", tableName),
Expand Down
4 changes: 3 additions & 1 deletion pkg/notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r1 := notifier.NewReceiver(-1)
r2 := notifier.NewReceiver(-1)
r3 := notifier.NewReceiver(-1)
finishedCh := make(chan struct{})
go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
notifier.Notify()
}
close(finishedCh)
}()
<-r1.C
r1.Stop()
Expand All @@ -50,7 +52,6 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r2.Stop()
r3.Stop()
c.Assert(len(notifier.receivers), check.Equals, 0)
time.Sleep(time.Second)
r4 := notifier.NewReceiver(-1)
<-r4.C
r4.Stop()
Expand All @@ -59,6 +60,7 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r5 := notifier2.NewReceiver(10 * time.Millisecond)
<-r5.C
r5.Stop()
<-finishedCh // To make the leak checker happy
}

func (s *notifySuite) TestContinusStop(c *check.C) {
Expand Down
27 changes: 27 additions & 0 deletions tests/move_table/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "move_table"
tables = ["~usertable.*"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
13 changes: 13 additions & 0 deletions tests/move_table/conf/workload
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
threadcount=10
recordcount=60000
operationcount=0
workload=core

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
Loading

0 comments on commit b181f9d

Please sign in to comment.