From a1a198eedaee47c86cab677f9243eb08afd59f03 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 15 Feb 2023 10:58:01 +0800 Subject: [PATCH] sinkv2(ticdc): add max-multi-update-row config in mysql sink (#8191) (#8204) ref pingcap/tiflow#8084 --- cdc/sinkv2/eventsink/txn/mysql/mysql.go | 31 +++- cdc/sinkv2/eventsink/txn/mysql/mysql_test.go | 60 ++++++++ pkg/sink/mysql/config.go | 114 +++++++++++--- pkg/sink/mysql/config_test.go | 13 ++ pkg/sqlmodel/multirow.go | 150 ++++++++++++++++++- pkg/sqlmodel/multirow_bench_test.go | 110 ++++++++++++++ pkg/sqlmodel/multirow_test.go | 121 ++++++++++++++- pkg/sqlmodel/row_change.go | 13 ++ 8 files changed, 578 insertions(+), 34 deletions(-) create mode 100644 pkg/sqlmodel/multirow_bench_test.go diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index 8403d004b70..5c6db285e8c 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -256,6 +256,7 @@ func convert2RowChanges( tableInfo, nil, nil) } + res.SetApproximateDataSize(row.ApproximateDataSize) return res } @@ -330,7 +331,7 @@ func (s *mysqlBackend) groupRowsByType( updateRow = append( updateRow, convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) - if len(updateRow) >= s.cfg.MaxTxnRow { + if len(updateRow) >= s.cfg.MaxMultiUpdateRowCount { updateRows = append(updateRows, updateRow) updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) } @@ -384,15 +385,37 @@ func (s *mysqlBackend) batchSingleTxnDmls( // handle update if len(updateRows) > 0 { for _, rows := range updateRows { - sql, value := sqlmodel.GenUpdateSQL(rows...) - sqls = append(sqls, sql) - values = append(values, value) + s, v := s.genUpdateSQL(rows...) + sqls = append(sqls, s...) + values = append(values, v...) } } return } +func (s *mysqlBackend) genUpdateSQL(rows ...*sqlmodel.RowChange) ([]string, [][]interface{}) { + size, count := 0, 0 + for _, r := range rows { + size += int(r.GetApproximateDataSize()) + count++ + } + if size < s.cfg.MaxMultiUpdateRowSize*count { + // use multi update in one SQL + sql, value := sqlmodel.GenUpdateSQLFast(rows...) + return []string{sql}, [][]interface{}{value} + } + // each row has one independent update SQL. 
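+	// (When the average approximate row size is at least MaxMultiUpdateRowSize,
+	// multi update is disabled and one UPDATE per row is generated instead.)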
+ sqls := make([]string, 0, len(rows)) + values := make([][]interface{}, 0, len(rows)) + for _, row := range rows { + sql, value := row.GenSQL(sqlmodel.DMLUpdate) + sqls = append(sqls, sql) + values = append(values, value) + } + return sqls, values +} + func hasHandleKey(cols []*model.Column) bool { for _, col := range cols { if col == nil { diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go index b16ea5b02f3..3c6b06a104f 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go @@ -28,7 +28,10 @@ import ( dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" @@ -37,6 +40,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" + "github.com/pingcap/tiflow/pkg/sqlmodel" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -1681,3 +1685,59 @@ func TestGroupRowsByType(t *testing.T) { }) } } + +func TestBackendGenUpdateSQL(t *testing.T) { + ctx := context.Background() + ms := newMySQLBackendWithoutDB(ctx) + table := &model.TableName{Schema: "db", Table: "tb1"} + + createSQL := "CREATE TABLE tb1 (id INT PRIMARY KEY, name varchar(20))" + stmt, err := parser.New().ParseOneStmt(createSQL, "", "") + require.NoError(t, err) + ti, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) + require.NoError(t, err) + + row1 := sqlmodel.NewRowChange(table, table, []any{1, "a"}, []any{1, "aa"}, ti, ti, nil) + row1.SetApproximateDataSize(6) + row2 := sqlmodel.NewRowChange(table, table, []any{2, "b"}, []any{2, "bb"}, ti, ti, nil) + row2.SetApproximateDataSize(6) + + testCases := []struct { + rows []*sqlmodel.RowChange + maxMultiUpdateRowSize int + expectedSQLs []string + expectedValues [][]interface{} + }{ + { + []*sqlmodel.RowChange{row1, row2}, + ms.cfg.MaxMultiUpdateRowCount, + []string{ + "UPDATE `db`.`tb1` SET " + + "`id`=CASE WHEN `id`=? THEN ? WHEN `id`=? THEN ? END, " + + "`name`=CASE WHEN `id`=? THEN ? WHEN `id`=? THEN ? END " + + "WHERE `id` IN (?,?)", + }, + [][]interface{}{ + {1, 1, 2, 2, 1, "aa", 2, "bb", 1, 2}, + }, + }, + { + []*sqlmodel.RowChange{row1, row2}, + 0, + []string{ + "UPDATE `db`.`tb1` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", + "UPDATE `db`.`tb1` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", + }, + [][]interface{}{ + {1, "aa", 1}, + {2, "bb", 2}, + }, + }, + } + for _, tc := range testCases { + ms.cfg.MaxMultiUpdateRowSize = tc.maxMultiUpdateRowSize + sqls, values := ms.genUpdateSQL(tc.rows...) + require.Equal(t, tc.expectedSQLs, sqls) + require.Equal(t, tc.expectedValues, values) + } +} diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 1e91b4cf546..204765e0741 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -41,10 +41,21 @@ const ( defaultWorkerCount = 16 // defaultMaxTxnRow is the default max number of rows in a transaction. defaultMaxTxnRow = 256 + // defaultMaxMultiUpdateRowCount is the default max number of rows in a + // single multi update SQL. 
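+	// It can be adjusted with the `max-multi-update-row` sink-uri parameter,
+	// e.g. mysql://127.0.0.1:3306/?max-multi-update-row=80, and is capped at
+	// maxMaxMultiUpdateRowCount.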
+ defaultMaxMultiUpdateRowCount = 40 + // defaultMaxMultiUpdateRowSize(1KB) defines the default value of MaxMultiUpdateRowSize + // When row average size is larger MaxMultiUpdateRowSize, + // disable multi update, otherwise enable multi update. + defaultMaxMultiUpdateRowSize = 1024 // The upper limit of max worker counts. maxWorkerCount = 1024 // The upper limit of max txn rows. maxMaxTxnRow = 2048 + // The upper limit of max multi update rows in a single SQL. + maxMaxMultiUpdateRowCount = 256 + // The upper limit of max multi update row size(8KB). + maxMaxMultiUpdateRowSize = 8192 defaultTiDBTxnMode = txnModeOptimistic defaultBatchReplaceEnabled = true @@ -67,19 +78,21 @@ const ( // Config is the configs for MySQL backend. type Config struct { - WorkerCount int - MaxTxnRow int - tidbTxnMode string - BatchReplaceEnabled bool - BatchReplaceSize int - ReadTimeout string - WriteTimeout string - DialTimeout string - SafeMode bool - Timezone string - TLS string - ForceReplicate bool - EnableOldValue bool + WorkerCount int + MaxTxnRow int + MaxMultiUpdateRowCount int + MaxMultiUpdateRowSize int + tidbTxnMode string + BatchReplaceEnabled bool + BatchReplaceSize int + ReadTimeout string + WriteTimeout string + DialTimeout string + SafeMode bool + Timezone string + TLS string + ForceReplicate bool + EnableOldValue bool IsTiDB bool // IsTiDB is true if the downstream is TiDB SourceID uint64 @@ -89,16 +102,18 @@ type Config struct { // NewConfig returns the default mysql backend config. func NewConfig() *Config { return &Config{ - WorkerCount: defaultWorkerCount, - MaxTxnRow: defaultMaxTxnRow, - tidbTxnMode: defaultTiDBTxnMode, - BatchReplaceEnabled: defaultBatchReplaceEnabled, - BatchReplaceSize: defaultBatchReplaceSize, - ReadTimeout: defaultReadTimeout, - WriteTimeout: defaultWriteTimeout, - DialTimeout: defaultDialTimeout, - SafeMode: defaultSafeMode, - BatchDMLEnable: defaultBatchDMLEnable, + WorkerCount: defaultWorkerCount, + MaxTxnRow: defaultMaxTxnRow, + MaxMultiUpdateRowCount: defaultMaxMultiUpdateRowCount, + MaxMultiUpdateRowSize: defaultMaxMultiUpdateRowSize, + tidbTxnMode: defaultTiDBTxnMode, + BatchReplaceEnabled: defaultBatchReplaceEnabled, + BatchReplaceSize: defaultBatchReplaceSize, + ReadTimeout: defaultReadTimeout, + WriteTimeout: defaultWriteTimeout, + DialTimeout: defaultDialTimeout, + SafeMode: defaultSafeMode, + BatchDMLEnable: defaultBatchDMLEnable, } } @@ -124,6 +139,12 @@ func (c *Config) Apply( if err = getMaxTxnRow(query, &c.MaxTxnRow); err != nil { return err } + if err = getMaxMultiUpdateRowCount(query, &c.MaxMultiUpdateRowCount); err != nil { + return err + } + if err = getMaxMultiUpdateRowSize(query, &c.MaxMultiUpdateRowSize); err != nil { + return err + } if err = getTiDBTxnMode(query, &c.tidbTxnMode); err != nil { return err } @@ -205,6 +226,53 @@ func getMaxTxnRow(values url.Values, maxTxnRow *int) error { return nil } +func getMaxMultiUpdateRowCount(values url.Values, maxMultiUpdateRow *int) error { + s := values.Get("max-multi-update-row") + if len(s) == 0 { + return nil + } + + c, err := strconv.Atoi(s) + if err != nil { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + if c <= 0 { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, + fmt.Errorf("invalid max-multi-update-row %d, which must be greater than 0", c)) + } + if c > maxMaxMultiUpdateRowCount { + log.Warn("max-multi-update-row too large", + zap.Int("original", c), zap.Int("override", maxMaxMultiUpdateRowCount)) + c = maxMaxMultiUpdateRowCount + } + *maxMultiUpdateRow = c + return nil 
+} + +func getMaxMultiUpdateRowSize(values url.Values, maxMultiUpdateRowSize *int) error { + s := values.Get("max-multi-update-row-size") + if len(s) == 0 { + return nil + } + + c, err := strconv.Atoi(s) + if err != nil { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + if c < 0 { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, + fmt.Errorf("invalid max-multi-update-row-size %d, "+ + "which must be greater than or equal to 0", c)) + } + if c > maxMaxMultiUpdateRowSize { + log.Warn("max-multi-update-row-size too large", + zap.Int("original", c), zap.Int("override", maxMaxMultiUpdateRowSize)) + c = maxMaxMultiUpdateRowSize + } + *maxMultiUpdateRowSize = c + return nil +} + func getTiDBTxnMode(values url.Values, mode *string) error { s := values.Get("tidb-txn-mode") if len(s) == 0 { diff --git a/pkg/sink/mysql/config_test.go b/pkg/sink/mysql/config_test.go index efee4fac8db..9ba903f841a 100644 --- a/pkg/sink/mysql/config_test.go +++ b/pkg/sink/mysql/config_test.go @@ -181,6 +181,8 @@ func TestApplySinkURIParamsToConfig(t *testing.T) { expected := NewConfig() expected.WorkerCount = 64 expected.MaxTxnRow = 20 + expected.MaxMultiUpdateRowCount = 80 + expected.MaxMultiUpdateRowSize = 512 expected.BatchReplaceEnabled = true expected.BatchReplaceSize = 50 expected.SafeMode = false @@ -188,6 +190,7 @@ func TestApplySinkURIParamsToConfig(t *testing.T) { expected.tidbTxnMode = "pessimistic" expected.EnableOldValue = true uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" + + "&max-multi-update-row=80&max-multi-update-row-size=512" + "&batch-replace-enable=true&batch-replace-size=50&safe-mode=false" + "&tidb-txn-mode=pessimistic" uri, err := url.Parse(uriStr) @@ -240,6 +243,16 @@ func TestParseSinkURIOverride(t *testing.T) { checker: func(sp *Config) { require.EqualValues(t, sp.MaxTxnRow, maxMaxTxnRow) }, + }, { + uri: "mysql://127.0.0.1:3306/?max-multi-update-row=2147483648", // int32 max + checker: func(sp *Config) { + require.EqualValues(t, sp.MaxMultiUpdateRowCount, maxMaxMultiUpdateRowCount) + }, + }, { + uri: "mysql://127.0.0.1:3306/?max-multi-update-row-size=2147483648", // int32 max + checker: func(sp *Config) { + require.EqualValues(t, sp.MaxMultiUpdateRowSize, maxMaxMultiUpdateRowSize) + }, }, { uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode", checker: func(sp *Config) { diff --git a/pkg/sqlmodel/multirow.go b/pkg/sqlmodel/multirow.go index 0f7341bf749..452a69bb2f9 100644 --- a/pkg/sqlmodel/multirow.go +++ b/pkg/sqlmodel/multirow.go @@ -16,12 +16,12 @@ package sqlmodel import ( "strings" + "github.com/pingcap/log" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/opcode" driver "github.com/pingcap/tidb/types/parser_driver" - "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/pkg/quotes" "go.uber.org/zap" ) @@ -122,6 +122,152 @@ func GenDeleteSQL(changes ...*RowChange) (string, []interface{}) { return buf.String(), args } +// GenUpdateSQLFast generates the UPDATE SQL and its arguments. +// Input `changes` should have same target table and same columns for WHERE +// (typically same PK/NOT NULL UK), otherwise the behaviour is undefined. +// It is a faster version compared with GenUpdateSQL. 
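+//
+// For example (illustrative, a one-column PK `c` plus columns `c2` and `c3`,
+// two changes), the generated statement has the shape
+//
+//	UPDATE `db`.`tb` SET
+//	`c`=CASE WHEN `c`=? THEN ? WHEN `c`=? THEN ? END,
+//	`c2`=CASE WHEN `c`=? THEN ? WHEN `c`=? THEN ? END,
+//	`c3`=CASE WHEN `c`=? THEN ? WHEN `c`=? THEN ? END
+//	WHERE `c` IN (?,?)
+//
+// and its arguments are grouped per SET column as (where value, new value)
+// pairs for each change, followed by all WHERE values.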
+func GenUpdateSQLFast(changes ...*RowChange) (string, []any) { + if len(changes) == 0 { + log.L().DPanic("row changes is empty") + return "", nil + } + var buf strings.Builder + buf.Grow(1024) + + // Generate UPDATE `db`.`table` SET + first := changes[0] + buf.WriteString("UPDATE ") + buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(" SET ") + + // Pre-generate essential sub statements used after WHEN, WHERE and IN. + var ( + whereCaseStmt string + whenCaseStmt string + inCaseStmt string + ) + whereColumns, _ := first.whereColumnsAndValues() + if len(whereColumns) == 1 { + // one field PK or UK, use `field`=? directly. + whereCaseStmt = quotes.QuoteName(whereColumns[0]) + whenCaseStmt = whereCaseStmt + "=?" + inCaseStmt = valuesHolder(len(changes)) + } else { + // multiple fields PK or UK, use ROW(...fields) expression. + whereValuesHolder := valuesHolder(len(whereColumns)) + whereCaseStmt = "ROW(" + for i, column := range whereColumns { + whereCaseStmt += quotes.QuoteName(column) + if i != len(whereColumns)-1 { + whereCaseStmt += "," + } else { + whereCaseStmt += ")" + whenCaseStmt = whereCaseStmt + "=ROW" + whereValuesHolder + } + } + var inCaseStmtBuf strings.Builder + // inCaseStmt sample: IN (ROW(?,?,?),ROW(?,?,?)) + // ^ ^ + // Buffer size count between |---------------------| + // equals to 3 * len(changes) for each `ROW` + // plus 1 * len(changes) - 1 for each `,` between every two ROW(?,?,?) + // plus len(whereValuesHolder) * len(changes) + // plus 2 for `(` and `)` + inCaseStmtBuf.Grow((4+len(whereValuesHolder))*len(changes) + 1) + inCaseStmtBuf.WriteString("(") + for i := range changes { + inCaseStmtBuf.WriteString("ROW") + inCaseStmtBuf.WriteString(whereValuesHolder) + if i != len(changes)-1 { + inCaseStmtBuf.WriteString(",") + } else { + inCaseStmtBuf.WriteString(")") + } + } + inCaseStmt = inCaseStmtBuf.String() + } + + // Generate `ColumnName`=CASE WHEN .. THEN .. END + // Use this value in order to identify which is the first CaseWhenThen line, + // because generated column can happen any where and it will be skipped. + isFirstCaseWhenThenLine := true + for _, column := range first.targetTableInfo.Columns { + if isGenerated(first.targetTableInfo.Columns, column.Name) { + continue + } + if !isFirstCaseWhenThenLine { + // insert ", " after END of each lines except for the first line. + buf.WriteString(", ") + } + + buf.WriteString(quotes.QuoteName(column.Name.String()) + "=CASE") + for range changes { + buf.WriteString(" WHEN ") + buf.WriteString(whenCaseStmt) + buf.WriteString(" THEN ?") + } + buf.WriteString(" END") + isFirstCaseWhenThenLine = false + } + + // Generate WHERE .. IN .. 
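+	// e.g. WHERE `c` IN (?,?) for a one-column key, or
+	// WHERE ROW(`c`,`c2`) IN (ROW(?,?),ROW(?,?)) for a multi-column key.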
+ buf.WriteString(" WHERE ") + buf.WriteString(whereCaseStmt) + buf.WriteString(" IN ") + buf.WriteString(inCaseStmt) + + // Build args of the UPDATE SQL + var assignValueColumnCount int + var skipColIdx []int + for i, col := range first.sourceTableInfo.Columns { + if isGenerated(first.targetTableInfo.Columns, col.Name) { + skipColIdx = append(skipColIdx, i) + continue + } + assignValueColumnCount++ + } + args := make([]any, 0, + assignValueColumnCount*len(changes)*(len(whereColumns)+1)+len(changes)*len(whereColumns)) + argsPerCol := make([][]any, assignValueColumnCount) + for i := 0; i < assignValueColumnCount; i++ { + argsPerCol[i] = make([]any, 0, len(changes)*(len(whereColumns)+1)) + } + whereValuesAtTheEnd := make([]any, 0, len(changes)*len(whereColumns)) + for _, change := range changes { + _, whereValues := change.whereColumnsAndValues() + // a simple check about different number of WHERE values, not trying to + // cover all cases + if len(whereValues) != len(whereColumns) { + log.Panic("len(whereValues) != len(whereColumns)", + zap.Int("len(whereValues)", len(whereValues)), + zap.Int("len(whereColumns)", len(whereColumns)), + zap.Any("whereValues", whereValues), + zap.Stringer("sourceTable", change.sourceTable)) + return "", nil + } + + whereValuesAtTheEnd = append(whereValuesAtTheEnd, whereValues...) + + i := 0 // used as index of skipColIdx + writeableCol := 0 + for j, val := range change.postValues { + if i < len(skipColIdx) && skipColIdx[i] == j { + i++ + continue + } + argsPerCol[writeableCol] = append(argsPerCol[writeableCol], whereValues...) + argsPerCol[writeableCol] = append(argsPerCol[writeableCol], val) + writeableCol++ + } + } + for _, a := range argsPerCol { + args = append(args, a...) + } + args = append(args, whereValuesAtTheEnd...) + + return buf.String(), args +} + // GenUpdateSQL generates the UPDATE SQL and its arguments. // Input `changes` should have same target table and same columns for WHERE // (typically same PK/NOT NULL UK), otherwise the behaviour is undefined. @@ -224,7 +370,7 @@ func GenUpdateSQL(changes ...*RowChange) (string, []interface{}) { // a simple check about different number of WHERE values, not trying to // cover all cases if len(whereValues) != len(whereColumns) { - log.L().DPanic("len(whereValues) != len(whereColumns)", + log.Panic("len(whereValues) != len(whereColumns)", zap.Int("len(whereValues)", len(whereValues)), zap.Int("len(whereColumns)", len(whereColumns)), zap.Any("whereValues", whereValues), diff --git a/pkg/sqlmodel/multirow_bench_test.go b/pkg/sqlmodel/multirow_bench_test.go new file mode 100644 index 00000000000..496758274b3 --- /dev/null +++ b/pkg/sqlmodel/multirow_bench_test.go @@ -0,0 +1,110 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlmodel + +import ( + "fmt" + "testing" + "time" + + cdcmodel "github.com/pingcap/tiflow/cdc/model" +) + +func prepareDataOneColoumnPK(t *testing.T, batch int) []*RowChange { + source := &cdcmodel.TableName{Schema: "db", Table: "tb"} + target := &cdcmodel.TableName{Schema: "db", Table: "tb"} + + sourceTI := mockTableInfo(t, `CREATE TABLE tb (c INT, c2 INT, c3 INT, + c4 VARCHAR(10), c5 VARCHAR(100), c6 VARCHAR(1000), PRIMARY KEY (c))`) + targetTI := mockTableInfo(t, `CREATE TABLE tb (c INT, c2 INT, c3 INT, + c4 VARCHAR(10), c5 VARCHAR(100), c6 VARCHAR(1000), PRIMARY KEY (c))`) + + changes := make([]*RowChange, 0, batch) + for i := 0; i < batch; i++ { + change := NewRowChange(source, target, + []interface{}{i + 1, i + 2, i + 3, "c4", "c5", "c6"}, + []interface{}{i + 10, i + 20, i + 30, "c4", "c5", "c6"}, + sourceTI, targetTI, nil) + changes = append(changes, change) + } + return changes +} + +func prepareDataMultiColumnsPK(t *testing.T, batch int) []*RowChange { + source := &cdcmodel.TableName{Schema: "db", Table: "tb"} + target := &cdcmodel.TableName{Schema: "db", Table: "tb"} + + sourceTI := mockTableInfo(t, `CREATE TABLE tb (c1 INT, c2 INT, c3 INT, c4 INT, + c5 VARCHAR(10), c6 VARCHAR(100), c7 VARCHAR(1000), c8 timestamp, c9 timestamp, + PRIMARY KEY (c1, c2, c3, c4))`) + targetTI := mockTableInfo(t, `CREATE TABLE tb (c1 INT, c2 INT, c3 INT, c4 INT, + c5 VARCHAR(10), c6 VARCHAR(100), c7 VARCHAR(1000), c8 timestamp, c9 timestamp, + PRIMARY KEY (c1, c2, c3, c4))`) + + changes := make([]*RowChange, 0, batch) + for i := 0; i < batch; i++ { + change := NewRowChange(source, target, + []interface{}{i + 1, i + 2, i + 3, i + 4, "c4", "c5", "c6", "c7", time.Time{}, time.Time{}}, + []interface{}{i + 10, i + 20, i + 30, i + 40, "c4", "c5", "c6", "c7", time.Time{}, time.Time{}}, + sourceTI, targetTI, nil) + changes = append(changes, change) + } + return changes +} + +// bench cmd: go test -run='^$' -benchmem -bench '^(BenchmarkGenUpdate)$' github.com/pingcap/tiflow/pkg/sqlmodel +func BenchmarkGenUpdate(b *testing.B) { + t := &testing.T{} + type genCase struct { + name string + fn genSQLFunc + prepare func(t *testing.T, batch int) []*RowChange + } + batchSizes := []int{ + 1, 2, 4, 8, 16, 32, 64, 128, + } + benchCases := []genCase{ + { + name: "OneColumnPK-GenUpdateSQL", + fn: GenUpdateSQL, + prepare: prepareDataOneColoumnPK, + }, + { + name: "OneColumnPK-GenUpdateSQLFast", + fn: GenUpdateSQLFast, + prepare: prepareDataOneColoumnPK, + }, + { + name: "MultiColumnsPK-GenUpdateSQL", + fn: GenUpdateSQL, + prepare: prepareDataMultiColumnsPK, + }, + { + name: "MultiColumnsPK-GenUpdateSQLFast", + fn: GenUpdateSQLFast, + prepare: prepareDataMultiColumnsPK, + }, + } + for _, bc := range benchCases { + for _, batch := range batchSizes { + name := fmt.Sprintf("%s-Batch%d", bc.name, batch) + b.Run(name, func(b *testing.B) { + changes := prepareDataOneColoumnPK(t, batch) + for i := 0; i < b.N; i++ { + bc.fn(changes...) 
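+					// only SQL generation is measured; the returned SQL and
+					// args are discarded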
+ } + }) + } + } +} diff --git a/pkg/sqlmodel/multirow_test.go b/pkg/sqlmodel/multirow_test.go index 52500da6758..3fd84ec9a2e 100644 --- a/pkg/sqlmodel/multirow_test.go +++ b/pkg/sqlmodel/multirow_test.go @@ -20,6 +20,8 @@ import ( "github.com/stretchr/testify/require" ) +type genSQLFunc func(changes ...*RowChange) (string, []interface{}) + func TestGenDeleteMultiRows(t *testing.T) { t.Parallel() @@ -41,7 +43,31 @@ func TestGenDeleteMultiRows(t *testing.T) { func TestGenUpdateMultiRows(t *testing.T) { t.Parallel() + testGenUpdateMultiRows(t, GenUpdateSQL) + testGenUpdateMultiRows(t, GenUpdateSQLFast) +} +func TestGenUpdateMultiRowsOneColPK(t *testing.T) { + t.Parallel() + testGenUpdateMultiRowsOneColPK(t, GenUpdateSQL) + testGenUpdateMultiRowsOneColPK(t, GenUpdateSQLFast) +} + +func TestGenUpdateMultiRowsWithVirtualGeneratedColumn(t *testing.T) { + t.Parallel() + testGenUpdateMultiRowsWithVirtualGeneratedColumn(t, GenUpdateSQL) + testGenUpdateMultiRowsWithVirtualGeneratedColumn(t, GenUpdateSQLFast) + testGenUpdateMultiRowsWithVirtualGeneratedColumns(t, GenUpdateSQL) + testGenUpdateMultiRowsWithVirtualGeneratedColumns(t, GenUpdateSQLFast) +} + +func TestGenUpdateMultiRowsWithStoredGeneratedColumn(t *testing.T) { + t.Parallel() + testGenUpdateMultiRowsWithStoredGeneratedColumn(t, GenUpdateSQL) + testGenUpdateMultiRowsWithStoredGeneratedColumn(t, GenUpdateSQLFast) +} + +func testGenUpdateMultiRows(t *testing.T, genUpdate genSQLFunc) { source1 := &cdcmodel.TableName{Schema: "db", Table: "tb1"} source2 := &cdcmodel.TableName{Schema: "db", Table: "tb2"} target := &cdcmodel.TableName{Schema: "db", Table: "tb"} @@ -52,7 +78,7 @@ func TestGenUpdateMultiRows(t *testing.T) { change1 := NewRowChange(source1, target, []interface{}{1, 2, 3}, []interface{}{10, 20, 30}, sourceTI1, targetTI, nil) change2 := NewRowChange(source2, target, []interface{}{4, 5, 6}, []interface{}{40, 50, 60}, sourceTI2, targetTI, nil) - sql, args := GenUpdateSQL(change1, change2) + sql, args := genUpdate(change1, change2) expectedSQL := "UPDATE `db`.`tb` SET " + "`c`=CASE WHEN ROW(`c`,`c2`)=ROW(?,?) THEN ? WHEN ROW(`c`,`c2`)=ROW(?,?) THEN ? END, " + @@ -70,9 +96,7 @@ func TestGenUpdateMultiRows(t *testing.T) { require.Equal(t, expectedArgs, args) } -func TestGenUpdateMultiRowsOneColPK(t *testing.T) { - t.Parallel() - +func testGenUpdateMultiRowsOneColPK(t *testing.T, genUpdate genSQLFunc) { source1 := &cdcmodel.TableName{Schema: "db", Table: "tb1"} source2 := &cdcmodel.TableName{Schema: "db", Table: "tb2"} target := &cdcmodel.TableName{Schema: "db", Table: "tb"} @@ -83,7 +107,7 @@ func TestGenUpdateMultiRowsOneColPK(t *testing.T) { change1 := NewRowChange(source1, target, []interface{}{1, 2, 3}, []interface{}{10, 20, 30}, sourceTI1, targetTI, nil) change2 := NewRowChange(source2, target, []interface{}{4, 5, 6}, []interface{}{40, 50, 60}, sourceTI2, targetTI, nil) - sql, args := GenUpdateSQL(change1, change2) + sql, args := genUpdate(change1, change2) expectedSQL := "UPDATE `db`.`tb` SET " + "`c`=CASE WHEN `c`=? THEN ? WHEN `c`=? THEN ? 
END, " + @@ -101,6 +125,93 @@ func TestGenUpdateMultiRowsOneColPK(t *testing.T) { require.Equal(t, expectedArgs, args) } +func testGenUpdateMultiRowsWithVirtualGeneratedColumn(t *testing.T, genUpdate genSQLFunc) { + source := &cdcmodel.TableName{Schema: "db", Table: "tb"} + target := &cdcmodel.TableName{Schema: "db", Table: "tb"} + + sourceTI := mockTableInfo(t, "CREATE TABLE tb1 (c INT, c1 int as (c+100) virtual not null, c2 INT, c3 INT, PRIMARY KEY (c))") + targetTI := mockTableInfo(t, "CREATE TABLE tb (c INT, c1 int as (c+100) virtual not null, c2 INT, c3 INT, PRIMARY KEY (c))") + + change1 := NewRowChange(source, target, []interface{}{1, 101, 2, 3}, []interface{}{10, 110, 20, 30}, sourceTI, targetTI, nil) + change2 := NewRowChange(source, target, []interface{}{4, 104, 5, 6}, []interface{}{40, 140, 50, 60}, sourceTI, targetTI, nil) + change3 := NewRowChange(source, target, []interface{}{7, 107, 8, 9}, []interface{}{70, 170, 80, 90}, sourceTI, targetTI, nil) + sql, args := genUpdate(change1, change2, change3) + + expectedSQL := "UPDATE `db`.`tb` SET " + + "`c`=CASE WHEN `c`=? THEN ? WHEN `c`=? THEN ? WHEN `c`=? THEN ? END, " + + "`c2`=CASE WHEN `c`=? THEN ? WHEN `c`=? THEN ? WHEN `c`=? THEN ? END, " + + "`c3`=CASE WHEN `c`=? THEN ? WHEN `c`=? THEN ? WHEN `c`=? THEN ? END " + + "WHERE `c` IN (?,?,?)" + expectedArgs := []interface{}{ + 1, 10, 4, 40, 7, 70, + 1, 20, 4, 50, 7, 80, + 1, 30, 4, 60, 7, 90, + 1, 4, 7, + } + + require.Equal(t, expectedSQL, sql) + require.Equal(t, expectedArgs, args) +} + +// multiple generated columns test case +func testGenUpdateMultiRowsWithVirtualGeneratedColumns(t *testing.T, genUpdate genSQLFunc) { + source := &cdcmodel.TableName{Schema: "db", Table: "tb"} + target := &cdcmodel.TableName{Schema: "db", Table: "tb"} + + sourceTI := mockTableInfo(t, `CREATE TABLE tb1 (c0 int as (c4*c4) virtual not null, + c1 int as (c+100) virtual not null, c2 INT, c3 INT, c4 INT, PRIMARY KEY (c4))`) + targetTI := mockTableInfo(t, `CREATE TABLE tb (c0 int as (c4*c4) virtual not null, + c1 int as (c+100) virtual not null, c2 INT, c3 INT, c4 INT, PRIMARY KEY (c4))`) + + change1 := NewRowChange(source, target, []interface{}{1, 101, 2, 3, 1}, []interface{}{100, 110, 20, 30, 10}, sourceTI, targetTI, nil) + change2 := NewRowChange(source, target, []interface{}{16, 104, 5, 6, 4}, []interface{}{1600, 140, 50, 60, 40}, sourceTI, targetTI, nil) + change3 := NewRowChange(source, target, []interface{}{49, 107, 8, 9, 7}, []interface{}{4900, 170, 80, 90, 70}, sourceTI, targetTI, nil) + sql, args := genUpdate(change1, change2, change3) + + expectedSQL := "UPDATE `db`.`tb` SET " + + "`c2`=CASE WHEN `c4`=? THEN ? WHEN `c4`=? THEN ? WHEN `c4`=? THEN ? END, " + + "`c3`=CASE WHEN `c4`=? THEN ? WHEN `c4`=? THEN ? WHEN `c4`=? THEN ? END, " + + "`c4`=CASE WHEN `c4`=? THEN ? WHEN `c4`=? THEN ? WHEN `c4`=? THEN ? 
END " + + "WHERE `c4` IN (?,?,?)" + expectedArgs := []interface{}{ + 1, 20, 4, 50, 7, 80, + 1, 30, 4, 60, 7, 90, + 1, 10, 4, 40, 7, 70, + 1, 4, 7, + } + + require.Equal(t, expectedSQL, sql) + require.Equal(t, expectedArgs, args) +} + +func testGenUpdateMultiRowsWithStoredGeneratedColumn(t *testing.T, genUpdate genSQLFunc) { + source := &cdcmodel.TableName{Schema: "db", Table: "tb"} + target := &cdcmodel.TableName{Schema: "db", Table: "tb"} + + sourceTI := mockTableInfo(t, "CREATE TABLE tb1 (c INT, c1 int as (c+100) stored, c2 INT, c3 INT, PRIMARY KEY (c1))") + targetTI := mockTableInfo(t, "CREATE TABLE tb (c INT, c1 int as (c+100) stored, c2 INT, c3 INT, PRIMARY KEY (c1))") + + change1 := NewRowChange(source, target, []interface{}{1, 101, 2, 3}, []interface{}{10, 110, 20, 30}, sourceTI, targetTI, nil) + change2 := NewRowChange(source, target, []interface{}{4, 104, 5, 6}, []interface{}{40, 140, 50, 60}, sourceTI, targetTI, nil) + change3 := NewRowChange(source, target, []interface{}{7, 107, 8, 9}, []interface{}{70, 170, 80, 90}, sourceTI, targetTI, nil) + sql, args := genUpdate(change1, change2, change3) + + expectedSQL := "UPDATE `db`.`tb` SET " + + "`c`=CASE WHEN `c1`=? THEN ? WHEN `c1`=? THEN ? WHEN `c1`=? THEN ? END, " + + "`c2`=CASE WHEN `c1`=? THEN ? WHEN `c1`=? THEN ? WHEN `c1`=? THEN ? END, " + + "`c3`=CASE WHEN `c1`=? THEN ? WHEN `c1`=? THEN ? WHEN `c1`=? THEN ? END " + + "WHERE `c1` IN (?,?,?)" + expectedArgs := []interface{}{ + 101, 10, 104, 40, 107, 70, + 101, 20, 104, 50, 107, 80, + 101, 30, 104, 60, 107, 90, + 101, 104, 107, + } + + require.Equal(t, expectedSQL, sql) + require.Equal(t, expectedArgs, args) +} + func TestGenInsertMultiRows(t *testing.T) { t.Parallel() diff --git a/pkg/sqlmodel/row_change.go b/pkg/sqlmodel/row_change.go index 883e4ef60d3..29d2e8e1c9d 100644 --- a/pkg/sqlmodel/row_change.go +++ b/pkg/sqlmodel/row_change.go @@ -69,6 +69,8 @@ type RowChange struct { tp RowChangeType whereHandle *WhereHandle + + approximateDataSize int64 } // NewRowChange creates a new RowChange. @@ -195,6 +197,17 @@ func (r *RowChange) SetWhereHandle(whereHandle *WhereHandle) { r.whereHandle = whereHandle } +// GetApproximateDataSize returns internal approximateDataSize, it could be zero +// if this value is not set. +func (r *RowChange) GetApproximateDataSize() int64 { + return r.approximateDataSize +} + +// SetApproximateDataSize sets the approximate size of row change. +func (r *RowChange) SetApproximateDataSize(approximateDataSize int64) { + r.approximateDataSize = approximateDataSize +} + func (r *RowChange) lazyInitWhereHandle() { if r.whereHandle != nil { return
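
An illustrative sink URI exercising the new options (values borrowed from the updated config tests):

    mysql://127.0.0.1:3306/?max-multi-update-row=80&max-multi-update-row-size=512

max-multi-update-row defaults to 40 and is capped at 256; max-multi-update-row-size defaults to 1024 bytes and is capped at 8192, and setting it to 0 makes the sink always emit one UPDATE per row.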