
Commit

Merge branch 'master' into enum-join
wshwsh12 authored Nov 4, 2021
2 parents 325a6a8 + 86caab9 commit f8c2b87
Showing 102 changed files with 2,632 additions and 1,463 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/dumpling_integration_test.yml
@@ -77,7 +77,7 @@ jobs:
       - name: Download dependencies
         run: sh dumpling/install.sh
       - name: Integration test
-        run: make dumpling_integration_test
+        run: make dumpling_integration_test VERBOSE="true"
       - name: Set up tmate session
         if: ${{ failure() }}
         uses: mxschmitt/action-tmate@v3
@@ -114,7 +114,7 @@ jobs:
       - name: Download dependencies
         run: sh dumpling/install.sh
       - name: Integration test
-        run: make dumpling_integration_test
+        run: make dumpling_integration_test VERBOSE="true"
       - name: Set up tmate session
         if: ${{ failure() }}
         uses: mxschmitt/action-tmate@v3
@@ -151,7 +151,7 @@ jobs:
       - name: Download dependencies
         run: sh dumpling/install.sh
       - name: Integration test
-        run: make dumpling_integration_test
+        run: make dumpling_integration_test VERBOSE="true"
      - name: Set up tmate session
         if: ${{ failure() }}
         uses: mxschmitt/action-tmate@v3
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
@@ -33,6 +33,7 @@ type Glue interface {
 // Session is an abstraction of the session.Session interface.
 type Session interface {
 	Execute(ctx context.Context, sql string) error
+	ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
 	CreateDatabase(ctx context.Context, schema *model.DBInfo) error
 	CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
 	Close()
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
@@ -108,6 +108,11 @@ func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
 	return errors.Trace(err)
 }
 
+func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
+	_, err := gs.se.ExecuteInternal(ctx, sql, args...)
+	return errors.Trace(err)
+}
+
 // CreateDatabase implements glue.Session.
 func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
 	d := domain.GetDomain(gs.se).DDL()
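Taken together, these two hunks add a parameterized execution hook to the glue layer and wire it straight through to session.Session.ExecuteInternal. A minimal sketch of how restore code can now call the hook; the helper name, target table, and argument values are hypothetical, while the %n/%? placeholder style matches the UpdateStatsMeta change later in this commit:

package restore

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/glue"
)

// markRestored is a hypothetical helper: it runs one parameterized internal
// statement through the new glue.Session hook. %n is escaped as an SQL
// identifier and %? is bound as a value.
func markRestored(ctx context.Context, se glue.Session, tableID int64, count uint64) error {
	return se.ExecuteInternal(
		ctx,
		"update %n.%n set count = %? where table_id = %?",
		"mysql", "stats_meta",
		count, tableID,
	)
}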
4 changes: 2 additions & 2 deletions br/pkg/lightning/mydump/parquet_parser_test.go
@@ -55,7 +55,7 @@ func (s testParquetParserSuite) TestParquetParser(c *C) {
 	verifyRow := func(i int) {
 		c.Assert(reader.lastRow.RowID, Equals, int64(i+1))
 		c.Assert(len(reader.lastRow.Row), Equals, 2)
-		c.Assert(reader.lastRow.Row[0], DeepEquals, types.NewCollationStringDatum(strconv.Itoa(i), "", 0))
+		c.Assert(reader.lastRow.Row[0], DeepEquals, types.NewCollationStringDatum(strconv.Itoa(i), ""))
 		c.Assert(reader.lastRow.Row[1], DeepEquals, types.NewIntDatum(int64(i)))
 	}
 
@@ -184,7 +184,7 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) {
 
 	for i, testCase := range cases {
 		c.Assert(reader.ReadRow(), IsNil)
-		vals := []types.Datum{types.NewCollationStringDatum(testCase[1].(string), "", 0)}
+		vals := []types.Datum{types.NewCollationStringDatum(testCase[1].(string), "")}
 		if i%2 == 0 {
 			vals = append(vals, vals[0])
 		} else {
53 changes: 36 additions & 17 deletions br/pkg/restore/client.go
@@ -796,7 +796,7 @@ func (rc *Client) GoValidateChecksum(
 	// run the stat loader
 	go func() {
 		defer wg.Done()
-		rc.statLoader(ctx, loadStatCh)
+		rc.updateMetaAndLoadStats(ctx, loadStatCh)
 	}()
 	workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
 	go func() {
@@ -842,7 +842,13 @@ func (rc *Client) GoValidateChecksum(
 	return outCh
 }
 
-func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint, loadStatCh chan<- *CreatedTable) error {
+func (rc *Client) execChecksum(
+	ctx context.Context,
+	tbl CreatedTable,
+	kvClient kv.Client,
+	concurrency uint,
+	loadStatCh chan<- *CreatedTable,
+) error {
 	logger := log.With(
 		zap.String("db", tbl.OldTable.DB.Name.O),
 		zap.String("table", tbl.OldTable.Info.Name.O),
Expand Down Expand Up @@ -891,13 +897,12 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k
)
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
if table.Stats != nil {
loadStatCh <- &tbl
}

loadStatCh <- &tbl
return nil
}

func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) {
func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) {
for {
select {
case <-ctx.Done():
@@ -906,19 +911,33 @@ func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) {
 			if !ok {
 				return
 			}
+
+			// No need to fail the restore when updating the stats meta fails; just log it.
+			restoreTS, err := rc.GetTS(ctx)
+			if err != nil {
+				log.Error("getTS failed", zap.Error(err))
+			} else {
+				err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs)
+				if err != nil {
+					log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
+				}
+			}
+
 			table := tbl.OldTable
-			log.Info("start loads analyze after validate checksum",
-				zap.Int64("old id", tbl.OldTable.Info.ID),
-				zap.Int64("new id", tbl.Table.ID),
-			)
-			start := time.Now()
-			if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
-				log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
-			}
-			log.Info("restore stat done",
-				zap.String("table", table.Info.Name.L),
-				zap.String("db", table.DB.Name.L),
-				zap.Duration("cost", time.Since(start)))
+			if table.Stats != nil {
+				log.Info("start to load stats after checksum validation",
+					zap.Int64("old id", tbl.OldTable.Info.ID),
+					zap.Int64("new id", tbl.Table.ID),
+				)
+				start := time.Now()
+				if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
+					log.Error("load stats failed", zap.Any("table", table.Stats), zap.Error(err))
+				}
+				log.Info("restore stats done",
+					zap.String("table", table.Info.Name.L),
+					zap.String("db", table.DB.Name.L),
+					zap.Duration("cost", time.Since(start)))
+			}
 		}
 	}
 }
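For orientation, updateMetaAndLoadStats is a standard channel drain loop: it consumes the tables produced by the checksum workers until the channel closes, while also honoring context cancellation. A stripped-down sketch of the pattern, with illustrative names rather than BR's types:

package main

import (
	"context"
	"fmt"
)

// drain mirrors the select loop in updateMetaAndLoadStats: stop on
// cancellation, stop when the producer closes the channel, otherwise process.
func drain(ctx context.Context, input <-chan int) {
	for {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-input:
			if !ok {
				return // producer closed the channel
			}
			fmt.Println("processed", v)
		}
	}
}

func main() {
	ch := make(chan int, 3)
	for i := 0; i < 3; i++ {
		ch <- i
	}
	close(ch)
	drain(context.Background(), ch)
}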
23 changes: 23 additions & 0 deletions br/pkg/restore/db.go
@@ -14,6 +14,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
 	"go.uber.org/zap"
 )
 
@@ -92,6 +93,28 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
 	return errors.Trace(err)
 }
 
+// UpdateStatsMeta updates the row count and snapshot TS of a restored table in mysql.stats_meta.
+func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error {
+	sysDB := mysql.SystemDB
+	statsMetaTbl := "stats_meta"
+
+	// Use restoreTS as both the snapshot and the version when updating stats_meta.
+	err := db.se.ExecuteInternal(
+		ctx,
+		"update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?",
+		sysDB,
+		statsMetaTbl,
+		restoreTS,
+		restoreTS,
+		count,
+		tableID,
+	)
+	if err != nil {
+		// Log and swallow the error: restore should not abort on a stats-meta update failure.
+		log.Error("execute update sql failed", zap.Error(err))
+	}
+	return nil
+}
+
 // CreateDatabase executes a CREATE DATABASE SQL.
 func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
 	err := db.se.CreateDatabase(ctx, schema)
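UpdateStatsMeta builds its statement with TiDB's internal SQL templating rather than string concatenation: %n is escaped as an identifier and %? is bound as a value, which keeps the query injection-safe. A small sketch of what that template renders to, assuming the util/sqlexec.EscapeSQL helper that backs ExecuteInternal (the concrete values here are made up):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/sqlexec"
)

func main() {
	// Render the same template UpdateStatsMeta passes to ExecuteInternal.
	sql, err := sqlexec.EscapeSQL(
		"update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?",
		"mysql", "stats_meta", uint64(428), uint64(428), uint64(1000), int64(68),
	)
	if err != nil {
		panic(err)
	}
	// Prints: update `mysql`.`stats_meta` set snapshot = 428, version = 428, count = 1000 where table_id = 68
	fmt.Println(sql)
}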
11 changes: 11 additions & 0 deletions cmd/explaintest/r/new_character_set_builtin.result
@@ -141,3 +141,14 @@ select md5(a), md5(b), md5(c) from t;
 md5(a)	md5(b)	md5(c)
 8093a32450075324682d01456d6e3919	a45d4af7b243e7f393fa09bed72ac73e	aae0117857fe54811a5239275dd81133
 set @@tidb_enable_vectorized_expression = false;
+drop table if exists t;
+create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20));
+insert into t values ('一二三', '一二三', '一二三');
+select decode(encode(a,"monty"),"monty") = a, md5(decode(encode(b,"monty"),"monty")) = md5(b), decode(encode(c,"monty"),"monty") = c from t;
+decode(encode(a,"monty"),"monty") = a	md5(decode(encode(b,"monty"),"monty")) = md5(b)	decode(encode(c,"monty"),"monty") = c
+1	1	1
+set @@tidb_enable_vectorized_expression = true;
+select decode(encode(a,"monty"),"monty") = a, md5(decode(encode(b,"monty"),"monty")) = md5(b), decode(encode(c,"monty"),"monty") = c from t;
+decode(encode(a,"monty"),"monty") = a	md5(decode(encode(b,"monty"),"monty")) = md5(b)	decode(encode(c,"monty"),"monty") = c
+1	1	1
+set @@tidb_enable_vectorized_expression = false;
9 changes: 9 additions & 0 deletions cmd/explaintest/t/new_character_set_builtin.test
@@ -66,3 +66,12 @@ select md5(a), md5(b), md5(c) from t;
 set @@tidb_enable_vectorized_expression = true;
 select md5(a), md5(b), md5(c) from t;
 set @@tidb_enable_vectorized_expression = false;
+
+-- test for builtin functions decode()/encode()
+drop table if exists t;
+create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20));
+insert into t values ('一二三', '一二三', '一二三');
+select decode(encode(a,"monty"),"monty") = a, md5(decode(encode(b,"monty"),"monty")) = md5(b), decode(encode(c,"monty"),"monty") = c from t;
+set @@tidb_enable_vectorized_expression = true;
+select decode(encode(a,"monty"),"monty") = a, md5(decode(encode(b,"monty"),"monty")) = md5(b), decode(encode(c,"monty"),"monty") = c from t;
+set @@tidb_enable_vectorized_expression = false;
1 change: 0 additions & 1 deletion ddl/index.go
@@ -1284,7 +1284,6 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*indexRecord) error {
 // BackfillDataInTxn backfills the table index in a transaction and locks the corresponding rowKey; if the rowKey's value has changed,
 // the index column values may have changed too, so the index entry must not be added and the txn will roll back and retry.
 // BackfillDataInTxn adds w.batchCnt indices at once; the default value of w.batchCnt is 128.
-// TODO: make w.batchCnt can be modified by system variable.
 func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
 	failpoint.Inject("errorMockPanic", func(val failpoint.Value) {
 		if val.(bool) {
3 changes: 3 additions & 0 deletions ddl/serial_test.go
@@ -1124,7 +1124,10 @@ func (s *testSerialSuite) TestTableLocksEnable(c *C) {
 	})
 
 	tk.MustExec("lock tables t1 write")
+	tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1235 LOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file."))
 	checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone)
+	tk.MustExec("unlock tables")
+	tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1235 UNLOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file."))
 }
 
 func (s *testSerialDBSuite) TestAutoRandomOnTemporaryTable(c *C) {
3 changes: 2 additions & 1 deletion distsql/distsql.go
@@ -34,7 +34,8 @@ import (
 
 // DispatchMPPTasks dispatches all tasks and returns an iterator.
 func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
-	resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks)
+	_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
+	resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback)
 	if resp == nil {
 		err := errors.New("client returns nil response")
 		return nil, err
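The new first line of DispatchMPPTasks uses Go's comma-ok map lookup: what matters is whether kv.TiFlash is present as a key in AllowFallbackToTiKV, so the stored value is discarded with the blank identifier. A tiny self-contained illustration of the idiom (the map contents here are made up):

package main

import "fmt"

func main() {
	// Membership in the set, not the stored value, carries the meaning,
	// which is why DispatchMPPTasks discards the value with _.
	allowFallbackToTiKV := map[string]struct{}{
		"tiflash": {},
	}

	_, allowTiFlashFallback := allowFallbackToTiKV["tiflash"]
	fmt.Println(allowTiFlashFallback) // true

	_, ok := allowFallbackToTiKV["tikv"]
	fmt.Println(ok) // false
}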
7 changes: 7 additions & 0 deletions domain/domain.go
@@ -1446,6 +1446,13 @@ const (
 // NotifyUpdatePrivilege updates the privilege key in etcd; TiDB clients that watch
 // the key will be notified.
 func (do *Domain) NotifyUpdatePrivilege() error {
+	// If skip-grant-table is configured, do not flush privileges.
+	// Because LoadPrivilegeLoop does not run and the privilege Handle is nil,
+	// the call to do.PrivilegeHandle().Update would panic.
+	if config.GetGlobalConfig().Security.SkipGrantTable {
+		return nil
+	}
+
 	if do.etcdClient != nil {
 		row := do.etcdClient.KV
 		_, err := row.Put(context.Background(), privilegeKey, "")
3 changes: 2 additions & 1 deletion dumpling/export/block_allow_list_test.go
@@ -8,8 +8,9 @@ import (
 
 	"github.com/pingcap/tidb-tools/pkg/filter"
 	tf "github.com/pingcap/tidb-tools/pkg/table-filter"
-	tcontext "github.com/pingcap/tidb/dumpling/context"
 	"github.com/stretchr/testify/require"
+
+	tcontext "github.com/pingcap/tidb/dumpling/context"
 )
 
 func TestFilterTables(t *testing.T) {
24 changes: 6 additions & 18 deletions dumpling/export/config.go
@@ -8,7 +8,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"net/http"
 	"regexp"
 	"strconv"
 	"strings"
@@ -21,11 +20,12 @@ import (
 	"github.com/pingcap/errors"
 	filter "github.com/pingcap/tidb-tools/pkg/table-filter"
 	"github.com/pingcap/tidb-tools/pkg/utils"
-	"github.com/pingcap/tidb/br/pkg/storage"
-	tcontext "github.com/pingcap/tidb/dumpling/context"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/pflag"
 	"go.uber.org/zap"
+
+	"github.com/pingcap/tidb/br/pkg/storage"
+	tcontext "github.com/pingcap/tidb/dumpling/context"
 )
 
 const (
@@ -531,21 +531,9 @@ func (conf *Config) createExternalStorage(ctx context.Context) (storage.ExternalStorage, error) {
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	httpClient := http.DefaultClient
-	httpClient.Timeout = 30 * time.Second
-	maxIdleConnsPerHost := http.DefaultMaxIdleConnsPerHost
-	if conf.Threads > maxIdleConnsPerHost {
-		maxIdleConnsPerHost = conf.Threads
-	}
-	transport := http.DefaultTransport.(*http.Transport).Clone()
-	transport.MaxIdleConnsPerHost = maxIdleConnsPerHost
-	httpClient.Transport = transport
-
-	return storage.New(ctx, b, &storage.ExternalStorageOptions{
-		HTTPClient:      httpClient,
-		SkipCheckPath:   true,
-		SendCredentials: false,
-	})
+
+	// TODO: support setting httpClient with certification later
+	return storage.New(ctx, b, &storage.ExternalStorageOptions{})
 }
 
 const (
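The deleted block above had a subtle problem: httpClient := http.DefaultClient copies a pointer, so setting Timeout and Transport on it mutated the process-global default client. If per-dump HTTP tuning is reintroduced later, as the new TODO suggests, a safer shape is a dedicated client passed through ExternalStorageOptions.HTTPClient. A sketch reconstructed from the removed code, not current dumpling behavior:

package export

import (
	"net/http"
	"time"
)

// newStorageHTTPClient rebuilds the removed tuning as a dedicated client
// instead of mutating http.DefaultClient. threads mirrors conf.Threads.
func newStorageHTTPClient(threads int) *http.Client {
	maxIdleConnsPerHost := http.DefaultMaxIdleConnsPerHost
	if threads > maxIdleConnsPerHost {
		maxIdleConnsPerHost = threads
	}
	transport := http.DefaultTransport.(*http.Transport).Clone()
	transport.MaxIdleConnsPerHost = maxIdleConnsPerHost
	return &http.Client{
		Timeout:   30 * time.Second,
		Transport: transport,
	}
}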
3 changes: 2 additions & 1 deletion dumpling/export/config_test.go
@@ -5,8 +5,9 @@ package export
 import (
 	"testing"
 
-	tcontext "github.com/pingcap/tidb/dumpling/context"
 	"github.com/stretchr/testify/require"
+
+	tcontext "github.com/pingcap/tidb/dumpling/context"
 )
 
 func TestCreateExternalStorage(t *testing.T) {
1 change: 1 addition & 0 deletions dumpling/export/consistency.go
@@ -7,6 +7,7 @@ import (
 	"database/sql"
 
 	"github.com/pingcap/errors"
+
 	"github.com/pingcap/tidb/br/pkg/utils"
 	tcontext "github.com/pingcap/tidb/dumpling/context"
 )
6 changes: 3 additions & 3 deletions dumpling/export/consistency_test.go
@@ -7,11 +7,11 @@ import (
 	"errors"
 	"testing"
 
-	tcontext "github.com/pingcap/tidb/dumpling/context"
-	"github.com/stretchr/testify/require"
-
 	"github.com/DATA-DOG/go-sqlmock"
 	"github.com/go-sql-driver/mysql"
+	"github.com/stretchr/testify/require"
+
+	tcontext "github.com/pingcap/tidb/dumpling/context"
 )
 
 func TestConsistencyController(t *testing.T) {
(The remaining changed files in this commit are not shown here.)
