diff --git a/cdc/model/capture_test.go b/cdc/model/capture_test.go new file mode 100644 index 00000000000..46ba96c34b2 --- /dev/null +++ b/cdc/model/capture_test.go @@ -0,0 +1,37 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "github.com/pingcap/check" +) + +type captureSuite struct{} + +var _ = check.Suite(&captureSuite{}) + +func (s *captureSuite) TestMarshalUnmarshal(c *check.C) { + info := &CaptureInfo{ + ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7890", + AdvertiseAddr: "127.0.0.1:8300", + } + expected := []byte(`{"id":"9ff52aca-aea6-4022-8ec4-fbee3f2c7890","address":"127.0.0.1:8300"}`) + data, err := info.Marshal() + c.Assert(err, check.IsNil) + c.Assert(data, check.DeepEquals, expected) + decodedInfo := &CaptureInfo{} + err = decodedInfo.Unmarshal(data) + c.Assert(err, check.IsNil) + c.Assert(decodedInfo, check.DeepEquals, info) +} diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 2c2549b2f1e..fbf41bdeed4 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -14,13 +14,16 @@ package model import ( + "math" "time" "github.com/pingcap/check" "github.com/pingcap/parser/model" "github.com/pingcap/ticdc/pkg/config" + cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/util/testleak" filter "github.com/pingcap/tidb-tools/pkg/table-filter" + "github.com/pingcap/tidb/store/tikv/oracle" ) type configSuite struct{} @@ -95,7 +98,7 @@ func (s *configSuite) TestFillV1(c *check.C) { ] }, "cyclic-replication":{ - "enable":false, + "enable":true, "replica-id":1, "filter-replica-ids":[ 2, @@ -112,7 +115,9 @@ func (s *configSuite) TestFillV1(c *check.C) { c.Assert(err, check.IsNil) c.Assert(cfg, check.DeepEquals, &ChangeFeedInfo{ SinkURI: "blackhole://", - Opts: map[string]string{}, + Opts: map[string]string{ + "_cyclic_relax_sql_mode": `{"enable":true,"replica-id":1,"filter-replica-ids":[2,3],"id-buckets":4,"sync-ddl":true}`, + }, StartTs: 417136892416622595, Engine: "memory", SortDir: ".", @@ -150,6 +155,7 @@ func (s *configSuite) TestFillV1(c *check.C) { }, }, Cyclic: &config.CyclicConfig{ + Enable: true, ReplicaID: 1, FilterReplicaID: []uint64{2, 3}, IDBuckets: 4, @@ -223,3 +229,49 @@ func (s *changefeedSuite) TestChangefeedInfoStringer(c *check.C) { str := info.String() c.Check(str, check.Matches, ".*sink-uri\":\"\\*\\*\\*\".*") } + +func (s *changefeedSuite) TestValidateChangefeedID(c *check.C) { + validIDs := []string{ + "test", + "1", + "9ff52aca-aea6-4022-8ec4-fbee3f2c7890", + } + for _, id := range validIDs { + err := ValidateChangefeedID(id) + c.Assert(err, check.IsNil) + } + + invalidIDs := []string{ + "", + "test_task", + "job$", + } + for _, id := range invalidIDs { + err := ValidateChangefeedID(id) + c.Assert(cerror.ErrInvalidChangefeedID.Equal(err), check.IsTrue) + } +} + +func (s *changefeedSuite) TestGetTs(c *check.C) { + var ( + startTs uint64 = 418881574869139457 + targetTs uint64 = 420891571239139085 + checkpointTs uint64 = 420874357546418177 + createTime = time.Now() + info = &ChangeFeedInfo{ + SinkURI: "blackhole://", + CreateTime: createTime, + } + ) + c.Assert(info.GetStartTs(), check.Equals, oracle.EncodeTSO(createTime.Unix()*1000)) + info.StartTs = startTs + c.Assert(info.GetStartTs(), check.Equals, startTs) + + c.Assert(info.GetTargetTs(), check.Equals, uint64(math.MaxUint64)) + info.TargetTs = targetTs + c.Assert(info.GetTargetTs(), check.Equals, targetTs) + + c.Assert(info.GetCheckpointTs(nil), check.Equals, startTs) + status := &ChangeFeedStatus{CheckpointTs: checkpointTs} + c.Assert(info.GetCheckpointTs(status), check.Equals, checkpointTs) +} diff --git a/cdc/model/kv_test.go b/cdc/model/kv_test.go new file mode 100644 index 00000000000..20c75e81f97 --- /dev/null +++ b/cdc/model/kv_test.go @@ -0,0 +1,56 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "github.com/pingcap/check" + "github.com/pingcap/ticdc/pkg/regionspan" +) + +type kvSuite struct{} + +var _ = check.Suite(&kvSuite{}) + +func (s *kvSuite) TestRegionFeedEvent(c *check.C) { + raw := &RawKVEntry{ + CRTs: 1, + OpType: OpTypePut, + } + resolved := &ResolvedSpan{ + Span: regionspan.ComparableSpan{}, + ResolvedTs: 111, + } + + ev := &RegionFeedEvent{} + c.Assert(ev.GetValue(), check.IsNil) + + ev = &RegionFeedEvent{Val: raw} + c.Assert(ev.GetValue(), check.DeepEquals, raw) + + ev = &RegionFeedEvent{Resolved: resolved} + c.Assert(ev.GetValue(), check.DeepEquals, resolved) +} + +func (s *kvSuite) TestRawKVEntry(c *check.C) { + raw := &RawKVEntry{ + StartTs: 100, + CRTs: 101, + OpType: OpTypePut, + Key: []byte("123"), + Value: []byte("345"), + } + + c.Assert(raw.String(), check.Equals, "OpType: 1, Key: 123, Value: 345, StartTs: 100, CRTs: 101, RegionID: 0") + c.Assert(raw.ApproximateSize(), check.Equals, int64(6)) +} diff --git a/cdc/model/mounter_test.go b/cdc/model/mounter_test.go new file mode 100644 index 00000000000..fc915cf375e --- /dev/null +++ b/cdc/model/mounter_test.go @@ -0,0 +1,73 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "context" + "sync" + + "github.com/pingcap/check" +) + +type mounterSuite struct{} + +var _ = check.Suite(&mounterSuite{}) + +func (s *mounterSuite) TestPolymorphicEvent(c *check.C) { + raw := &RawKVEntry{ + StartTs: 99, + CRTs: 100, + OpType: OpTypePut, + RegionID: 2, + } + resolved := &RawKVEntry{ + OpType: OpTypeResolved, + CRTs: 101, + } + + polyEvent := NewPolymorphicEvent(raw) + c.Assert(polyEvent.RawKV, check.DeepEquals, raw) + c.Assert(polyEvent.CRTs, check.Equals, raw.CRTs) + c.Assert(polyEvent.StartTs, check.Equals, raw.StartTs) + c.Assert(polyEvent.RegionID(), check.Equals, raw.RegionID) + + rawResolved := &RawKVEntry{CRTs: resolved.CRTs, OpType: OpTypeResolved} + polyEvent = NewPolymorphicEvent(resolved) + c.Assert(polyEvent.RawKV, check.DeepEquals, rawResolved) + c.Assert(polyEvent.CRTs, check.Equals, resolved.CRTs) + c.Assert(polyEvent.StartTs, check.Equals, uint64(0)) +} + +func (s *mounterSuite) TestPolymorphicEventPrepare(c *check.C) { + ctx := context.Background() + polyEvent := NewPolymorphicEvent(&RawKVEntry{OpType: OpTypeResolved}) + c.Assert(polyEvent.WaitPrepare(ctx), check.IsNil) + + polyEvent = NewPolymorphicEvent(&RawKVEntry{OpType: OpTypePut}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := polyEvent.WaitPrepare(ctx) + c.Assert(err, check.IsNil) + }() + polyEvent.PrepareFinished() + wg.Wait() + + cctx, cancel := context.WithCancel(ctx) + polyEvent = NewPolymorphicEvent(&RawKVEntry{OpType: OpTypePut}) + cancel() + err := polyEvent.WaitPrepare(cctx) + c.Assert(err, check.Equals, context.Canceled) +} diff --git a/cdc/model/owner_test.go b/cdc/model/owner_test.go index 8b95f83e640..cfd0fc014cd 100644 --- a/cdc/model/owner_test.go +++ b/cdc/model/owner_test.go @@ -14,6 +14,7 @@ package model import ( + "math" "testing" "github.com/pingcap/check" @@ -22,6 +23,132 @@ import ( func TestSuite(t *testing.T) { check.TestingT(t) } +type ownerCommonSuite struct{} + +var _ = check.Suite(&ownerCommonSuite{}) + +func (s *ownerCommonSuite) TestAdminJobType(c *check.C) { + names := map[AdminJobType]string{ + AdminNone: "noop", + AdminStop: "stop changefeed", + AdminResume: "resume changefeed", + AdminRemove: "remove changefeed", + AdminFinish: "finish changefeed", + AdminJobType(100): "unknown", + } + for job, name := range names { + c.Assert(job.String(), check.Equals, name) + } + + isStopped := map[AdminJobType]bool{ + AdminNone: false, + AdminStop: true, + AdminResume: false, + AdminRemove: true, + AdminFinish: true, + } + for job, stopped := range isStopped { + c.Assert(job.IsStopState(), check.Equals, stopped) + } +} + +func (s *ownerCommonSuite) TestDDLStateString(c *check.C) { + names := map[ChangeFeedDDLState]string{ + ChangeFeedSyncDML: "SyncDML", + ChangeFeedWaitToExecDDL: "WaitToExecDDL", + ChangeFeedExecDDL: "ExecDDL", + ChangeFeedDDLExecuteFailed: "DDLExecuteFailed", + ChangeFeedDDLState(100): "Unknown", + } + for state, name := range names { + c.Assert(state.String(), check.Equals, name) + } +} + +func (s *ownerCommonSuite) TestTaskPositionMarshal(c *check.C) { + pos := &TaskPosition{ + ResolvedTs: 420875942036766723, + CheckPointTs: 420875940070686721, + } + expected := `{"checkpoint-ts":420875940070686721,"resolved-ts":420875942036766723,"count":0,"error":null}` + + data, err := pos.Marshal() + c.Assert(err, check.IsNil) + c.Assert(data, check.DeepEquals, expected) + c.Assert(pos.String(), check.Equals, expected) + + newPos := &TaskPosition{} + err = newPos.Unmarshal([]byte(data)) + c.Assert(err, check.IsNil) + c.Assert(newPos, check.DeepEquals, pos) +} + +func (s *ownerCommonSuite) TestChangeFeedStatusMarshal(c *check.C) { + status := &ChangeFeedStatus{ + ResolvedTs: 420875942036766723, + CheckpointTs: 420875940070686721, + } + expected := `{"resolved-ts":420875942036766723,"checkpoint-ts":420875940070686721,"admin-job-type":0}` + + data, err := status.Marshal() + c.Assert(err, check.IsNil) + c.Assert(data, check.DeepEquals, expected) + + newStatus := &ChangeFeedStatus{} + err = newStatus.Unmarshal([]byte(data)) + c.Assert(err, check.IsNil) + c.Assert(newStatus, check.DeepEquals, status) +} + +func (s *ownerCommonSuite) TestTableOperationState(c *check.C) { + processedMap := map[uint64]bool{ + OperDispatched: false, + OperProcessed: true, + OperFinished: true, + } + appliedMap := map[uint64]bool{ + OperDispatched: false, + OperProcessed: false, + OperFinished: true, + } + o := &TableOperation{} + + for status, processed := range processedMap { + o.Status = status + c.Assert(o.TableProcessed(), check.Equals, processed) + } + for status, applied := range appliedMap { + o.Status = status + c.Assert(o.TableApplied(), check.Equals, applied) + } + + // test clone nil operation. no-nil clone will be tested in `TestShouldBeDeepCopy` + var nilTableOper *TableOperation + c.Assert(nilTableOper.Clone(), check.IsNil) +} + +func (s *ownerCommonSuite) TestTaskWorkloadMarshal(c *check.C) { + workload := &TaskWorkload{ + 12: WorkloadInfo{Workload: uint64(1)}, + 15: WorkloadInfo{Workload: uint64(3)}, + } + expected := `{"12":{"workload":1},"15":{"workload":3}}` + + data, err := workload.Marshal() + c.Assert(err, check.IsNil) + c.Assert(data, check.Equals, expected) + + newWorkload := &TaskWorkload{} + err = newWorkload.Unmarshal([]byte(data)) + c.Assert(err, check.IsNil) + c.Assert(newWorkload, check.DeepEquals, workload) + + workload = nil + data, err = workload.Marshal() + c.Assert(err, check.IsNil) + c.Assert(data, check.Equals, "{}") +} + type taskStatusSuite struct{} var _ = check.Suite(&taskStatusSuite{}) @@ -93,6 +220,62 @@ func (s *taskStatusSuite) TestProcSnapshot(c *check.C) { c.Assert(snap.Tables[10], check.DeepEquals, &TableReplicaInfo{StartTs: 200}) } +func (s *taskStatusSuite) TestTaskStatusMarshal(c *check.C) { + status := &TaskStatus{ + Tables: map[TableID]*TableReplicaInfo{ + 1: {StartTs: 420875942036766723}, + }, + } + expected := `{"tables":{"1":{"start-ts":420875942036766723,"mark-table-id":0}},"operation":null,"admin-job-type":0}` + + data, err := status.Marshal() + c.Assert(err, check.IsNil) + c.Assert(data, check.DeepEquals, expected) + c.Assert(status.String(), check.Equals, expected) + + newStatus := &TaskStatus{} + err = newStatus.Unmarshal([]byte(data)) + c.Assert(err, check.IsNil) + c.Assert(newStatus, check.DeepEquals, status) +} + +func (s *taskStatusSuite) TestAddTable(c *check.C) { + ts := uint64(420875942036766723) + expected := &TaskStatus{ + Tables: map[TableID]*TableReplicaInfo{ + 1: {StartTs: ts}, + }, + Operation: map[TableID]*TableOperation{ + 1: { + BoundaryTs: ts, + Status: OperDispatched, + }, + }, + } + status := &TaskStatus{} + status.AddTable(1, &TableReplicaInfo{StartTs: ts}, ts) + c.Assert(status, check.DeepEquals, expected) + + // add existing table does nothing + status.AddTable(1, &TableReplicaInfo{StartTs: 1}, 1) + c.Assert(status, check.DeepEquals, expected) +} + +func (s *taskStatusSuite) TestTaskStatusApplyState(c *check.C) { + ts1 := uint64(420875042036766723) + ts2 := uint64(420876783269969921) + status := &TaskStatus{} + status.AddTable(1, &TableReplicaInfo{StartTs: ts1}, ts1) + status.AddTable(2, &TableReplicaInfo{StartTs: ts2}, ts2) + c.Assert(status.SomeOperationsUnapplied(), check.IsTrue) + c.Assert(status.AppliedTs(), check.Equals, ts1) + + status.Operation[1].Status = OperFinished + status.Operation[2].Status = OperFinished + c.Assert(status.SomeOperationsUnapplied(), check.IsFalse) + c.Assert(status.AppliedTs(), check.Equals, uint64(math.MaxUint64)) +} + type removeTableSuite struct{} var _ = check.Suite(&removeTableSuite{}) @@ -113,9 +296,17 @@ func (s *removeTableSuite) TestShouldReturnRemovedTable(c *check.C) { c.Assert(replicaInfo, check.DeepEquals, &TableReplicaInfo{StartTs: 200}) } -func (s *removeTableSuite) TestShouldHandleTableNotFoundCorrectly(c *check.C) { +func (s *removeTableSuite) TestShouldHandleTableNotFound(c *check.C) { defer testleak.AfterTest(c)() info := TaskStatus{} _, found := info.RemoveTable(404, 666) c.Assert(found, check.IsFalse) + + info = TaskStatus{ + Tables: map[TableID]*TableReplicaInfo{ + 1: {StartTs: 100}, + }, + } + _, found = info.RemoveTable(404, 666) + c.Assert(found, check.IsFalse) } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index acd0c62e1f8..9932c1ab559 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/parser/model" + "github.com/pingcap/ticdc/pkg/quotes" "github.com/pingcap/ticdc/pkg/util" "go.uber.org/zap" ) @@ -194,7 +195,7 @@ func (t TableName) String() string { // QuoteString returns quoted full table name func (t TableName) QuoteString() string { - return QuoteSchema(t.Schema, t.Table) + return quotes.QuoteSchema(t.Schema, t.Table) } // GetSchema returns schema name. diff --git a/cdc/model/string.go b/cdc/model/string.go index f12149940ee..eda57d46756 100644 --- a/cdc/model/string.go +++ b/cdc/model/string.go @@ -14,28 +14,13 @@ package model import ( - "fmt" "strings" cerror "github.com/pingcap/ticdc/pkg/errors" ) -// QuoteSchema quotes a full table name -func QuoteSchema(schema string, table string) string { - return fmt.Sprintf("`%s`.`%s`", EscapeName(schema), EscapeName(table)) -} - -// QuoteName wraps a name with "`" -func QuoteName(name string) string { - return "`" + EscapeName(name) + "`" -} - -// EscapeName replaces all "`" in name with "``" -func EscapeName(name string) string { - return strings.Replace(name, "`", "``", -1) -} - // HolderString returns a string of place holders separated by comma +// n must be greater or equal than 1, or the function will panic func HolderString(n int) string { var builder strings.Builder builder.Grow((n-1)*2 + 1) diff --git a/cdc/model/string_test.go b/cdc/model/string_test.go index ead27bef8f2..9512a6e403e 100644 --- a/cdc/model/string_test.go +++ b/cdc/model/string_test.go @@ -22,6 +22,24 @@ type stringSuite struct{} var _ = check.Suite(&stringSuite{}) +func (s *stringSuite) TestHolderString(c *check.C) { + testCases := []struct { + count int + expected string + }{ + {1, "?"}, + {2, "?,?"}, + {10, "?,?,?,?,?,?,?,?,?,?"}, + } + for _, tc := range testCases { + s := HolderString(tc.count) + c.Assert(s, check.Equals, tc.expected) + } + // test invalid input + c.Assert(func() { HolderString(0) }, check.Panics, "strings.Builder.Grow: negative count") + c.Assert(func() { HolderString(-1) }, check.Panics, "strings.Builder.Grow: negative count") +} + func (s *stringSuite) TestExtractKeySuffix(c *check.C) { defer testleak.AfterTest(c)() testCases := []struct { diff --git a/cdc/sink/cdclog/file.go b/cdc/sink/cdclog/file.go index 1e09eda6d9c..5a7665ad654 100644 --- a/cdc/sink/cdclog/file.go +++ b/cdc/sink/cdclog/file.go @@ -22,10 +22,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" parsemodel "github.com/pingcap/parser/model" - "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/sink/codec" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/quotes" "github.com/uber-go/atomic" "go.uber.org/zap" ) @@ -233,14 +233,14 @@ func (f *fileSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { func (f *fileSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { switch ddl.Type { case parsemodel.ActionCreateTable: - f.logMeta.Names[ddl.TableInfo.TableID] = model.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) + f.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) err := f.flushLogMeta() if err != nil { return err } case parsemodel.ActionRenameTable: delete(f.logMeta.Names, ddl.PreTableInfo.TableID) - f.logMeta.Names[ddl.TableInfo.TableID] = model.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) + f.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) err := f.flushLogMeta() if err != nil { return err diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go index 47ea9e55ff5..06a22ad9d69 100644 --- a/cdc/sink/cdclog/s3.go +++ b/cdc/sink/cdclog/s3.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/sink/codec" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/quotes" "github.com/uber-go/atomic" "go.uber.org/zap" ) @@ -241,14 +242,14 @@ func (s *s3Sink) EmitCheckpointTs(ctx context.Context, ts uint64) error { func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { switch ddl.Type { case parsemodel.ActionCreateTable: - s.logMeta.Names[ddl.TableInfo.TableID] = model.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) + s.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) err := s.flushLogMeta(ctx) if err != nil { return err } case parsemodel.ActionRenameTable: delete(s.logMeta.Names, ddl.PreTableInfo.TableID) - s.logMeta.Names[ddl.TableInfo.TableID] = model.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) + s.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) err := s.flushLogMeta(ctx) if err != nil { return err diff --git a/cdc/sink/cdclog/utils.go b/cdc/sink/cdclog/utils.go index 53328e79e96..73da77b1098 100644 --- a/cdc/sink/cdclog/utils.go +++ b/cdc/sink/cdclog/utils.go @@ -22,12 +22,12 @@ import ( "github.com/pingcap/br/pkg/storage" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/sink/codec" + "github.com/pingcap/ticdc/pkg/quotes" "github.com/uber-go/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" - - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/sink/codec" ) const ( @@ -237,7 +237,7 @@ func makeLogMetaContent(tableInfos []*model.SimpleTableInfo) *logMeta { for _, table := range tableInfos { if table != nil { log.Info("[makeLogMetaContent]", zap.Reflect("table", table)) - names[table.TableID] = model.QuoteSchema(table.Schema, table.Table) + names[table.TableID] = quotes.QuoteSchema(table.Schema, table.Table) } } meta.Names = names diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 310e8cb06fe..24d6fc292e4 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -1036,9 +1036,9 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic } for i, column := range columnNames { if i == len(columnNames)-1 { - builder.WriteString("`" + model.EscapeName(column) + "`=?") + builder.WriteString("`" + quotes.EscapeName(column) + "`=?") } else { - builder.WriteString("`" + model.EscapeName(column) + "`=?,") + builder.WriteString("`" + quotes.EscapeName(column) + "`=?,") } } diff --git a/pkg/cyclic/replication.go b/pkg/cyclic/replication.go index 0be5ab3186a..8d6c6408554 100644 --- a/pkg/cyclic/replication.go +++ b/pkg/cyclic/replication.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/cyclic/mark" + "github.com/pingcap/ticdc/pkg/quotes" ) // RelaxSQLMode returns relaxed SQL mode, "STRICT_TRANS_TABLES" is removed. @@ -65,7 +66,7 @@ func (*Cyclic) UdpateSourceTableCyclicMark(sourceSchema, sourceTable string, buc schema, table := mark.GetMarkTableName(sourceSchema, sourceTable) return fmt.Sprintf( `INSERT INTO %s VALUES (%d, %d, 0, %d) ON DUPLICATE KEY UPDATE val = val + 1;`, - model.QuoteSchema(schema, table), bucket, replicaID, startTs) + quotes.QuoteSchema(schema, table), bucket, replicaID, startTs) } // FilterReplicaID return a slice of replica IDs needs to be filtered.