From d3f7ec0021a2ac4169a0758a45a51df8a0194e44 Mon Sep 17 00:00:00 2001 From: MoCuishle28 <32541204+MoCuishle28@users.noreply.github.com> Date: Thu, 1 Dec 2022 14:40:01 +0800 Subject: [PATCH 1/5] br: skip compatibility check for the mysql.user table when restoring (#39460) close pingcap/tidb#38785 --- br/pkg/restore/client.go | 11 ++++++++++- br/pkg/restore/client_test.go | 28 ++++++++++++++++------------ br/pkg/restore/systable_restore.go | 2 ++ session/bootstrap.go | 4 ++++ 4 files changed, 32 insertions(+), 13 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 49b9b7bb7f58e..9e4e5a389b935 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -940,7 +940,9 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau return errors.Annotate(berrors.ErrRestoreIncompatibleSys, "missed system table: "+table.Info.Name.O) } backupTi := table.Info - if len(ti.Columns) != len(backupTi.Columns) { + // skip checking the number of columns in mysql.user table, + // because higher versions of TiDB may add new columns. + if len(ti.Columns) != len(backupTi.Columns) && backupTi.Name.L != sysUserTableName { log.Error("column count mismatch", zap.Stringer("table", table.Info.Name), zap.Int("col in cluster", len(ti.Columns)), @@ -959,6 +961,13 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau col := ti.Columns[i] backupCol := backupColMap[col.Name.L] if backupCol == nil { + // skip when the backed up mysql.user table is missing columns. + if backupTi.Name.L == sysUserTableName { + log.Warn("missing column in backup data", + zap.Stringer("table", table.Info.Name), + zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String()))) + continue + } log.Error("missing column in backup data", zap.Stringer("table", table.Info.Name), zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String()))) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index af1aa023560b6..e1f12ddbf7a1d 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -195,14 +195,14 @@ func TestCheckSysTableCompatibility(t *testing.T) { userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user")) require.NoError(t, err) - // column count mismatch + // user table in cluster have more columns(success) mockedUserTI := userTI.Clone() - mockedUserTI.Columns = mockedUserTI.Columns[:len(mockedUserTI.Columns)-1] + userTI.Columns = append(userTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")}) err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{ DB: tmpSysDB, Info: mockedUserTI, }}) - require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err)) + require.NoError(t, err) // column order mismatch(success) mockedUserTI = userTI.Clone() @@ -213,15 +213,6 @@ func TestCheckSysTableCompatibility(t *testing.T) { }}) require.NoError(t, err) - // missing column - mockedUserTI = userTI.Clone() - mockedUserTI.Columns[0].Name = model.NewCIStr("new-name") - err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{ - DB: tmpSysDB, - Info: mockedUserTI, - }}) - require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err)) - // incompatible column type mockedUserTI = userTI.Clone() mockedUserTI.Columns[0].FieldType.SetFlen(2000) // Columns[0] is `Host` char(255) @@ -238,6 +229,19 @@ func TestCheckSysTableCompatibility(t *testing.T) { Info: mockedUserTI, }}) require.NoError(t, err) + + // use the mysql.db table to test for column count mismatch. + dbTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("db")) + require.NoError(t, err) + + // other system tables in cluster have more columns(failed) + mockedDBTI := dbTI.Clone() + dbTI.Columns = append(dbTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")}) + err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{ + DB: tmpSysDB, + Info: mockedDBTI, + }}) + require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err)) } func TestInitFullClusterRestore(t *testing.T) { diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go index 40e3450c772f2..02ea0860d5425 100644 --- a/br/pkg/restore/systable_restore.go +++ b/br/pkg/restore/systable_restore.go @@ -19,6 +19,8 @@ import ( "go.uber.org/zap" ) +const sysUserTableName = "user" + var statsTables = map[string]struct{}{ "stats_buckets": {}, "stats_extended": {}, diff --git a/session/bootstrap.go b/session/bootstrap.go index 5dbffc42aa5f8..8acf2da2f554d 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -60,6 +60,10 @@ import ( const ( // CreateUserTable is the SQL statement creates User table in system db. + // WARNING: There are some limitations on altering the schema of mysql.user table. + // Adding columns that are nullable or have default values is permitted. + // But operations like dropping or renaming columns may break the compatibility with BR. + // REFERENCE ISSUE: https://github.com/pingcap/tidb/issues/38785 CreateUserTable = `CREATE TABLE IF NOT EXISTS mysql.user ( Host CHAR(255), User CHAR(32), From 09d1a8191d040c4f9a79f19a03747a21561ccc2a Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 1 Dec 2022 15:06:00 +0800 Subject: [PATCH 2/5] bazel: mirror java_tools (#39509) Signed-off-by: Weizhen Wang Signed-off-by: Weizhen Wang Co-authored-by: Ti Chi Robot --- WORKSPACE | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/WORKSPACE b/WORKSPACE index 6a3af98f950c6..29ece61004659 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -58,3 +58,23 @@ http_archive( load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps") protobuf_deps() + +http_archive( + name = "remote_java_tools", + sha256 = "5cd59ea6bf938a1efc1e11ea562d37b39c82f76781211b7cd941a2346ea8484d", + urls = [ + "http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip", + "https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip", + "https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools-v11.9.zip", + ], +) + +http_archive( + name = "remote_java_tools_linux", + sha256 = "512582cac5b7ea7974a77b0da4581b21f546c9478f206eedf54687eeac035989", + urls = [ + "http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip", + "https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip", + "https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools_linux-v11.9.zip", + ], +) From 78011c52868e549bc3429ee8d85bc202f9705097 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Dec 2022 15:20:01 +0800 Subject: [PATCH 3/5] lightning: add WithDupIndicator to expose status to caller (#39461) --- br/pkg/lightning/BUILD.bazel | 1 + br/pkg/lightning/errormanager/errormanager.go | 19 ++++++++++--------- br/pkg/lightning/lightning.go | 1 + br/pkg/lightning/restore/restore.go | 4 ++++ br/pkg/lightning/restore/table_restore.go | 5 +++++ br/pkg/lightning/run_options.go | 9 +++++++++ 6 files changed, 30 insertions(+), 9 deletions(-) diff --git a/br/pkg/lightning/BUILD.bazel b/br/pkg/lightning/BUILD.bazel index 6b3c5f8e3ce31..95aca448f786a 100644 --- a/br/pkg/lightning/BUILD.bazel +++ b/br/pkg/lightning/BUILD.bazel @@ -37,6 +37,7 @@ go_library( "@com_github_prometheus_client_golang//prometheus/promhttp", "@com_github_shurcool_httpgzip//:httpgzip", "@org_golang_x_exp//slices", + "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", ], diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go index 43035716d729c..373ba572779d4 100644 --- a/br/pkg/lightning/errormanager/errormanager.go +++ b/br/pkg/lightning/errormanager/errormanager.go @@ -40,9 +40,10 @@ const ( CREATE SCHEMA IF NOT EXISTS %s; ` - syntaxErrorTableName = "syntax_error_v1" - typeErrorTableName = "type_error_v1" - conflictErrorTableName = "conflict_error_v1" + syntaxErrorTableName = "syntax_error_v1" + typeErrorTableName = "type_error_v1" + // ConflictErrorTableName is the table name for duplicate detection. + ConflictErrorTableName = "conflict_error_v1" createSyntaxErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` ( @@ -69,7 +70,7 @@ const ( ` createConflictErrorTable = ` - CREATE TABLE IF NOT EXISTS %s.` + conflictErrorTableName + ` ( + CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` ( task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -91,7 +92,7 @@ const ( ` insertIntoConflictErrorData = ` - INSERT INTO %s.` + conflictErrorTableName + ` + INSERT INTO %s.` + ConflictErrorTableName + ` (task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row) VALUES ` @@ -99,7 +100,7 @@ const ( sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value)" insertIntoConflictErrorIndex = ` - INSERT INTO %s.` + conflictErrorTableName + ` + INSERT INTO %s.` + ConflictErrorTableName + ` (task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row) VALUES ` @@ -108,7 +109,7 @@ const ( selectConflictKeys = ` SELECT _tidb_rowid, raw_handle, raw_row - FROM %s.` + conflictErrorTableName + ` + FROM %s.` + ConflictErrorTableName + ` WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?; ` @@ -468,7 +469,7 @@ func (em *ErrorManager) LogErrorDetails() { em.logger.Warn(fmtErrMsg(errCnt, "data type", "")) } if errCnt := em.conflictError(); errCnt > 0 { - em.logger.Warn(fmtErrMsg(errCnt, "data type", conflictErrorTableName)) + em.logger.Warn(fmtErrMsg(errCnt, "data type", ConflictErrorTableName)) } } @@ -511,7 +512,7 @@ func (em *ErrorManager) Output() string { } if errCnt := em.conflictError(); errCnt > 0 { count++ - t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(conflictErrorTableName)}) + t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)}) } res := "\nImport Data Error Summary: \n" diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 9a3a78031b424..2db76b1001078 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -522,6 +522,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti Glue: g, CheckpointStorage: o.checkpointStorage, CheckpointName: o.checkpointName, + DupIndicator: o.dupIndicator, } var procedure *restore.Controller diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 380642774fe55..3c37342ec71cf 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -227,6 +227,7 @@ type Controller struct { diskQuotaState atomic.Int32 compactState atomic.Int32 status *LightningStatus + dupIndicator *atomic.Bool preInfoGetter PreRestoreInfoGetter precheckItemBuilder *PrecheckItemBuilder @@ -263,6 +264,8 @@ type ControllerParam struct { CheckpointStorage storage.ExternalStorage // when CheckpointStorage is not nil, save file checkpoint to it with this name CheckpointName string + // DupIndicator can expose the duplicate detection result to the caller + DupIndicator *atomic.Bool } func NewRestoreController( @@ -430,6 +433,7 @@ func NewRestoreControllerWithPauser( errorMgr: errorMgr, status: p.Status, taskMgr: nil, + dupIndicator: p.DupIndicator, preInfoGetter: preInfoGetter, precheckItemBuilder: preCheckBuilder, diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index f2f6bc6850d05..33bf45e4d344c 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -813,6 +813,11 @@ func (tr *TableRestore) postProcess( } } + if rc.dupIndicator != nil { + tr.logger.Debug("set dupIndicator", zap.Bool("has-duplicate", hasDupe)) + rc.dupIndicator.CompareAndSwap(false, hasDupe) + } + nextStage := checkpoints.CheckpointStatusChecksummed if rc.cfg.PostRestore.Checksum != config.OpLevelOff && !hasDupe && needChecksum { if cp.Checksum.SumKVS() > 0 || baseTotalChecksum.SumKVS() > 0 { diff --git a/br/pkg/lightning/run_options.go b/br/pkg/lightning/run_options.go index a7b5b90770c02..169c2e47088dd 100644 --- a/br/pkg/lightning/run_options.go +++ b/br/pkg/lightning/run_options.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/util/promutil" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -30,6 +31,7 @@ type options struct { promFactory promutil.Factory promRegistry promutil.Registry logger log.Logger + dupIndicator *atomic.Bool } type Option func(*options) @@ -81,3 +83,10 @@ func WithLogger(logger *zap.Logger) Option { o.logger = log.Logger{Logger: logger} } } + +// WithDupIndicator sets a *bool to indicate duplicate detection has found duplicate data. +func WithDupIndicator(b *atomic.Bool) Option { + return func(o *options) { + o.dupIndicator = b + } +} From a6eba9bfc205d4a16e30df4377c88cdd4f6f1b63 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 1 Dec 2022 16:02:01 +0800 Subject: [PATCH 4/5] txn: fix alias related bugs for non-transactional DML (#39459) ref pingcap/tidb#33485 --- session/nontransactional.go | 98 +++++++++++++++++++++++--------- session/nontransactional_test.go | 63 ++++++++++++++++++-- 2 files changed, 130 insertions(+), 31 deletions(-) diff --git a/session/nontransactional.go b/session/nontransactional.go index d6fbd8e8bd9fa..83a660f827ab6 100644 --- a/session/nontransactional.go +++ b/session/nontransactional.go @@ -98,12 +98,12 @@ func HandleNonTransactionalDML(ctx context.Context, stmt *ast.NonTransactionalDM return nil, err } - tableName, selectSQL, shardColumnInfo, err := buildSelectSQL(stmt, se) + tableName, selectSQL, shardColumnInfo, tableSources, err := buildSelectSQL(stmt, se) if err != nil { return nil, err } - if err := checkConstraintWithShardColumn(stmt, tableName, shardColumnInfo); err != nil { + if err := checkConstraintWithShardColumn(se, stmt, tableName, shardColumnInfo, tableSources); err != nil { return nil, err } @@ -132,23 +132,60 @@ func HandleNonTransactionalDML(ctx context.Context, stmt *ast.NonTransactionalDM return buildExecuteResults(ctx, jobs, se.GetSessionVars().BatchSize.MaxChunkSize, se.GetSessionVars().EnableRedactLog) } -func checkConstraintWithShardColumn(stmt *ast.NonTransactionalDMLStmt, tableName *ast.TableName, shardColumnInfo *model.ColumnInfo) error { +// we require: +// (1) in an update statement, shard column cannot be updated +// +// Note: this is not a comprehensive check. +// We do this to help user prevent some easy mistakes, at an acceptable maintenance cost. +func checkConstraintWithShardColumn(se Session, stmt *ast.NonTransactionalDMLStmt, + tableName *ast.TableName, shardColumnInfo *model.ColumnInfo, tableSources []*ast.TableSource) error { switch s := stmt.DMLStmt.(type) { case *ast.UpdateStmt: - // FIXME: this check is not enough. the table name and schema name of the assignment can be null. But we cannot - // simply rely on the column name to judge it. - for _, assignment := range s.List { - if shardColumnInfo != nil && assignment.Column.Name.L == shardColumnInfo.Name.L && - assignment.Column.Table.L == tableName.Name.L && - assignment.Column.Schema.L == tableName.Schema.L { - return errors.New("Non-transactional DML, shard columns cannot be updated") - } + if err := checkUpdateShardColumn(se, s.List, shardColumnInfo, tableName, tableSources, true); err != nil { + return err + } + case *ast.InsertStmt: + // FIXME: is it possible to happen? + // `insert into t select * from t on duplicate key update id = id + 1` will return an ambiguous column error? + if err := checkUpdateShardColumn(se, s.OnDuplicate, shardColumnInfo, tableName, tableSources, false); err != nil { + return err } default: } return nil } +// shard column should not be updated. +func checkUpdateShardColumn(se Session, assignments []*ast.Assignment, shardColumnInfo *model.ColumnInfo, + tableName *ast.TableName, tableSources []*ast.TableSource, isUpdate bool) error { + // if the table has alias, the alias is used in assignments, and we should use aliased name to compare + aliasedShardColumnTableName := tableName.Name.L + for _, tableSource := range tableSources { + if tableSource.Source.(*ast.TableName).Name.L == aliasedShardColumnTableName && tableSource.AsName.L != "" { + aliasedShardColumnTableName = tableSource.AsName.L + } + } + + if shardColumnInfo == nil { + return nil + } + for _, assignment := range assignments { + sameDB := (assignment.Column.Schema.L == tableName.Schema.L) || + (assignment.Column.Schema.L == "" && tableName.Schema.L == se.GetSessionVars().CurrentDB) + if !sameDB { + continue + } + sameTable := (assignment.Column.Table.L == aliasedShardColumnTableName) || (isUpdate && len(tableSources) == 1) + if !sameTable { + continue + } + if assignment.Column.Name.L == shardColumnInfo.Name.L { + return errors.New("Non-transactional DML, shard column cannot be updated") + } + } + return nil +} + func checkConstraint(stmt *ast.NonTransactionalDMLStmt, se Session) error { sessVars := se.GetSessionVars() if !(sessVars.IsAutocommit() && !sessVars.InTxn()) { @@ -509,29 +546,30 @@ func appendNewJob(jobs []job, id int, start types.Datum, end types.Datum, size i return jobs } -func buildSelectSQL(stmt *ast.NonTransactionalDMLStmt, se Session) (*ast.TableName, string, *model.ColumnInfo, error) { +func buildSelectSQL(stmt *ast.NonTransactionalDMLStmt, se Session) ( + *ast.TableName, string, *model.ColumnInfo, []*ast.TableSource, error) { // only use the first table join, ok := stmt.DMLStmt.TableRefsJoin() if !ok { - return nil, "", nil, errors.New("Non-transactional DML, table source not found") + return nil, "", nil, nil, errors.New("Non-transactional DML, table source not found") } tableSources := make([]*ast.TableSource, 0) tableSources, err := collectTableSourcesInJoin(join, tableSources) if err != nil { - return nil, "", nil, err + return nil, "", nil, nil, err } if len(tableSources) == 0 { - return nil, "", nil, errors.New("Non-transactional DML, no tables found in table refs") + return nil, "", nil, nil, errors.New("Non-transactional DML, no tables found in table refs") } leftMostTableSource := tableSources[0] leftMostTableName, ok := leftMostTableSource.Source.(*ast.TableName) if !ok { - return nil, "", nil, errors.New("Non-transactional DML, table name not found") + return nil, "", nil, nil, errors.New("Non-transactional DML, table name not found") } shardColumnInfo, tableName, err := selectShardColumn(stmt, se, tableSources, leftMostTableName, leftMostTableSource) if err != nil { - return nil, "", nil, err + return nil, "", nil, nil, err } var sb strings.Builder @@ -543,7 +581,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDMLStmt, se Session) (*ast.TableNa format.RestoreStringWithoutCharset, &sb), ) if err != nil { - return nil, "", nil, errors.Annotate(err, "Failed to restore where clause in non-transactional DML") + return nil, "", nil, nil, errors.Annotate(err, "Failed to restore where clause in non-transactional DML") } } else { sb.WriteString("TRUE") @@ -551,7 +589,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDMLStmt, se Session) (*ast.TableNa // assure NULL values are placed first selectSQL := fmt.Sprintf("SELECT `%s` FROM `%s`.`%s` WHERE %s ORDER BY IF(ISNULL(`%s`),0,1),`%s`", stmt.ShardColumn.Name.O, tableName.DBInfo.Name.O, tableName.Name.O, sb.String(), stmt.ShardColumn.Name.O, stmt.ShardColumn.Name.O) - return tableName, selectSQL, shardColumnInfo, nil + return tableName, selectSQL, shardColumnInfo, tableSources, nil } func selectShardColumn(stmt *ast.NonTransactionalDMLStmt, se Session, tableSources []*ast.TableSource, @@ -586,17 +624,23 @@ func selectShardColumn(stmt *ast.NonTransactionalDMLStmt, se Session, tableSourc return nil, nil, err } } else if stmt.ShardColumn.Schema.L != "" && stmt.ShardColumn.Table.L != "" && stmt.ShardColumn.Name.L != "" { - dbName := stmt.ShardColumn.Schema - tableName := stmt.ShardColumn.Table - colName := stmt.ShardColumn.Name + specifiedDbName := stmt.ShardColumn.Schema + specifiedTableName := stmt.ShardColumn.Table + specifiedColName := stmt.ShardColumn.Name // the specified table must be in the join tableInJoin := false + var chosenTableName model.CIStr for _, tableSource := range tableSources { tableSourceName := tableSource.Source.(*ast.TableName) - if tableSourceName.Schema.L == dbName.L && tableSourceName.Name.L == tableName.L { + tableSourceFinalTableName := tableSource.AsName // precedence: alias name, then table name + if tableSourceFinalTableName.O == "" { + tableSourceFinalTableName = tableSourceName.Name + } + if tableSourceName.Schema.L == specifiedDbName.L && tableSourceFinalTableName.L == specifiedTableName.L { tableInJoin = true selectedTableName = tableSourceName + chosenTableName = tableSourceName.Name break } } @@ -604,21 +648,21 @@ func selectShardColumn(stmt *ast.NonTransactionalDMLStmt, se Session, tableSourc return nil, nil, errors.Errorf( "Non-transactional DML, shard column %s.%s.%s is not in the tables involved in the join", - dbName.L, tableName.L, colName.L, + specifiedDbName.L, specifiedTableName.L, specifiedColName.L, ) } - tbl, err := domain.GetDomain(se).InfoSchema().TableByName(dbName, tableName) + tbl, err := domain.GetDomain(se).InfoSchema().TableByName(specifiedDbName, chosenTableName) if err != nil { return nil, nil, err } - indexed, shardColumnInfo, err = selectShardColumnByGivenName(colName.L, tbl) + indexed, shardColumnInfo, err = selectShardColumnByGivenName(specifiedColName.L, tbl) if err != nil { return nil, nil, err } } else { return nil, nil, errors.New( - "Non-transactional DML, shard column must be fully specified (dbname.tablename.colname) when multiple tables are involved", + "Non-transactional DML, shard column must be fully specified (i.e. `BATCH ON dbname.tablename.colname`) when multiple tables are involved", ) } } diff --git a/session/nontransactional_test.go b/session/nontransactional_test.go index 3cf238de1bea4..9eea25e46bfbb 100644 --- a/session/nontransactional_test.go +++ b/session/nontransactional_test.go @@ -949,11 +949,66 @@ func TestAnomalousNontransactionalDML(t *testing.T) { tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)") tk.MustExec("insert into t2 values (1, 1), (2, 2), (4, 4)") - // FIXME: we should not allow this, where the shard column is the join key tk.MustExec("batch on test.t.id limit 1 update t join t2 on t.id=t2.id set t2.id = t2.id+1") tk.MustQuery("select * from t2").Check(testkit.Rows("4 1", "4 2", "4 4")) +} + +func TestAlias(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int, v1 int, v2 int, unique key (id))") + tk.MustExec("create table t2(id int, v int, key (id))") + tk.MustExec("create table t3(id int, v int, key (id))") + tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 3)") + tk.MustExec("insert into t2 values (1, 1), (2, 20), (4, 40)") + tk.MustExec("insert into t3 values (2, 21), (4, 41), (5, 50)") + tk.MustExec("update t as t1 set v1 = test.t1.id + 1") + tk.MustQuery("select * from t").Check(testkit.Rows("1 2 1", "2 3 2", "3 4 3")) + + tk.MustExec("batch on test.tt2.id limit 1 replace into t select tt2.id, tt2.v, t3.v from t2 as tt2 join t3 on tt2.id=t3.id") + tk.MustQuery("select * from t order by id").Check(testkit.Rows("1 2 1", "2 20 21", "3 4 3", "4 40 41")) +} - // FIXME: and this - tk.MustExec("batch on id limit 1 update t set id=id+1") - tk.MustQuery("select * from t").Check(testkit.Rows("4 1", "4 2", "4 3")) +func TestUpdatingShardColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int, v int, unique key(id))") + tk.MustExec("create table t2(id int, v int, unique key(id))") + tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)") + tk.MustExec("insert into t2 values (1, 1), (2, 2), (4, 4)") + + // update stmt + tk.MustContainErrMsg("batch on id limit 1 update t set id=id+1", "Non-transactional DML, shard column cannot be updated") + // insert on dup update + tk.MustContainErrMsg("batch on id limit 1 insert into t select * from t on duplicate key update t.id=t.id+10", "Non-transactional DML, shard column cannot be updated") + // update with table alias + tk.MustContainErrMsg("batch on id limit 1 update t as t1 set t1.id=t1.id+1", "Non-transactional DML, shard column cannot be updated") + // insert on dup update with table alias + tk.MustContainErrMsg("batch on id limit 1 insert into t select * from t as t1 on duplicate key update t1.id=t1.id+10", "Non-transactional DML, shard column cannot be updated") + // update stmt, multiple table + tk.MustContainErrMsg("batch on test.t.id limit 1 update t join t2 on t.id=t2.id set t.id=t.id+1", "Non-transactional DML, shard column cannot be updated") + // update stmt, multiple table, alias + tk.MustContainErrMsg("batch on test.tt.id limit 1 update t as tt join t2 as tt2 on tt.id=tt2.id set tt.id=tt.id+10", "Non-transactional DML, shard column cannot be updated") +} + +func TestNameAmbiguity(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int, v1 int, v2 int, unique key (id))") + tk.MustExec("create table t2(id int, v int, key (id))") + tk.MustExec("create table t3(id int, v int, key (id))") + + tk.MustExec("create database test2") + tk.MustExec("use test2") + tk.MustExec("create table t(id int, v1 int, v2 int, unique key (id))") + tk.MustExec("create table t2(id int, v int, key (id))") + tk.MustExec("create table t3(id int, v int, key (id))") + tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2)") + + tk.MustExec("use test") + tk.MustExec("batch on id limit 1 insert into t select * from test2.t") + tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1", "2 2 2")) } From 213187cb47f44de0c64f23e6b5fc45ccf1ae1316 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 1 Dec 2022 16:38:01 +0800 Subject: [PATCH 5/5] server: use advertise addr for autoid service to expose itself (#39497) close pingcap/tidb#39214 --- server/http_status.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/http_status.go b/server/http_status.go index 2f31fa4a423d2..e86868325aa0c 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -477,7 +477,7 @@ func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) { logutil.BgLogger().Error("tikv store not etcd background", zap.Error(err)) break } - selfAddr := s.statusListener.Addr().String() + selfAddr := s.cfg.AdvertiseAddress service := autoid.New(selfAddr, etcdAddr, store, ebd.TLSConfig()) logutil.BgLogger().Info("register auto service at", zap.String("addr", selfAddr)) pb.RegisterAutoIDAllocServer(grpcServer, service)