Skip to content

Commit

Permalink
Merge branch 'master' into subscription-flush
Browse files Browse the repository at this point in the history
  • Loading branch information
joccau authored Dec 1, 2022
2 parents 4e528fc + 213187c commit 2ad7f64
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 54 deletions.
20 changes: 20 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,23 @@ http_archive(
load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps")

protobuf_deps()

http_archive(
name = "remote_java_tools",
sha256 = "5cd59ea6bf938a1efc1e11ea562d37b39c82f76781211b7cd941a2346ea8484d",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools-v11.9.zip",
],
)

http_archive(
name = "remote_java_tools_linux",
sha256 = "512582cac5b7ea7974a77b0da4581b21f546c9478f206eedf54687eeac035989",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools_linux-v11.9.zip",
],
)
1 change: 1 addition & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promhttp",
"@com_github_shurcool_httpgzip//:httpgzip",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ const (
CREATE SCHEMA IF NOT EXISTS %s;
`

syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
conflictErrorTableName = "conflict_error_v1"
syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
Expand All @@ -69,7 +70,7 @@ const (
`

createConflictErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + conflictErrorTableName + ` (
CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` (
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand All @@ -91,15 +92,15 @@ const (
`

insertIntoConflictErrorData = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`

sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value)"

insertIntoConflictErrorIndex = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`
Expand All @@ -108,7 +109,7 @@ const (

selectConflictKeys = `
SELECT _tidb_rowid, raw_handle, raw_row
FROM %s.` + conflictErrorTableName + `
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ?
ORDER BY _tidb_rowid LIMIT ?;
`
Expand Down Expand Up @@ -468,7 +469,7 @@ func (em *ErrorManager) LogErrorDetails() {
em.logger.Warn(fmtErrMsg(errCnt, "data type", ""))
}
if errCnt := em.conflictError(); errCnt > 0 {
em.logger.Warn(fmtErrMsg(errCnt, "data type", conflictErrorTableName))
em.logger.Warn(fmtErrMsg(errCnt, "data type", ConflictErrorTableName))
}
}

Expand Down Expand Up @@ -511,7 +512,7 @@ func (em *ErrorManager) Output() string {
}
if errCnt := em.conflictError(); errCnt > 0 {
count++
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(conflictErrorTableName)})
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)})
}

res := "\nImport Data Error Summary: \n"
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
Glue: g,
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
DupIndicator: o.dupIndicator,
}

var procedure *restore.Controller
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ type Controller struct {
diskQuotaState atomic.Int32
compactState atomic.Int32
status *LightningStatus
dupIndicator *atomic.Bool

preInfoGetter PreRestoreInfoGetter
precheckItemBuilder *PrecheckItemBuilder
Expand Down Expand Up @@ -263,6 +264,8 @@ type ControllerParam struct {
CheckpointStorage storage.ExternalStorage
// when CheckpointStorage is not nil, save file checkpoint to it with this name
CheckpointName string
// DupIndicator can expose the duplicate detection result to the caller
DupIndicator *atomic.Bool
}

func NewRestoreController(
Expand Down Expand Up @@ -430,6 +433,7 @@ func NewRestoreControllerWithPauser(
errorMgr: errorMgr,
status: p.Status,
taskMgr: nil,
dupIndicator: p.DupIndicator,

preInfoGetter: preInfoGetter,
precheckItemBuilder: preCheckBuilder,
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,11 @@ func (tr *TableRestore) postProcess(
}
}

if rc.dupIndicator != nil {
tr.logger.Debug("set dupIndicator", zap.Bool("has-duplicate", hasDupe))
rc.dupIndicator.CompareAndSwap(false, hasDupe)
}

nextStage := checkpoints.CheckpointStatusChecksummed
if rc.cfg.PostRestore.Checksum != config.OpLevelOff && !hasDupe && needChecksum {
if cp.Checksum.SumKVS() > 0 || baseTotalChecksum.SumKVS() > 0 {
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/run_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/promutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -30,6 +31,7 @@ type options struct {
promFactory promutil.Factory
promRegistry promutil.Registry
logger log.Logger
dupIndicator *atomic.Bool
}

type Option func(*options)
Expand Down Expand Up @@ -81,3 +83,10 @@ func WithLogger(logger *zap.Logger) Option {
o.logger = log.Logger{Logger: logger}
}
}

// WithDupIndicator sets a *bool to indicate duplicate detection has found duplicate data.
func WithDupIndicator(b *atomic.Bool) Option {
return func(o *options) {
o.dupIndicator = b
}
}
11 changes: 10 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,9 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
return errors.Annotate(berrors.ErrRestoreIncompatibleSys, "missed system table: "+table.Info.Name.O)
}
backupTi := table.Info
if len(ti.Columns) != len(backupTi.Columns) {
// skip checking the number of columns in mysql.user table,
// because higher versions of TiDB may add new columns.
if len(ti.Columns) != len(backupTi.Columns) && backupTi.Name.L != sysUserTableName {
log.Error("column count mismatch",
zap.Stringer("table", table.Info.Name),
zap.Int("col in cluster", len(ti.Columns)),
Expand All @@ -959,6 +961,13 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
col := ti.Columns[i]
backupCol := backupColMap[col.Name.L]
if backupCol == nil {
// skip when the backed up mysql.user table is missing columns.
if backupTi.Name.L == sysUserTableName {
log.Warn("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
continue
}
log.Error("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
Expand Down
28 changes: 16 additions & 12 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,14 @@ func TestCheckSysTableCompatibility(t *testing.T) {
userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user"))
require.NoError(t, err)

// column count mismatch
// user table in cluster have more columns(success)
mockedUserTI := userTI.Clone()
mockedUserTI.Columns = mockedUserTI.Columns[:len(mockedUserTI.Columns)-1]
userTI.Columns = append(userTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))
require.NoError(t, err)

// column order mismatch(success)
mockedUserTI = userTI.Clone()
Expand All @@ -213,15 +213,6 @@ func TestCheckSysTableCompatibility(t *testing.T) {
}})
require.NoError(t, err)

// missing column
mockedUserTI = userTI.Clone()
mockedUserTI.Columns[0].Name = model.NewCIStr("new-name")
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))

// incompatible column type
mockedUserTI = userTI.Clone()
mockedUserTI.Columns[0].FieldType.SetFlen(2000) // Columns[0] is `Host` char(255)
Expand All @@ -238,6 +229,19 @@ func TestCheckSysTableCompatibility(t *testing.T) {
Info: mockedUserTI,
}})
require.NoError(t, err)

// use the mysql.db table to test for column count mismatch.
dbTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("db"))
require.NoError(t, err)

// other system tables in cluster have more columns(failed)
mockedDBTI := dbTI.Clone()
dbTI.Columns = append(dbTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedDBTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))
}

func TestInitFullClusterRestore(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"go.uber.org/zap"
)

const sysUserTableName = "user"

var statsTables = map[string]struct{}{
"stats_buckets": {},
"stats_extended": {},
Expand Down
2 changes: 1 addition & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) {
logutil.BgLogger().Error("tikv store not etcd background", zap.Error(err))
break
}
selfAddr := s.statusListener.Addr().String()
selfAddr := s.cfg.AdvertiseAddress
service := autoid.New(selfAddr, etcdAddr, store, ebd.TLSConfig())
logutil.BgLogger().Info("register auto service at", zap.String("addr", selfAddr))
pb.RegisterAutoIDAllocServer(grpcServer, service)
Expand Down
4 changes: 4 additions & 0 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ import (

const (
// CreateUserTable is the SQL statement creates User table in system db.
// WARNING: There are some limitations on altering the schema of mysql.user table.
// Adding columns that are nullable or have default values is permitted.
// But operations like dropping or renaming columns may break the compatibility with BR.
// REFERENCE ISSUE: https://github.com/pingcap/tidb/issues/38785
CreateUserTable = `CREATE TABLE IF NOT EXISTS mysql.user (
Host CHAR(255),
User CHAR(32),
Expand Down
Loading

0 comments on commit 2ad7f64

Please sign in to comment.