Skip to content

Commit

Permalink
Merge branch 'master' into placement_recover
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Dec 15, 2021
2 parents d5cbd60 + 813f6ef commit 3336b37
Show file tree
Hide file tree
Showing 45 changed files with 463 additions and 324 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ linters:
- rowserrcheck
- unconvert
- makezero
- durationcheck

linters-settings:
staticcheck:
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts := &storage.ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand All @@ -284,7 +283,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts = &storage.ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,19 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
return errors.Annotate(err, "create storage failed")
}

// return expectedErr means at least meet one file
expectedErr := errors.New("Stop Iter")
walkErr := s.WalkDir(ctx, &storage.WalkOption{ListCount: 1}, func(string, int64) error {
// return an error when meet the first regular file to break the walk loop
return expectedErr
})
if !errors.ErrorEqual(walkErr, expectedErr) {
if walkErr == nil {
return errors.Errorf("data-source-dir '%s' doesn't exist or contains no files", taskCfg.Mydumper.SourceDir)
}
return errors.Annotatef(walkErr, "visit data-source-dir '%s' failed", taskCfg.Mydumper.SourceDir)
}

loadTask := log.L().Begin(zap.InfoLevel, "load data source")
var mdl *mydump.MDLoader
mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s)
Expand Down
8 changes: 0 additions & 8 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,6 @@ func newGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage
// so we need find sst in slash directory
gcs.Prefix += "//"
}
// TODO remove it after BR remove cfg skip-check-path
if !opts.SkipCheckPath {
// check bucket exists
_, err = bucket.Attrs(ctx)
if err != nil {
return nil, errors.Annotatef(err, "gcs://%s/%s", gcs.Bucket, gcs.Prefix)
}
}
return &gcsStorage{gcs: gcs, bucket: bucket}, nil
}

Expand Down
8 changes: 0 additions & 8 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,6 @@ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (*S3Storag
}

c := s3.New(ses)
// TODO remove it after BR remove cfg skip-check-path
if !opts.SkipCheckPath {
err = checkS3Bucket(c, &qs)
if err != nil {
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "Bucket %s is not accessible: %v", qs.Bucket, err)
}
}

