Skip to content

Commit

Permalink
Merge branch 'master' into binding_create
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Dec 1, 2022
2 parents 17e6791 + c3565a1 commit bbfb53d
Show file tree
Hide file tree
Showing 39 changed files with 797 additions and 150 deletions.
20 changes: 20 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,23 @@ http_archive(
load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps")

protobuf_deps()

http_archive(
name = "remote_java_tools",
sha256 = "5cd59ea6bf938a1efc1e11ea562d37b39c82f76781211b7cd941a2346ea8484d",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools-v11.9.zip",
],
)

http_archive(
name = "remote_java_tools_linux",
sha256 = "512582cac5b7ea7974a77b0da4581b21f546c9478f206eedf54687eeac035989",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools_linux-v11.9.zip",
],
)
1 change: 1 addition & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promhttp",
"@com_github_shurcool_httpgzip//:httpgzip",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ const (
CREATE SCHEMA IF NOT EXISTS %s;
`

syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
conflictErrorTableName = "conflict_error_v1"
syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
Expand All @@ -69,7 +70,7 @@ const (
`

createConflictErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + conflictErrorTableName + ` (
CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` (
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand All @@ -91,15 +92,15 @@ const (
`

insertIntoConflictErrorData = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`

sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value)"

insertIntoConflictErrorIndex = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`
Expand All @@ -108,7 +109,7 @@ const (

selectConflictKeys = `
SELECT _tidb_rowid, raw_handle, raw_row
FROM %s.` + conflictErrorTableName + `
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ?
ORDER BY _tidb_rowid LIMIT ?;
`
Expand Down Expand Up @@ -468,7 +469,7 @@ func (em *ErrorManager) LogErrorDetails() {
em.logger.Warn(fmtErrMsg(errCnt, "data type", ""))
}
if errCnt := em.conflictError(); errCnt > 0 {
em.logger.Warn(fmtErrMsg(errCnt, "data type", conflictErrorTableName))
em.logger.Warn(fmtErrMsg(errCnt, "data type", ConflictErrorTableName))
}
}

Expand Down Expand Up @@ -511,7 +512,7 @@ func (em *ErrorManager) Output() string {
}
if errCnt := em.conflictError(); errCnt > 0 {
count++
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(conflictErrorTableName)})
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)})
}

res := "\nImport Data Error Summary: \n"
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
Glue: g,
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
DupIndicator: o.dupIndicator,
}

var procedure *restore.Controller
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ type Controller struct {
diskQuotaState atomic.Int32
compactState atomic.Int32
status *LightningStatus
dupIndicator *atomic.Bool

preInfoGetter PreRestoreInfoGetter
precheckItemBuilder *PrecheckItemBuilder
Expand Down Expand Up @@ -263,6 +264,8 @@ type ControllerParam struct {
CheckpointStorage storage.ExternalStorage
// when CheckpointStorage is not nil, save file checkpoint to it with this name
CheckpointName string
// DupIndicator can expose the duplicate detection result to the caller
DupIndicator *atomic.Bool
}

func NewRestoreController(
Expand Down Expand Up @@ -430,6 +433,7 @@ func NewRestoreControllerWithPauser(
errorMgr: errorMgr,
status: p.Status,
taskMgr: nil,
dupIndicator: p.DupIndicator,

preInfoGetter: preInfoGetter,
precheckItemBuilder: preCheckBuilder,
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,11 @@ func (tr *TableRestore) postProcess(
}
}

if rc.dupIndicator != nil {
tr.logger.Debug("set dupIndicator", zap.Bool("has-duplicate", hasDupe))
rc.dupIndicator.CompareAndSwap(false, hasDupe)
}

nextStage := checkpoints.CheckpointStatusChecksummed
if rc.cfg.PostRestore.Checksum != config.OpLevelOff && !hasDupe && needChecksum {
if cp.Checksum.SumKVS() > 0 || baseTotalChecksum.SumKVS() > 0 {
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/run_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/promutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -30,6 +31,7 @@ type options struct {
promFactory promutil.Factory
promRegistry promutil.Registry
logger log.Logger
dupIndicator *atomic.Bool
}

type Option func(*options)
Expand Down Expand Up @@ -81,3 +83,10 @@ func WithLogger(logger *zap.Logger) Option {
o.logger = log.Logger{Logger: logger}
}
}

// WithDupIndicator sets a *bool to indicate duplicate detection has found duplicate data.
func WithDupIndicator(b *atomic.Bool) Option {
return func(o *options) {
o.dupIndicator = b
}
}
11 changes: 10 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,9 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
return errors.Annotate(berrors.ErrRestoreIncompatibleSys, "missed system table: "+table.Info.Name.O)
}
backupTi := table.Info
if len(ti.Columns) != len(backupTi.Columns) {
// skip checking the number of columns in mysql.user table,
// because higher versions of TiDB may add new columns.
if len(ti.Columns) != len(backupTi.Columns) && backupTi.Name.L != sysUserTableName {
log.Error("column count mismatch",
zap.Stringer("table", table.Info.Name),
zap.Int("col in cluster", len(ti.Columns)),
Expand All @@ -959,6 +961,13 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
col := ti.Columns[i]
backupCol := backupColMap[col.Name.L]
if backupCol == nil {
// skip when the backed up mysql.user table is missing columns.
if backupTi.Name.L == sysUserTableName {
log.Warn("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
continue
}
log.Error("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
Expand Down
28 changes: 16 additions & 12 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,14 @@ func TestCheckSysTableCompatibility(t *testing.T) {
userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user"))
require.NoError(t, err)

// column count mismatch
// user table in cluster have more columns(success)
mockedUserTI := userTI.Clone()
mockedUserTI.Columns = mockedUserTI.Columns[:len(mockedUserTI.Columns)-1]
userTI.Columns = append(userTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))
require.NoError(t, err)

// column order mismatch(success)
mockedUserTI = userTI.Clone()
Expand All @@ -213,15 +213,6 @@ func TestCheckSysTableCompatibility(t *testing.T) {
}})
require.NoError(t, err)

// missing column
mockedUserTI = userTI.Clone()
mockedUserTI.Columns[0].Name = model.NewCIStr("new-name")
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))

// incompatible column type
mockedUserTI = userTI.Clone()
mockedUserTI.Columns[0].FieldType.SetFlen(2000) // Columns[0] is `Host` char(255)
Expand All @@ -238,6 +229,19 @@ func TestCheckSysTableCompatibility(t *testing.T) {
Info: mockedUserTI,
}})
require.NoError(t, err)

// use the mysql.db table to test for column count mismatch.
dbTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("db"))
require.NoError(t, err)

// other system tables in cluster have more columns(failed)
mockedDBTI := dbTI.Clone()
dbTI.Columns = append(dbTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedDBTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))
}

func TestInitFullClusterRestore(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"go.uber.org/zap"
)

const sysUserTableName = "user"

var statsTables = map[string]struct{}{
"stats_buckets": {},
"stats_extended": {},
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/streamhelper/spans/sorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,19 @@ func (f *ValuedFull) mergeWithOverlap(val Valued, overlapped []Valued, newItems

// overlapped inserts the overlapped ranges of the span into the `result` slice.
func (f *ValuedFull) overlapped(k Span, result *[]Valued) {
var first Span
var (
first Span
hasFirst bool
)
// Firstly, let's find whether there is a overlapped region with less start key.
f.inner.DescendLessOrEqual(Valued{Key: k}, func(item btree.Item) bool {
first = item.(Valued).Key
hasFirst = true
return false
})
if !hasFirst || !Overlaps(first, k) {
first = k
}

f.inner.AscendGreaterOrEqual(Valued{Key: first}, func(item btree.Item) bool {
r := item.(Valued)
Expand Down
37 changes: 37 additions & 0 deletions br/pkg/streamhelper/spans/sorted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,43 @@ func TestSubRange(t *testing.T) {
kv(s("0008", ""), 42),
},
},
{
Range: []spans.Span{
s("0001", "0004"),
s("0005", "0008"),
},
InputSequence: []spans.Valued{
kv(s("0001", "0002"), 42),
kv(s("0002", "0008"), 43),
kv(s("0004", "0007"), 45),
kv(s("0000", "00015"), 48),
},
Result: []spans.Valued{
kv(s("0001", "00015"), 48),
kv(s("00015", "0002"), 42),
kv(s("0002", "0004"), 43),
kv(s("0005", "0007"), 45),
kv(s("0007", "0008"), 43),
},
},
{
Range: []spans.Span{
s("0001", "0004"),
s("0005", "0008"),
},
InputSequence: []spans.Valued{
kv(s("0004", "0008"), 32),
kv(s("00041", "0007"), 33),
kv(s("0004", "00041"), 99999),
kv(s("0005", "0006"), 34),
},
Result: []spans.Valued{
kv(s("0001", "0004"), 0),
kv(s("0005", "0006"), 34),
kv(s("0006", "0007"), 33),
kv(s("0007", "0008"), 32),
},
},
}

for i, c := range cases {
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"domain.go",
"domain_sysvars.go",
"domainctx.go",
"historical_stats.go",
"optimize_trace.go",
"plan_replayer.go",
"plan_replayer_dump.go",
Expand Down
Loading

0 comments on commit bbfb53d

Please sign in to comment.