Skip to content

Commit

Permalink
Merge remote-tracking branch 'pingcap/master' into planPartInfo-rename
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss committed Jan 5, 2024
2 parents d34e4da + 4c7102c commit e0ba603
Show file tree
Hide file tree
Showing 247 changed files with 15,758 additions and 12,997 deletions.
24 changes: 12 additions & 12 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7032,13 +7032,13 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "8b0b78d6aca161df84144b2c0481088f2fdb4aca6772a23576f6ab8eeafd13e0",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231227120157-cee6e63b9f06",
sha256 = "1f3e9568f3db14219b8f563315743dd581c718f0b1b0f55878da4bb955a62d4a",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20240103101103-a4d2f1ca365a",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231227120157-cee6e63b9f06.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231227120157-cee6e63b9f06.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231227120157-cee6e63b9f06.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231227120157-cee6e63b9f06.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240103101103-a4d2f1ca365a.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240103101103-a4d2f1ca365a.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240103101103-a4d2f1ca365a.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240103101103-a4d2f1ca365a.zip",
],
)
go_repository(
Expand Down Expand Up @@ -10234,13 +10234,13 @@ def go_deps():
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sha256 = "8612eb416c739c3b04ce48dcbe65632c6fbc427031fd981caeceec6410d1e1fc",
strip_prefix = "golang.org/x/sys@v0.15.0",
sha256 = "0175809134fc12e040ea427e927036692127f2891b72e224e5153da543af604a",
strip_prefix = "golang.org/x/sys@v0.16.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/sys/org_golang_x_sys-v0.15.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/sys/org_golang_x_sys-v0.15.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/sys/org_golang_x_sys-v0.15.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/sys/org_golang_x_sys-v0.15.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/sys/org_golang_x_sys-v0.16.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/sys/org_golang_x_sys-v0.16.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/sys/org_golang_x_sys-v0.16.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/sys/org_golang_x_sys-v0.16.0.zip",
],
)
go_repository(
Expand Down
3 changes: 1 addition & 2 deletions Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,4 @@ ifneq ("$(CI)", "")
BAZEL_CMD_CONFIG := --config=ci --repository_cache=/home/jenkins/.tidb/tmp
BAZEL_SYNC_CONFIG := --repository_cache=/home/jenkins/.tidb/tmp
endif
BAZEL_INSTRUMENTATION_FILTER_PACKAGE := go list ./...| sed 's/github.com\/pingcap\/tidb//g'
BAZEL_INSTRUMENTATION_FILTER := --instrument_test_targets --instrumentation_filter='${BAZEL_INSTRUMENTATION_FILTER_PACKAGE}'
BAZEL_INSTRUMENTATION_FILTER := --instrument_test_targets --instrumentation_filter=//pkg/...,//br/...,//dumpling/...
10 changes: 6 additions & 4 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ var (
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))

ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch"))
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type LocalEngineConfig struct {
CompactThreshold int64
// compact routine concurrency
CompactConcurrency int

// blocksize
BlockSize int
}

// ExternalEngineConfig is the configuration used for local backend external engine.
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/utils",
"//pkg/errctx",
"//pkg/expression",
"//pkg/kv",
"//pkg/meta/autoid",
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
case isBadNullValue:
err = col.HandleBadNull(&value, e.SessionCtx.Vars.StmtCtx, 0)
err = col.HandleBadNull(e.SessionCtx.Vars.StmtCtx.ErrCtx(), &value, 0)
default:
// copy from the following GetColDefaultValue function, when this is true it will use getColDefaultExprValue
if col.DefaultIsExpr {
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/manual"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand Down Expand Up @@ -285,14 +286,20 @@ func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
vars.SkipUTF8Check = true
vars.StmtCtx.InInsertStmt = true
vars.StmtCtx.BatchCheck = true
vars.StmtCtx.BadNullAsWarning = !sqlMode.HasStrictMode()
vars.SQLMode = sqlMode

typeFlags := vars.StmtCtx.TypeFlags().
WithTruncateAsWarning(!sqlMode.HasStrictMode()).
WithIgnoreInvalidDateErr(sqlMode.HasAllowInvalidDatesMode()).
WithIgnoreZeroInDate(!sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode())
vars.StmtCtx.SetTypeFlags(typeFlags)

errLevels := vars.StmtCtx.ErrLevels()
errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !sqlMode.HasStrictMode())
errLevels[errctx.ErrGroupDividedByZero] =
errctx.ResolveErrLevel(!sqlMode.HasErrorForDivisionByZeroMode(), !sqlMode.HasStrictMode())
vars.StmtCtx.SetErrLevels(errLevels)