if len(qs.Prefix) > 0 && !strings.HasSuffix(qs.Prefix, "/") {
qs.Prefix += "/"
}
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ func (s *s3Suite) TestS3Storage(c *C) {
_, err := New(ctx, s3, &ExternalStorageOptions{
SendCredentials: test.sendCredential,
CheckPermissions: test.hackPermission,
SkipCheckPath: true,
})
if test.errReturn {
c.Assert(err, NotNil)
Expand Down Expand Up @@ -414,7 +413,7 @@ func (s *s3Suite) TestS3Storage(c *C) {
func (s *s3Suite) TestS3URI(c *C) {
backend, err := ParseBackend("s3://bucket/prefix/", nil)
c.Assert(err, IsNil)
storage, err := New(context.Background(), backend, &ExternalStorageOptions{SkipCheckPath: true})
storage, err := New(context.Background(), backend, &ExternalStorageOptions{})
c.Assert(err, IsNil)
c.Assert(storage.URI(), Equals, "s3://bucket/prefix/")
}
Expand Down
13 changes: 0 additions & 13 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,6 @@ type ExternalStorageOptions struct {
// NoCredentials means that no cloud credentials are supplied to BR
NoCredentials bool

// SkipCheckPath marks whether to skip checking path's existence.
//
// This should only be set to true in testing, to avoid interacting with the
// real world.
// When this field is false (i.e. path checking is enabled), the New()
// function will ensure the path referred by the backend exists by
// recursively creating the folders. This will also throw an error if such
// operation is impossible (e.g. when the bucket storing the path is missing).

// deprecated: use checkPermissions and specify the checkPermission instead.
SkipCheckPath bool

// HTTPClient to use. The created storage may ignore this field if it is not
// directly using HTTP (e.g. the local storage).
HTTPClient *http.Client
Expand All @@ -148,7 +136,6 @@ type ExternalStorageOptions struct {
func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds bool) (ExternalStorage, error) {
return New(ctx, backend, &ExternalStorageOptions{
SendCredentials: sendCreds,
SkipCheckPath: false,
HTTPClient: nil,
})
}
Expand Down
1 change: 0 additions & 1 deletion br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
Expand Down
1 change: 0 additions & 1 deletion br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if cfg.SkipCheckPath, err = flags.GetBool(flagSkipCheckPath); err != nil {
return errors.Trace(err)
}
if cfg.SkipCheckPath {
log.L().Info("--skip-check-path is deprecated, need explicitly set it anymore")
}

if err = cfg.parseCipherInfo(flags); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -548,7 +551,6 @@ func storageOpts(cfg *Config) *storage.ExternalStorageOptions {
return &storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
}

Expand Down
1 change: 0 additions & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
Expand Down
14 changes: 14 additions & 0 deletions br/tests/lightning_s3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ _EOF_
run_sql "DROP DATABASE IF EXISTS $DB;"
run_sql "DROP TABLE IF EXISTS $DB.$TABLE;"

# test not exist path
rm -f $TEST_DIR/lightning.log
SOURCE_DIR="s3://$BUCKET/not-exist-path?endpoint=http%3A//127.0.0.1%3A9900&access_key=$MINIO_ACCESS_KEY&secret_access_key=$MINIO_SECRET_KEY&force_path_style=true"
! run_lightning -d $SOURCE_DIR --backend local 2> /dev/null
grep -Eq "data-source-dir .* doesn't exist or contains no files" $TEST_DIR/lightning.log

# test empty dir
rm -f $TEST_DIR/lightning.log
emptyPath=empty-bucket/empty-path
mkdir -p $DBPATH/$emptyPath
SOURCE_DIR="s3://$emptyPath/not-exist-path?endpoint=http%3A//127.0.0.1%3A9900&access_key=$MINIO_ACCESS_KEY&secret_access_key=$MINIO_SECRET_KEY&force_path_style=true"
! run_lightning -d $SOURCE_DIR --backend local 2> /dev/null
grep -Eq "data-source-dir .* doesn't exist or contains no files" $TEST_DIR/lightning.log

SOURCE_DIR="s3://$BUCKET/?endpoint=http%3A//127.0.0.1%3A9900&access_key=$MINIO_ACCESS_KEY&secret_access_key=$MINIO_SECRET_KEY&force_path_style=true"
run_lightning -d $SOURCE_DIR --backend local 2> /dev/null
run_sql "SELECT count(*), sum(i) FROM \`$DB\`.$TABLE"
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
}
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.cancelFunc = nil
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
Expand Down Expand Up @@ -311,7 +312,6 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
func (e *IndexNestedLoopHashJoin) Close() error {
if e.cancelFunc != nil {
e.cancelFunc()
e.cancelFunc = nil
}
if e.resultCh != nil {
for range e.resultCh {
Expand Down
1 change: 1 addition & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.cancelFunc = nil
e.startWorkers(ctx)
return nil
}
Expand Down
17 changes: 9 additions & 8 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,11 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustExec("set @@tidb_expensive_query_time_threshold=70")
tk.MustQuery("select @@tidb_expensive_query_time_threshold;").Check(testkit.Rows("70"))

tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustExec("set @@tidb_store_limit = 100")
tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@session.tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustExec("set @@tidb_store_limit = 0")

tk.MustExec("set @@global.tidb_store_limit = 100")
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustExec("set @@global.tidb_store_limit = 0")
tk.MustExec("set global tidb_store_limit = 100")
tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@session.tidb_store_limit;").Check(testkit.Rows("100"))
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("100"))

tk.MustQuery("select @@session.tidb_metric_query_step;").Check(testkit.Rows("60"))
Expand Down Expand Up @@ -587,6 +582,12 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustExec("set global tidb_enable_tso_follower_proxy = 0")
tk.MustQuery("select @@tidb_enable_tso_follower_proxy").Check(testkit.Rows("0"))
c.Assert(tk.ExecToErr("set tidb_enable_tso_follower_proxy = 1"), NotNil)

tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("0"))
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("1"))
tk.MustExec("set global tidb_enable_historical_stats = 0")
tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("0"))
}

func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -5426,7 +5426,7 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(tidb_tikvclient_safets_gap_seconds{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance, store)",
"expr": "tidb_tikvclient_min_safets_gap_seconds{tidb_cluster=\"$tidb_cluster\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}-store-{{store}}",
Expand All @@ -5438,7 +5438,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Max SafeTS gap",
"title": "Max SafeTS Gap",
"tooltip": {
"msResolution": false,
"shared": true,
Expand Down
17 changes: 8 additions & 9 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
tidbutil "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func getMostCorrCol4Handle(exprs []expression.Expression, histColl *statistics.T
}

// getColumnRangeCounts estimates row count for each range respectively.
func getColumnRangeCounts(sc *stmtctx.StatementContext, colID int64, ranges []*ranger.Range, histColl *statistics.HistColl, idxID int64) ([]float64, bool) {
func getColumnRangeCounts(sctx sessionctx.Context, colID int64, ranges []*ranger.Range, histColl *statistics.HistColl, idxID int64) ([]float64, bool) {
var err error
var count float64
rangeCounts := make([]float64, len(ranges))
Expand All @@ -1488,13 +1488,13 @@ func getColumnRangeCounts(sc *stmtctx.StatementContext, colID int64, ranges []*r
if idxHist == nil || idxHist.IsInvalid(false) {
return nil, false
}
count, err = histColl.GetRowCountByIndexRanges(sc, idxID, []*ranger.Range{ran})
count, err = histColl.GetRowCountByIndexRanges(sctx, idxID, []*ranger.Range{ran})
} else {
colHist, ok := histColl.Columns[colID]
if !ok || colHist.IsInvalid(sc, false) {
if !ok || colHist.IsInvalid(sctx, false) {
return nil, false
}
count, err = histColl.GetRowCountByColumnRanges(sc, colID, []*ranger.Range{ran})
count, err = histColl.GetRowCountByColumnRanges(sctx, colID, []*ranger.Range{ran})
}
if err != nil {
return nil, false
Expand Down Expand Up @@ -1564,7 +1564,6 @@ func (ds *DataSource) crossEstimateRowCount(path *util.AccessPath, conds []expre
if len(accessConds) == 0 {
return 0, false, corr
}
sc := ds.ctx.GetSessionVars().StmtCtx
ranges, err := ranger.BuildColumnRange(accessConds, ds.ctx, col.RetType, types.UnspecifiedLength)
if len(ranges) == 0 || err != nil {
return 0, err == nil, corr
Expand All @@ -1573,7 +1572,7 @@ func (ds *DataSource) crossEstimateRowCount(path *util.AccessPath, conds []expre
if !idxExists {
idxID = -1
}
rangeCounts, ok := getColumnRangeCounts(sc, colID, ranges, ds.tableStats.HistColl, idxID)
rangeCounts, ok := getColumnRangeCounts(ds.ctx, colID, ranges, ds.tableStats.HistColl, idxID)
if !ok {
return 0, false, corr
}
Expand All @@ -1583,9 +1582,9 @@ func (ds *DataSource) crossEstimateRowCount(path *util.AccessPath, conds []expre
}
var rangeCount float64
if idxExists {
rangeCount, err = ds.tableStats.HistColl.GetRowCountByIndexRanges(sc, idxID, convertedRanges)
rangeCount, err = ds.tableStats.HistColl.GetRowCountByIndexRanges(ds.ctx, idxID, convertedRanges)
} else {
rangeCount, err = ds.tableStats.HistColl.GetRowCountByColumnRanges(sc, colID, convertedRanges)
rangeCount, err = ds.tableStats.HistColl.GetRowCountByColumnRanges(ds.ctx, colID, convertedRanges)
}
if err != nil {
return 0, false, corr
Expand Down
9 changes: 3 additions & 6 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,6 @@ func (ds *DataSource) deriveCommonHandleTablePathStats(path *util.AccessPath, co
if len(conds) == 0 {
return nil
}
sc := ds.ctx.GetSessionVars().StmtCtx
if len(path.IdxCols) != 0 {
res, err := ranger.DetachCondAndBuildRangeForIndex(ds.ctx, conds, path.IdxCols, path.IdxColLens)
if err != nil {
Expand All @@ -744,7 +743,7 @@ func (ds *DataSource) deriveCommonHandleTablePathStats(path *util.AccessPath, co
path.ConstCols[i] = res.ColumnValues[i] != nil
}
}
path.CountAfterAccess, err = ds.tableStats.HistColl.GetRowCountByIndexRanges(sc, path.Index.ID, path.Ranges)
path.CountAfterAccess, err = ds.tableStats.HistColl.GetRowCountByIndexRanges(ds.ctx, path.Index.ID, path.Ranges)
if err != nil {
return err
}
Expand Down Expand Up @@ -785,7 +784,6 @@ func (ds *DataSource) deriveTablePathStats(path *util.AccessPath, conds []expres
return ds.deriveCommonHandleTablePathStats(path, conds, isIm)
}
var err error
sc := ds.ctx.GetSessionVars().StmtCtx
path.CountAfterAccess = float64(ds.statisticTable.Count)
path.TableFilters = conds
var pkCol *expression.Column
Expand Down Expand Up @@ -848,7 +846,7 @@ func (ds *DataSource) deriveTablePathStats(path *util.AccessPath, conds []expres
if err != nil {
return err
}
path.CountAfterAccess, err = ds.statisticTable.GetRowCountByIntColumnRanges(sc, pkCol.ID, path.Ranges)
path.CountAfterAccess, err = ds.statisticTable.GetRowCountByIntColumnRanges(ds.ctx, pkCol.ID, path.Ranges)
// If the `CountAfterAccess` is less than `stats.RowCount`, there must be some inconsistent stats info.
// We prefer the `stats.RowCount` because it could use more stats info to calculate the selectivity.
if path.CountAfterAccess < ds.stats.RowCount && !isIm {
Expand All @@ -858,7 +856,6 @@ func (ds *DataSource) deriveTablePathStats(path *util.AccessPath, conds []expres
}

func (ds *DataSource) fillIndexPath(path *util.AccessPath, conds []expression.Expression) error {
sc := ds.ctx.GetSessionVars().StmtCtx
path.Ranges = ranger.FullRange()
path.CountAfterAccess = float64(ds.statisticTable.Count)
path.IdxCols, path.IdxColLens = expression.IndexInfo2PrefixCols(ds.Columns, ds.schema.Columns, path.Index)
Expand Down Expand Up @@ -900,7 +897,7 @@ func (ds *DataSource) fillIndexPath(path *util.AccessPath, conds []expression.Ex
path.ConstCols[i] = res.ColumnValues[i] != nil
}
}
path.CountAfterAccess, err = ds.tableStats.HistColl.GetRowCountByIndexRanges(sc, path.Index.ID, path.Ranges)
path.CountAfterAccess, err = ds.tableStats.HistColl.GetRowCountByIndexRanges(ds.ctx, path.Index.ID, path.Ranges)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *partitionProcessor) findUsedPartitions(ctx sessionctx.Context, tbl tabl
ranges := detachedResult.Ranges
used := make([]int, 0, len(ranges))
for _, r := range ranges {
if r.IsPointNullable(ctx.GetSessionVars().StmtCtx) {
if r.IsPointNullable(ctx) {
if !r.HighVal[0].IsNull() {
if len(r.HighVal) != len(partIdx) {
used = []int{-1}
Expand Down Expand Up @@ -473,7 +473,7 @@ func (l *listPartitionPruner) locateColumnPartitionsByCondition(cond expression.
return nil, true, nil
}
var locations []tables.ListPartitionLocation
if r.IsPointNullable(l.ctx.GetSessionVars().StmtCtx) {
if r.IsPointNullable(l.ctx) {
location, err := colPrune.LocatePartition(sc, r.HighVal[0])
if types.ErrOverflow.Equal(err) {
return nil, true, nil // return full-scan if over-flow
Expand Down Expand Up @@ -555,7 +555,7 @@ func (l *listPartitionPruner) findUsedListPartitions(conds []expression.Expressi
}
used := make(map[int]struct{}, len(ranges))
for _, r := range ranges {
if r.IsPointNullable(l.ctx.GetSessionVars().StmtCtx) {
if r.IsPointNullable(l.ctx) {
if len(r.HighVal) != len(exprCols) {
return l.fullRange, nil
}
Expand Down
Loading

0 comments on commit 3336b37

Please sign in to comment.