if options.SysVars != nil {
for k, v := range options.SysVars {
// since 6.3(current master) tidb checks whether we can set a system variable
Expand Down
14 changes: 7 additions & 7 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func (e *Engine) ingestSSTLoop() {
}
ingestMetas := metas.metas
if e.config.Compact {
newMeta, err := e.sstIngester.mergeSSTs(metas.metas, e.sstDir)
newMeta, err := e.sstIngester.mergeSSTs(metas.metas, e.sstDir, e.config.BlockSize)
if err != nil {
e.setError(err)
return
Expand Down Expand Up @@ -1349,7 +1349,7 @@ func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {

func (w *Writer) createSSTWriter() (*sstWriter, error) {
path := filepath.Join(w.engine.sstDir, uuid.New().String()+".sst")
writer, err := newSSTWriter(path)
writer, err := newSSTWriter(path, w.engine.config.BlockSize)
if err != nil {
return nil, err
}
Expand All @@ -1365,7 +1365,7 @@ type sstWriter struct {
logger log.Logger
}

func newSSTWriter(path string) (*sstable.Writer, error) {
func newSSTWriter(path string, blockSize int) (*sstable.Writer, error) {
f, err := os.Create(path)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -1374,7 +1374,7 @@ func newSSTWriter(path string) (*sstable.Writer, error) {
TablePropertyCollectors: []func() pebble.TablePropertyCollector{
newRangePropertiesCollector,
},
BlockSize: 16 * 1024,
BlockSize: blockSize,
})
return writer, nil
}
Expand Down Expand Up @@ -1504,15 +1504,15 @@ func (h *sstIterHeap) Next() ([]byte, []byte, error) {
// sstIngester is a interface used to merge and ingest SST files.
// it's a interface mainly used for test convenience
type sstIngester interface {
mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error)
ingest([]*sstMeta) error
}

type dbSSTIngester struct {
e *Engine
}

func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) {
func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error) {
if len(metas) == 0 {
return nil, errors.New("sst metas is empty")
} else if len(metas) == 1 {
Expand Down Expand Up @@ -1561,7 +1561,7 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
heap.Init(mergeIter)

name := filepath.Join(dir, fmt.Sprintf("%s.sst", uuid.New()))
writer, err := newSSTWriter(name)
writer, err := newSSTWriter(name, blockSize)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (em *engineManager) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*peb
opt.Levels = []pebble.LevelOptions{
{
TargetFileSize: 16 * units.GiB,
BlockSize: em.BlockSize,
},
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ type BackendConfig struct {
// see DisableAutomaticCompactions of pebble.Options for more details.
// default true.
DisableAutomaticCompactions bool
BlockSize int
}

// NewBackendConfig creates a new BackendConfig.
Expand All @@ -430,6 +431,7 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resour
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
BlockSize: int(cfg.TikvImporter.BlockSize),
KVWriteBatchSize: int64(cfg.TikvImporter.SendKVSize),
RegionSplitBatchSize: cfg.TikvImporter.RegionSplitBatchSize,
RegionSplitConcurrency: cfg.TikvImporter.RegionSplitConcurrency,
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (c *mockSplitClient) GetRegion(ctx context.Context, key []byte) (*split.Reg

type testIngester struct{}

func (i testIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) {
func (i testIngester) mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error) {
if len(metas) == 0 {
return nil, errors.New("sst metas is empty")
} else if len(metas) == 1 {
Expand Down Expand Up @@ -592,7 +592,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {

createSSTWriter := func() (*sstWriter, error) {
path := filepath.Join(f.sstDir, uuid.New().String()+".sst")
writer, err := newSSTWriter(path)
writer, err := newSSTWriter(path, 16*1024)
if err != nil {
return nil, err
}
Expand All @@ -614,7 +614,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
}

i := dbSSTIngester{e: f}
newMeta, err := i.mergeSSTs(metas, tmpPath)
newMeta, err := i.mergeSSTs(metas, tmpPath, 16*1024)
require.NoError(t, err)

require.Equal(t, meta.totalCount, newMeta.totalCount)
Expand Down
51 changes: 24 additions & 27 deletions br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,12 +1615,12 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table
Logger: log.FromContext(ctx).With(zap.String("table", tableName)),
}
err := s.Transact(ctx, "ignore error checkpoints", func(c context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("UPDATE %s.%s SET status = ? WHERE ? = ? AND status <= ?", cpdb.schema, CheckpointTableNameEngine)
if _, e := tx.ExecContext(c, query, CheckpointStatusLoaded, colName, tableName, CheckpointStatusMaxInvalid); e != nil {
query := fmt.Sprintf("UPDATE %s.%s SET status = ? WHERE %s = ? AND status <= ?", cpdb.schema, CheckpointTableNameEngine, colName)
if _, e := tx.ExecContext(c, query, CheckpointStatusLoaded, tableName, CheckpointStatusMaxInvalid); e != nil {
return errors.Trace(e)
}
query = fmt.Sprintf("UPDATE %s.%s SET status = ? WHERE ? = ? AND status <= ?", cpdb.schema, CheckpointTableNameTable)
if _, e := tx.ExecContext(c, query, CheckpointStatusLoaded, colName, tableName, CheckpointStatusMaxInvalid); e != nil {
query = fmt.Sprintf("UPDATE %s.%s SET status = ? WHERE %s = ? AND status <= ?", cpdb.schema, CheckpointTableNameTable, colName)
if _, e := tx.ExecContext(c, query, CheckpointStatusLoaded, tableName, CheckpointStatusMaxInvalid); e != nil {
return errors.Trace(e)
}
return nil
Expand All @@ -1643,30 +1643,27 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
}

selectQuery := fmt.Sprintf(`
SELECT
t.table_name,
COALESCE(MIN(e.engine_id), 0),
COALESCE(MAX(e.engine_id), -1)
FROM %[1]s.%[4]s t
LEFT JOIN %[1]s.%[5]s e ON t.table_name = e.table_name
WHERE %[2]s = ? AND t.status <= %[3]d
GROUP BY t.table_name;
`, cpdb.schema, aliasedColName, CheckpointStatusMaxInvalid, CheckpointTableNameTable, CheckpointTableNameEngine)

// nolint:gosec
SELECT
t.table_name,
COALESCE(MIN(e.engine_id), 0),
COALESCE(MAX(e.engine_id), -1)
FROM %[1]s.%[3]s t
LEFT JOIN %[1]s.%[4]s e ON t.table_name = e.table_name
WHERE %[2]s = ? AND t.status <= ?
GROUP BY t.table_name;
`, cpdb.schema, aliasedColName, CheckpointTableNameTable, CheckpointTableNameEngine)

deleteChunkQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameChunk, CheckpointTableNameTable)
DELETE FROM %[1]s.%[3]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[4]s WHERE %[2]s = ? AND status <= ?)
`, cpdb.schema, colName, CheckpointTableNameChunk, CheckpointTableNameTable)

// nolint:gosec
deleteEngineQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, CheckpointTableNameEngine, CheckpointTableNameTable)
DELETE FROM %[1]s.%[3]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[4]s WHERE %[2]s = ? AND status <= ?)
`, cpdb.schema, colName, CheckpointTableNameEngine, CheckpointTableNameTable)

// nolint:gosec
deleteTableQuery := fmt.Sprintf(`
DELETE FROM %s.%s WHERE %s = ? AND status <= %d
`, cpdb.schema, CheckpointTableNameTable, colName, CheckpointStatusMaxInvalid)
DELETE FROM %s.%s WHERE %s = ? AND status <= ?
`, cpdb.schema, CheckpointTableNameTable, colName)

var targetTables []DestroyedTableCheckpoint

Expand All @@ -1677,7 +1674,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
err := s.Transact(ctx, "destroy error checkpoints", func(c context.Context, tx *sql.Tx) error {
// Obtain the list of tables
targetTables = nil
rows, e := tx.QueryContext(c, selectQuery, tableName) // #nosec G201
rows, e := tx.QueryContext(c, selectQuery, tableName, CheckpointStatusMaxInvalid)
if e != nil {
return errors.Trace(e)
}
Expand All @@ -1695,13 +1692,13 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
}

// Delete the checkpoints
if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil {
if _, e := tx.ExecContext(c, deleteChunkQuery, tableName, CheckpointStatusMaxInvalid); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteEngineQuery, tableName); e != nil {
if _, e := tx.ExecContext(c, deleteEngineQuery, tableName, CheckpointStatusMaxInvalid); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteTableQuery, tableName); e != nil {
if _, e := tx.ExecContext(c, deleteTableQuery, tableName, CheckpointStatusMaxInvalid); e != nil {
return errors.Trace(e)
}
return nil
Expand Down
Loading

0 comments on commit e0ba603

Please sign in to comment.