From 6fa8234413e07622aaf2c643916777be293c3e23 Mon Sep 17 00:00:00 2001 From: fengou1 <85682690+fengou1@users.noreply.github.com> Date: Sun, 26 Dec 2021 21:54:34 +0800 Subject: [PATCH] br: batch ddl for binlog (#31028) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * executor: migrate test-infra to testify for executor/show_stats_test.go (#30000) * ddl/table_split_test.go: migrate test-infra to testify (#29960) * *: fix unstable test caused by TestRandomPanicAggConsume (#30035) * ddl/ddl_test.go: refactor testSchemaInfo to return error #29964 (#30008) * executor: remove useless parameter (#30043) * executor: fix error msg of granting non-table level privilege (#29321) * server: fix bug https://asktug.com/t/topic/213082/11 (#29577) * *: replace compareDatum by compare (#30048) * executor, store: replace compareDatum by compare (#30044) * executor: migrate test-infra to testify for executor/write_test.go (#29953) * planner: add sub plan info of shuffleReceiver when query explain analyze (#27992) * expression: fix wrong flen for CastAsString funtion (#29563) * br: migrate test-infra to testify for redact (#29747) * ddl: stable create expression index test (#29990) * *: replace compareDatum by compare (#30060) * planner: fix the issue that binding cannot work when sql_select_limit is enabled (#29789) * br : migrate test-infra to testify for lightning/backend/tidb/tidb_test.go (#30042) * *: fix unstable test in placement gc cases (#30045) * lightning: let ignore columns be compatible with tidb backend (#27850) * statistics: replace compareDatum by compare (#30052) * br/pkg/mock: migrate test-infra to testify (#30034) * ddl: forbit alter table cache in system db (#29998) * topsql: distinguish the row and index operation type (#29044) * ddl, util/codec: fix add index OOM and prevent panic in logging (#29925) * br/pkg/utils: migrate tests to testify (#30032) * expression, sessionctx: support rand_seed1 and rand_seed2 sysvar (#29936) * *: track the memory usage of IndexJoin more accurate (#29068) * br : migrate test-infra to testify for lightning/backend/importer (#30073) * planner: fix panic when the join key is scalarFunction (#30002) * ddl: fix none info of `alter table placement` (#29929) * expression: cast charset according to the function's resulting charset (#29905) * *: replace compareDatum by compare and fix compare (#30090) * store/driver: Use BatchGet method supported in client-go. 
(#29860) * privilege: disable role privilege when it is revoked from an user (#30028) * partition: Show partition reformat (#29945) * planner: fix the unstable test `TestPartitionWithVariedDatasources` (#30139) * server: Add a `MockConn` to enable testing connections (#30119) * *: skip unstable test (#29433) * skip test Signed-off-by: yisaer * skip unstable test Signed-off-by: yisaer * table/tables: add `StateRemote` interface for the cached table (#29152) * planner: rebuild range when the range is empty (#30003) * planner: support dump file for trace plan statement (#30059) * planner: make clear for MaybeOverOptimized4PlanCache (#29782) * update: fix the updatable table name resolution in build update list (#30061) * expression : prevent function DATE_ADD/SUB_STRING_XXX pushed down to TiFlash (#30154) * *: remove unused profile memory tracker from global tracker (#30143) * expression, util/codec: fix wrongly eliminated conditions caused by `HashCode()` collision (#30120) * cmd, expression: fix convert binary string to gbk (#30004) * sessionctx/variable: make lc_time_names read only (#30084) * expression: fix misuse of datumsToConstants in GBK test (#30072) * plugin: add more tests to cover audit logs (#30165) * planner: consider prefix index column length in skyline pruning (#27527) * server: support download optimize trace file (#30150) * expression: add ut config (#30156) * ddl/ddl_test.go: refactor testTableInfo to return error (#30069) * *: skip a case in TestClusterTables/ForClusterServerInfo on mac M1 (#29933) * planner: do not add extra limit when handle the execute stmt (#30152) * ddl: migrate test-infra to testify for db_cache_test.go (#30117) * tables: fix data race in mockStatRemoteData (#30180) * server: Port missing in processlist (#30183) * server: add build not race flag (#30184) * unistore: get/batchGet/scan support read-through-lock (#29898) * topsql: fix nil pointer panic in stmtctx (#30181) * types: fix wrong str_to_date() microseconds with leading zeros are not converted correctly (#30122) * dumpling: Add support for `Create Placement` (#29724) * *: add cardinality estimation trace for `Selectivity` (#29883) * *: replace compareDatum by compare and ignore warnings is collate is empty (#30105) * planner: implement aggregation eliminate optimize trace (#30114) * server: Combined fix for authentication issues (#29738) * executer: fix data race (#30186) * dumpling : add a function for the variable call of dm (#30033) * planner: revise optimize trace logic (#30163) * statistics: fix unstable test case TestTraceCE (#30235) * lightning: avoid retry write rows and record duplicate rows in tidb backend (#30179) * executor: send a task with error to the resultCh when panic happen (#30214) * docs: update placement policy limits (#30202) * expression: Support GBK for builtin function AesEncrypt. 
(#29946) * executor: remove useless log (#30248) * docs: Update 2021-09-29-secure-bootstrap.md (#30243) * executor: fix unstable test of topsql (#30257) * copr: add paging API for streaming-like process (#29612) * *: forbid set TiFlash Replica for a table with unsupport charset (#30162) * privilege, session, server: consistently map user login to identity (#30204) * expression: fix flen for timestamp_add (#30213) * types: check values equals to NaN or Inf when convert to float (#30148) * parser: respect TiDB comment when DROP INDEX IF EXISTS (#30173) * ddl: disallow change columns from zero int to time (#25728) * * : statement summary should know the slow write is blocked on read lock lease (#30166) * planner: regard NULL as point when accessing composite index (#30244) * planner: Add trace for proj elimination rule (#30275) * privilege,session: Match loopback connections to 'localhost' accounts (#29995) (#30189) * ddltest: refactor logutil.InitLogger in ddltest to avoid data race (#30299) * load data: fix bug if load data with long content (#29222) * topsql: reduce data race of sql digest (#30296) * ddl: Do not consider the clustered index when checking the length of the secondary index (#29660) * *: update client-go to use resolveForRead (#30314) * ddl/ddl_algorithm_test.go: migrate test-infra to testify (#30268) * store: Add metrics for pd api call time (#30062) * *: replace compareDatum by compare and fix wrong optimize order by (#30273) * metrics/grafana: fix display for 'Start TSO Wait Duration' panel (#30311) * ddl: migrate test-infra to testify for ddl/options_test.go (#30269) * planner: fix inconsistent schema between UnionAll and child operator (#30231) * test: fix incorrect regexp pattern during migrating test (#30080) * br: add more precise check for lock file (#30218) * executor: replace should not change other rows when auto ID is out of range (#30301) * tidb-server: return 1 while failing to set cpu affinity (#30197) * executor: migrate test-infra to testify for executor/index_lookup_join_test.go (#30260) * expression: fix wrong result of greatest/least(mixed unsigned/signed int) (#30121) * types: casting JSON literal `null` to number types should fail (#30278) * executor: fix data race in oomtest (#30339) * parser: fix missing charset and collation of handle column (#30320) * expression: don't append null when param for char() is null (#30236) * *: add warn log for stale read (#30340) * parser: support multi bracket for subquery (#29656) * session, sessionctx/variable: fix validation recursion bug (#30293) * planner: Add trace for agg pushdown rule (#30262) * planner/core: fix a data race when building plan for cached table (#30355) * executor: avoid sum from avg overflow (#30010) * config: make EnableSlowLog atomic (#30346) * *: fix goroutine leak in ddl intergration test (#30369) * executor: make projection executor unparallel for insert/update/delete (#30290) * executor, util: reset offsets and nullBitMap for MutRow when setting new values (#30265) * expression: fix tidb can't alter table from other-type with null value to timestamp with NOT NULL attribute (#29664) * dumpling: fix default collation with upstream when dump database and table (#30292) * ddl: fix the enum default value by triming trailing space (#30356) * expression: migrate test-infra to testify for flag_simplify_test.go (#30407) * server: refine code logic in handleDownloadFile (#30422) * refine logic Signed-off-by: yisaer * fix Signed-off-by: yisaer * ddl: migrate test-infra to testify for 
ddl/table_test.go (#30267) * ddl: handle the error from `addBatchDDLJobs()` correctly (#30401) * br: fix the integration tests (#30423) * util, cmd: remove unused filesort (#30438) * *: update client-go for small backoff time (#30436) * server: Fix unstable tests with FakeAuthSwitch (#30287) * dumpling: fix dump failed when sequence exists (#30164) * *: replace compareDatum by compare (#30421) * lightning: fix gcs max key limit (#30393) * expression, parser: add built-in func is_uuid (#30318) * expression: migrate test-infra to testify for constant_fold_test.go (#30424) * executor: fix pipelined window invalid memory address (#30418) * makefile: add gotestsum for verify ci (#29848) * server: close sql rows to fix unstable test (#30306) * Makefile: add coverage record for BR and Dumpling (#30457) * executor: track the mem usage of IndexMergeReader (#30210) * infosync: close body when ReadAll encounters error (#30462) * planner: show accessed partition when explain mpp query over partition table (#30367) * *: Fix use of user identity in SHOW GRANTS + error messages (#30294) * ddl: add not null flag for auto_increment column (#30477) * expression: make some unstable test serial (#30323) * expression: migrate test-infra to testify for constant_propagation_test.go (#30430) * executor: stable test TestSetDDLReorgBatchSize and TestSetDDLReorgWorkerCnt (#30480) * statistics, util/ranger: add cardinality estimation trace for `GetRowCountBy...` (#30321) * *: skip mysql client goroutine leak detection in integration ddl (#30467) * executor,util: write slow query to slow log no matter what log level (#30461) * executor: enable index_merge used in transaction. (#29875) * logutil: add testcase for SlowQueryLogger.MaxDays/MaxSize/MaxBackups (#30316) * expression: fix data race in builtin_other_vec_generated_test.go (#30503) * expression: fix data race in the collationInfo (#30490) * planner/core, session: fix error message of wrong variable scope (#30510) * lightning: support Re/ReregisterMySQL by different tls name (#30463) * executor: TestBatchGetandPointGetwithHashPartition test typo (#29669) (#29671) * mockstore: improve log to avoid panic for nil pointer (#30513) * *: replace compareDatum by compare, PR 10 (#30456) * planner: Disable dynamic partition prune mode for all non-autocommit (#27532) (#30505) * expression: change the log level of an confusing log from warn to debug (#30484) * br: Check crypter.key valid before backup (#29991) * *: replace compareDatum by compare, PR 11 (#30465) * dumpling: fix default column collation with upstream when dump table (#30531) * server: fix prepared cursor select (#30285) * executor: HashJoinExec checks the buildError even if the probeSide is empty (#30471) * parser, expression: follow mysql, increase interval precedence (#30528) * makefile: set timeout 25m for make race (#30555) * planner: fix the unstable test TestAnalyzeGlobalStatsWithOpts/2 (#30576) * expression,types: Adjusts UNIX_TIMESTAMP() for non-existing DST values (#28739) (#30405) * br: add res.Body.close to avoid leak (#30545) * lightning: add back integration test lightning_error_summary (#30547) * sessionctx/variable: small refactor (split large file) (#30511) * ddl: let `admin cancel ddl jobs` run in a new transaction (#30549) * *: Retry when placement PutBundles failed (#30590) * dumpling: delete unit test in github actions (#30562) * *: support trace plan target='estimation' statement (#30491) * expression: migrate test-infra to testify for integration_test.go (#30548) * planner: support 
trace for min/max eliminate (#30441) * support min/max trace Signed-off-by: yisaer * address the comment Signed-off-by: yisaer Co-authored-by: Ti Chi Robot * br: remove cdclog in br (#30573) * *: show cmd to check if all needed histograms are loaded (#29672) * expression: clone repertoire when clone the scalar function (#30602) * *: use the real StateRemote interface implementation for cached table (#30066) * *: query failed after add index / timestamp out-of-range (#28424) (#29323) * planner: implement collecting predicate columns from logical plan (#29878) * *: show PK name when decoding the clustered index row key (#30623) * ddl/callback_test.go: migrate test-infra to testify (#30317) * *: Rename some names of placement ddl operation (#30622) * executor: fix data race in the index_lookup_hash_join (#30619) * ddl: remove unnecessary locking when adding an index (#29772) * server: try to make `TidbTestSuite` more stable (#30643) * *: Add some PD tests for placement and fix some bug found (#30621) * *: migrate sync.WaitGroup to util.WaitGroupWrapper (#30644) * planner: add trace for join eliminate rule (#30343) * executor: migrate test-infra to testify for executor/shuffle_test.go (#30514) * planner: make (*AccessPath).OnlyPointRange more succinct (#30520) * planner: add trace for join reorder (#30394) * executor: migrate test-infra to testify for executor/union_scan_test.go (#30525) * expression: make cast return error if cast binary literal to another character set (#30537) * *: update tikv client (#30670) * *: update sysutil in go.mod to fix panic when search log (#30523) * topsql: shouldn't evict the SQL meta, since the evicted SQL can be appear on Other components (TiKV) TopN records (#27050) * testify: migrate test-infra to testify for analyze_test.go (#30640) * util: replace compareDatum by compare, point part (#30575) * test: make all the tests run in serial (#30692) * statistics: add mutex for Handle.globalMap and Handle.feedback (#30550) * executor: fix regular expression in json so that it could match identifer start with '$' (#29750) * util/testkit/testkit.go: fix typo (#30638) * planner: Introduce a new global variable to control the historical statistics feature (#30646) * topsql: introduce datasink interface (#30662) * planner: unify the argument of stats functions to use SessionCtx instead of StatementContext (#30668) * metrics: fix the Max SafeTS Gap metrics (#30689) * lightning: Add source dir existence check for s3 (#30674) * golangci-lint: support durationcheck (#30027) * executor: fix data race on IndexHashJoin.cancelFunc (#30701) * sessionctx/variable: change tidb_store_limit to global only (#30522) * statistics: remove reassignment of Handle.pool in NewHandle (#30675) * br: fix some unstable unit test cases. 
(#30716) * bindinfo: fix the comment typo (#30616) * server: support decoding prepared string args to character_set_client (#30723) * expression: fix enum type join binary get wrong result (#30445) * cmd/explaintest: fix wrong result comparison for explain test (#30717) * parallel create tables in br * metrics: fix copr-cache metrics (#30712) * test: merge executor's serial tests to other tests (#30711) * ddl: add batch create table api Signed-off-by: xhe * ddl: add unit tests Signed-off-by: xhe * ddl: fix fmt Signed-off-by: xhe * ddl: typo Co-authored-by: Arenatlx * ddl: fix tests Signed-off-by: xhe * ddl: rename to BatchCreateTableWithInfo Signed-off-by: xhe * ddl: trace the error Signed-off-by: xhe * ddl: comments Signed-off-by: xhe * ddl: cancle the job right Signed-off-by: xhe * ddl: cancel the job right 2 Signed-off-by: xhe * ddl: report error if entry too large Signed-off-by: xhe * ddl: report error when table is duplicated Signed-off-by: xhe * ddl: go fmt Signed-off-by: xhe * infoschema: improve batch memory perf Signed-off-by: xhe * ddl: retain ID Signed-off-by: xhe * ddl: reduce log frequency Signed-off-by: xhe * ddl: fix tests Signed-off-by: xhe * fix merge issue * fix issue that loss table restore * refactoring code * refactoring code Co-authored-by: Weizhen Wang Co-authored-by: wangggong <793160615@qq.com> Co-authored-by: 王超 Co-authored-by: José Clovis Ramírez de la Rosa Co-authored-by: unconsolable Co-authored-by: zhangguangchao <1614439+zgcbj@users.noreply.github.com> Co-authored-by: wjHuang Co-authored-by: mmyj Co-authored-by: Shenghui Wu <793703860@qq.com> Co-authored-by: tison Co-authored-by: Yuanjia Zhang Co-authored-by: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Co-authored-by: glorv Co-authored-by: sylzd Co-authored-by: Yexiang Zhang Co-authored-by: tangenta Co-authored-by: Yujie Xia Co-authored-by: Hangjie Mo Co-authored-by: HuaiyuXu <391585975@qq.com> Co-authored-by: Song Gao Co-authored-by: xiongjiwei Co-authored-by: WangDeng Co-authored-by: Mattias Jonsson Co-authored-by: djshow832 Co-authored-by: tiancaiamao Co-authored-by: Chengpeng Yan <41809508+Reminiscent@users.noreply.github.com> Co-authored-by: Arenatlx <314806019@qq.com> Co-authored-by: Meng Xin Co-authored-by: Zhou Kunqin <25057648+time-and-fate@users.noreply.github.com> Co-authored-by: Youra Cho Co-authored-by: Yifan Xu <30385241+xuyifangreeneyes@users.noreply.github.com> Co-authored-by: Xiang Zhang Co-authored-by: Lei Zhao Co-authored-by: Daniël van Eeden Co-authored-by: docsir <73268456+docsir@users.noreply.github.com> Co-authored-by: Morgan Tocker Co-authored-by: jakevin <30525741+jackwener@users.noreply.github.com> Co-authored-by: 姬小野 <38575466+JameyWoo@users.noreply.github.com> Co-authored-by: you06 Co-authored-by: lance6716 Co-authored-by: TonsnakeLin <87681388+TonsnakeLin@users.noreply.github.com> Co-authored-by: imentu Co-authored-by: Alkaid <38248129+jyz0309@users.noreply.github.com> Co-authored-by: lvtu <37565148+tongtongyin@users.noreply.github.com> Co-authored-by: 3pointer Co-authored-by: Zhuhe Fang Co-authored-by: WizardXiao <89761062+WizardXiao@users.noreply.github.com> Co-authored-by: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Co-authored-by: guo-shaoge Co-authored-by: Ryan Leung Co-authored-by: xufei Co-authored-by: Ehco Co-authored-by: Zak Zhao <57036248+joccau@users.noreply.github.com> Co-authored-by: xhe Co-authored-by: Chunzhu Li Co-authored-by: Ti Chi Robot Co-authored-by: Xiaoju Wu Co-authored-by: JmPotato Co-authored-by: Zach 
<51114270+zach030@users.noreply.github.com> Co-authored-by: bb7133 Co-authored-by: crazycs Co-authored-by: znhh6018 <44599853+znhh6018@users.noreply.github.com> Co-authored-by: eddie lin Co-authored-by: dongjunduo Co-authored-by: Zhenchi Co-authored-by: zhangjinpeng1987 Co-authored-by: Jack Yu Co-authored-by: Arenatlx --- br/pkg/glue/glue.go | 2 +- br/pkg/gluetidb/glue.go | 7 +- br/pkg/restore/client.go | 226 +++++++++++++++++++-------- br/pkg/restore/db.go | 89 +++++++++-- br/pkg/task/restore.go | 4 +- ddl/table_test.go | 152 ++++++++++++++++++ executor/executor_test.go | 129 +-------------- executor/explainfor_test.go | 18 +-- go.mod | 4 +- go.sum | 6 +- planner/core/common_plans.go | 1 - planner/core/logical_plan_builder.go | 2 + planner/optimize.go | 20 ++- server/http_handler_test.go | 1 + session/bootstrap.go | 30 +--- session/session_test.go | 31 ---- table/tables/cache.go | 54 +++++-- table/tables/state_remote.go | 102 +++++------- table/tables/state_remote_test.go | 47 ++++-- testkit/testkit.go | 14 -- 20 files changed, 543 insertions(+), 396 deletions(-) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 2b27b40704767..bb4f563b44ae7 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -13,7 +13,7 @@ import ( // interface is to bulk create table parallelly type BulkCreateTableSession interface { - CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, batchDdlSize uint) error + CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error } // Glue is an abstraction of TiDB function calls used in BR. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index b2dd9c1fa711b..d8c5cba59badd 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -131,9 +131,9 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) } // CreateTable implements glue.Session. 
-func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, batchDdlSize uint) error { +func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { d := domain.GetDomain(gs.se).DDL() - log.Info("tidb start create tables", zap.Uint("batchDdlSize", batchDdlSize)) + log.Info("tidb start create tables") var dbName model.CIStr cloneTables := make([]*model.TableInfo, 0, len(tables)) @@ -159,7 +159,8 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo cloneTables = append(cloneTables, table) } gs.se.SetValue(sessionctx.QueryString, queryBuilder.String()) - err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore, true) + err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore) + if err != nil { log.Info("Bulk create table from tidb failure, it possible caused by version mismatch with BR.", zap.String("Error", err.Error())) return err diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 8ec2006ca59b2..9d3dd6ca67c55 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -12,6 +12,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/opentracing/opentracing-go" @@ -81,6 +82,7 @@ type Client struct { restoreStores []uint64 + cipher *backuppb.CipherInfo storage storage.ExternalStorage backend *backuppb.StorageBackend switchModeInterval time.Duration @@ -135,6 +137,10 @@ func (rc *Client) SetRateLimit(rateLimit uint64) { rc.rateLimit = rateLimit } +func (rc *Client) SetCrypter(crypter *backuppb.CipherInfo) { + rc.cipher = crypter +} + // SetStorage set ExternalStorage for client. func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error { var err error @@ -415,7 +421,7 @@ func (rc *Client) createTables( if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID") } else { - err := db.CreateTables(ctx, tables, rc.GetBatchDdlSize()) + err := db.CreateTables(ctx, tables) if err != nil { return nil, errors.Trace(err) } @@ -451,11 +457,12 @@ func (rc *Client) createTable( dom *domain.Domain, table *metautil.Table, newTS uint64, + ddlTables map[UniqueTableName]bool, ) (CreatedTable, error) { if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) } else { - err := db.CreateTable(ctx, table) + err := db.CreateTable(ctx, table, ddlTables) if err != nil { return CreatedTable{}, errors.Trace(err) } @@ -494,6 +501,7 @@ func (rc *Client) GoCreateTables( // Could we have a smaller size of tables? 
log.Info("start create tables") + ddlTables := rc.DDLJobsMap() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("Client.GoCreateTables", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -501,55 +509,63 @@ func (rc *Client) GoCreateTables( } outCh := make(chan CreatedTable, len(tables)) rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter) - err := rc.createTablesInWorkerPool(ctx, dom, tables, dbPool, newTS, outCh) - //cts, err := rc.createTables(ctx, rc.db, dom, tables, newTS) - if err == nil { - defer close(outCh) - // fall back to old create table (sequential create table) - } else if strings.Contains(err.Error(), "[ddl:8204]invalid ddl job") { - log.Info("fall back to the old DDL way to create table.") - createOneTable := func(c context.Context, db *DB, t *metautil.Table) error { - select { - case <-c.Done(): - return c.Err() - default: - } - rt, err := rc.createTable(c, db, dom, t, newTS) - if err != nil { - log.Error("create table failed", - zap.Error(err), - zap.Stringer("db", t.DB.Name), - zap.Stringer("table", t.Info.Name)) - return errors.Trace(err) - } - log.Debug("table created and send to next", - zap.Int("output chan size", len(outCh)), - zap.Stringer("table", t.Info.Name), - zap.Stringer("database", t.DB.Name)) - outCh <- rt - rater.Inc() - rater.L().Info("table created", - zap.Stringer("table", t.Info.Name), - zap.Stringer("database", t.DB.Name)) - return nil - } - go func() { + var err error = nil + if rc.batchDllSize > 0 { + err = rc.createTablesInWorkerPool(ctx, dom, tables, dbPool, newTS, outCh) + + if err == nil { + log.Info("bulk to create tables success.") defer close(outCh) - defer log.Debug("all tables are created") - var err error - if len(dbPool) > 0 { - err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool) - } else { - err = rc.createTablesWithSoleDB(ctx, createOneTable, tables) - } - if err != nil { - errCh <- err - } - }() - } else { - errCh <- err + // fall back to old create table (sequential create table) + } else if strings.Contains(err.Error(), "[ddl:8204]invalid ddl job") { + log.Info("fall back to the old DDL way to create table.") + } else { + log.Error("bulk to create tables failure.") + errCh <- err + return outCh + } + } + + createOneTable := func(c context.Context, db *DB, t *metautil.Table) error { + select { + case <-c.Done(): + return c.Err() + default: + + } + rt, err := rc.createTable(c, db, dom, t, newTS, ddlTables) + if err != nil { + log.Error("create table failed", + zap.Error(err), + zap.Stringer("db", t.DB.Name), + zap.Stringer("table", t.Info.Name)) + return errors.Trace(err) + } + log.Debug("table created and send to next", + zap.Int("output chan size", len(outCh)), + zap.Stringer("table", t.Info.Name), + zap.Stringer("database", t.DB.Name)) + outCh <- rt + rater.Inc() + rater.L().Info("table created", + zap.Stringer("table", t.Info.Name), + zap.Stringer("database", t.DB.Name)) + return nil } + go func() { + defer close(outCh) + defer log.Debug("all tables are created") + var err error + if len(dbPool) > 0 { + err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool) + } else { + err = rc.createTablesWithSoleDB(ctx, createOneTable, tables) + } + if err != nil { + errCh <- err + } + }() return outCh } @@ -585,7 +601,9 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma workers := utils.NewWorkerPool(uint(len(dbPool)), "Create Tables Worker") numOfTables := len(tables) lastSent := 0 - for i 
:= int(rc.batchDllSize); i <= numOfTables; i = i + int(rc.batchDllSize) { + + for i := int(rc.batchDllSize); i < numOfTables+int(rc.batchDllSize); i = i + int(rc.batchDllSize) { + log.Info("create tables", zap.Int("table start", lastSent), zap.Int("table end", i)) if i > numOfTables { i = numOfTables @@ -727,7 +745,7 @@ func (rc *Client) RestoreFiles( zap.Duration("take", time.Since(fileStart))) updateCh.Inc() }() - return rc.fileImporter.Import(ectx, filesReplica, rewriteRules) + return rc.fileImporter.Import(ectx, filesReplica, rewriteRules, rc.cipher) }) } @@ -768,7 +786,7 @@ func (rc *Client) RestoreRaw( rc.workerPool.ApplyOnErrorGroup(eg, func() error { defer updateCh.Inc() - return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule()) + return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher) }) } if err := eg.Wait(); err != nil { @@ -844,6 +862,8 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo gctx, store.GetAddress(), opt, + grpc.WithBlock(), + grpc.FailOnNonTempDialError(true), grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), // we don't need to set keepalive timeout here, because the connection lives // at most 5s. (shorter than minimal value for keepalive time!) @@ -880,17 +900,25 @@ func (rc *Client) GoValidateChecksum( ) <-chan struct{} { log.Info("Start to validate checksum") outCh := make(chan struct{}, 1) + wg := new(sync.WaitGroup) + wg.Add(2) + loadStatCh := make(chan *CreatedTable, 1024) + // run the stat loader + go func() { + defer wg.Done() + rc.updateMetaAndLoadStats(ctx, loadStatCh) + }() workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum") go func() { - wg, ectx := errgroup.WithContext(ctx) + eg, ectx := errgroup.WithContext(ctx) defer func() { - log.Info("all checksum ended") - if err := wg.Wait(); err != nil { + if err := eg.Wait(); err != nil { errCh <- err } - outCh <- struct{}{} - close(outCh) + close(loadStatCh) + wg.Done() }() + for { select { // if we use ectx here, maybe canceled will mask real error. 
@@ -900,14 +928,14 @@ func (rc *Client) GoValidateChecksum( if !ok { return } - workers.ApplyOnErrorGroup(wg, func() error { + + workers.ApplyOnErrorGroup(eg, func() error { start := time.Now() defer func() { elapsed := time.Since(start) - summary.CollectDuration("restore checksum", elapsed) summary.CollectSuccessUnit("table checksum", 1, elapsed) }() - err := rc.execChecksum(ectx, tbl, kvClient, concurrency) + err := rc.execChecksum(ectx, tbl, kvClient, concurrency, loadStatCh) if err != nil { return errors.Trace(err) } @@ -917,10 +945,21 @@ func (rc *Client) GoValidateChecksum( } } }() + go func() { + wg.Wait() + log.Info("all checksum ended") + close(outCh) + }() return outCh } -func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint) error { +func (rc *Client) execChecksum( + ctx context.Context, + tbl CreatedTable, + kvClient kv.Client, + concurrency uint, + loadStatCh chan<- *CreatedTable, +) error { logger := log.With( zap.String("db", tbl.OldTable.DB.Name.O), zap.String("table", tbl.OldTable.Info.Name.O), @@ -969,16 +1008,49 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k ) return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum") } - if table.Stats != nil { - logger.Info("start loads analyze after validate checksum", - zap.Int64("old id", tbl.OldTable.Info.ID), - zap.Int64("new id", tbl.Table.ID), - ) - if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil { - logger.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err)) + + loadStatCh <- &tbl + return nil +} + +func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) { + for { + select { + case <-ctx.Done(): + return + case tbl, ok := <-input: + if !ok { + return + } + + // Not need to return err when failed because of update analysis-meta + restoreTS, err := rc.GetTS(ctx) + if err != nil { + log.Error("getTS failed", zap.Error(err)) + } else { + err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs) + if err != nil { + log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err)) + } + } + + table := tbl.OldTable + if table.Stats != nil { + log.Info("start loads analyze after validate checksum", + zap.Int64("old id", tbl.OldTable.Info.ID), + zap.Int64("new id", tbl.Table.ID), + ) + start := time.Now() + if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil { + log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err)) + } + log.Info("restore stat done", + zap.String("table", table.Info.Name.L), + zap.String("db", table.DB.Name.L), + zap.Duration("cost", time.Since(start))) + } } } - return nil } const ( @@ -1155,6 +1227,24 @@ func (rc *Client) IsSkipCreateSQL() bool { return rc.noSchema } +// DDLJobsMap returns a map[UniqueTableName]bool about < db table, hasCreate/hasTruncate DDL >. +// if we execute some DDLs before create table. +// we may get two situation that need to rebase auto increment/random id. +// 1. truncate table: truncate will generate new id cache. +// 2. create table/create and rename table: the first create table will lock down the id cache. +// because we cannot create onExistReplace table. +// so the final create DDL with the correct auto increment/random id won't be executed. 
+func (rc *Client) DDLJobsMap() map[UniqueTableName]bool { + m := make(map[UniqueTableName]bool) + for _, job := range rc.ddlJobs { + switch job.Type { + case model.ActionTruncateTable, model.ActionCreateTable, model.ActionRenameTable: + m[UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()}] = true + } + } + return m +} + // PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node. func (rc *Client) PreCheckTableTiFlashReplica( ctx context.Context, diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index bae7cace49d53..9935d6608cf51 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "go.uber.org/zap" ) @@ -22,6 +23,11 @@ type DB struct { se glue.Session } +type UniqueTableName struct { + DB string + Table string +} + // NewDB returns a new DB. func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { se, err := g.CreateSession(store) @@ -87,6 +93,28 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } +// UpdateStatsMeta update count and snapshot ts in mysql.stats_meta +func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error { + sysDB := mysql.SystemDB + statsMetaTbl := "stats_meta" + + // set restoreTS to snapshot and version which is used to update stats_meta + err := db.se.ExecuteInternal( + ctx, + "update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?", + sysDB, + statsMetaTbl, + restoreTS, + restoreTS, + count, + tableID, + ) + if err != nil { + log.Error("execute update sql failed", zap.Error(err)) + } + return nil +} + // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { err := db.se.CreateDatabase(ctx, schema) @@ -97,14 +125,16 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { } // CreateTable executes a CREATE TABLE SQL. -func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, batchDdlSize uint) error { +func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table) error { if bse, ok := db.se.(glue.BulkCreateTableSession); ok { - log.Info("session supports bulk create table.", zap.Uint("batchDdlSize", batchDdlSize), zap.Int("table size", len(tables))) + + log.Info("session supports bulk create table.", zap.Int("table size", len(tables))) + m := map[string][]*model.TableInfo{} for _, table := range tables { m[table.DB.Name.L] = append(m[table.DB.Name.L], table.Info) } - if err := bse.CreateTables(ctx, m, batchDdlSize); err != nil { + if err := bse.CreateTables(ctx, m); err != nil { return err } } @@ -169,7 +199,7 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, batchD } // CreateTable executes a CREATE TABLE SQL. 
-func (db *DB) CreateTable(ctx context.Context, table *metautil.Table) error { +func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ddlTables map[UniqueTableName]bool) error { err := db.se.CreateTable(ctx, table.DB.Name, table.Info) if err != nil { log.Error("create table failed", @@ -179,7 +209,11 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table) error { return errors.Trace(err) } - if table.Info.IsSequence() { + var restoreMetaSQL string + switch { + case table.Info.IsView(): + return nil + case table.Info.IsSequence(): setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);", utils.EncloseName(table.DB.Name.O), utils.EncloseName(table.Info.Name.O)) @@ -220,8 +254,38 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table) error { return errors.Trace(err) } } - restoreMetaSQL := fmt.Sprintf(setValFormat, table.Info.AutoIncID) - if err = db.se.Execute(ctx, restoreMetaSQL); err != nil { + restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) + err = db.se.Execute(ctx, restoreMetaSQL) + if err != nil { + log.Error("restore meta sql failed", + zap.String("query", restoreMetaSQL), + zap.Stringer("db", table.DB.Name), + zap.Stringer("table", table.Info.Name), + zap.Error(err)) + return errors.Trace(err) + } + // only table exists in ddlJobs during incremental restoration should do alter after creation. + case ddlTables[UniqueTableName{table.DB.Name.String(), table.Info.Name.String()}]: + if utils.NeedAutoID(table.Info) { + restoreMetaSQL = fmt.Sprintf( + "alter table %s.%s auto_increment = %d;", + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O), + table.Info.AutoIncID) + } else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { + restoreMetaSQL = fmt.Sprintf( + "alter table %s.%s auto_random_base = %d", + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O), + table.Info.AutoRandID) + } else { + log.Info("table exists in incremental ddl jobs, but don't need to be altered", + zap.Stringer("db", table.DB.Name), + zap.Stringer("table", table.Info.Name)) + return nil + } + err = db.se.Execute(ctx, restoreMetaSQL) + if err != nil { log.Error("restore meta sql failed", zap.String("query", restoreMetaSQL), zap.Stringer("db", table.DB.Name), @@ -268,20 +332,15 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [ } } - type namePair struct { - db string - table string - } - for _, table := range tables { tableIDs := make(map[int64]bool) tableIDs[table.Info.ID] = true - tableNames := make(map[namePair]bool) - name := namePair{table.DB.Name.String(), table.Info.Name.String()} + tableNames := make(map[UniqueTableName]bool) + name := UniqueTableName{table.DB.Name.String(), table.Info.Name.String()} tableNames[name] = true for _, job := range allDDLJobs { if job.BinlogInfo.TableInfo != nil { - name := namePair{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()} + name = UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()} if tableIDs[job.TableID] || tableNames[name] { ddlJobs = append(ddlJobs, job) tableIDs[job.TableID] = true diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 0f3347fd1d337..a8879134e72c0 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -47,7 +47,7 @@ const ( defaultPDConcurrency = 1 defaultBatchFlushInterval = 16 * time.Second defaultDDLConcurrency = 16 - defaultFlagDdlBatchSize = 500 + defaultFlagDdlBatchSize = 0 ) // RestoreCommonConfig is the common configuration for all BR 
restore tasks. @@ -83,7 +83,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) { "the threshold of merging small regions (Default 960_000, region split key count)") flags.Uint(FlagPDConcurrency, defaultPDConcurrency, "concurrency pd-relative operations like split & scatter.") - flags.Duration(FlagBatchFlushInterval, defaultFlagDdlBatchSize, + flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval, "batch size for ddl to create a batch of tabes") flags.Uint(FlagDdlBatchSize, defaultFlagDdlBatchSize, "concurrency pd-relative operations like split & scatter.") diff --git a/ddl/table_test.go b/ddl/table_test.go index 98b5824c03d8e..44ef965d70673 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -202,12 +202,164 @@ func TestTable(t *testing.T) { ) require.NoError(t, err) + + job = testDropTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckJobDoneT(t, ddl, job, false) + + // for truncate table + tblInfo, err = testTableInfo(ddl, "tt", 3) + require.NoError(t, err) + job = testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + job = testTruncateTable(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + + // for rename table + dbInfo1, err := testSchemaInfo(ddl, "test_rename_table") + require.NoError(t, err) + testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo1) + job = testRenameTable(t, ctx, ddl, dbInfo1.ID, dbInfo.ID, dbInfo.Name, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + + job = testLockTable(t, ctx, ddl, dbInfo1.ID, tblInfo, model.TableLockWrite) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableLockedTest(t, ddl, dbInfo1, tblInfo, ddl.GetID(), ctx.GetSessionVars().ConnectionID, model.TableLockWrite) + // for alter cache table + job = testAlterCacheTable(t, ctx, ddl, dbInfo1.ID, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableCacheTest(t, ddl, dbInfo1, tblInfo) + // for alter no cache table + job = testAlterNoCacheTable(t, ctx, ddl, dbInfo1.ID, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableNoCacheTest(t, ddl, dbInfo1, tblInfo) + + testDropSchemaT(t, testNewContext(ddl), ddl, dbInfo) + err = ddl.Stop() + require.NoError(t, err) + err = store.Close() + require.NoError(t, err) +} + +func checkTableCacheTest(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) + require.NotNil(t, info) + require.NotNil(t, info.TableCacheStatusType) + require.Equal(t, model.TableCacheStatusEnable, info.TableCacheStatusType) + return nil + }) + require.NoError(t, err) +} + +func checkTableNoCacheTest(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) + require.NotNil(t, info) + require.Equal(t, model.TableCacheStatusDisable, 
info.TableCacheStatusType) + return nil + }) + require.NoError(t, err) +} + +func testAlterCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { + job := &model.Job{ + SchemaID: newSchemaID, + TableID: tblInfo.ID, + Type: model.ActionAlterCacheTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{}, + } + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) + return job +} + +func testAlterNoCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { + + job := &model.Job{ + SchemaID: newSchemaID, + TableID: tblInfo.ID, + Type: model.ActionAlterNoCacheTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{}, + } + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) + return job +} + +func TestCreateTables(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + ddl, err := testNewDDLAndStart( + context.Background(), + WithStore(store), + WithLease(testLease), + ) + require.NoError(t, err) + dbInfo, err := testSchemaInfo(ddl, "test_table") require.NoError(t, err) testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo) ctx := testNewContext(ddl) + infos := []*model.TableInfo{} + genIDs, err := ddl.genGlobalIDs(3) + require.NoError(t, err) + + infos = append(infos, &model.TableInfo{ + ID: genIDs[0], + Name: model.NewCIStr("s1"), + }) + infos = append(infos, &model.TableInfo{ + ID: genIDs[1], + Name: model.NewCIStr("s2"), + }) + infos = append(infos, &model.TableInfo{ + ID: genIDs[2], + Name: model.NewCIStr("s3"), + }) + + job := &model.Job{ + SchemaID: dbInfo.ID, + Type: model.ActionCreateTables, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{infos}, + } + + err = d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + + t1 := testGetTable(c, d, s.dbInfo.ID, genIDs[0]) + c.Assert(t1, NotNil) + t2 := testGetTable(c, d, s.dbInfo.ID, genIDs[1]) + c.Assert(t2, NotNil) + t3 := testGetTable(c, d, s.dbInfo.ID, genIDs[2]) + c.Assert(t3, NotNil) +} + +func (s *testTableSuite) TestTable(c *C) { + d := s.d + + ctx := testNewContext(ddl) + tblInfo, err := testTableInfo(ddl, "t", 3) require.NoError(t, err) job := testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) diff --git a/executor/executor_test.go b/executor/executor_test.go index ef4f434a9fb67..edd72e030c38f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -24,8 +24,7 @@ import ( "net" "os" "path/filepath" - "reflect" - "runtime" + "strconv" "strings" "sync" @@ -9505,129 +9504,3 @@ func (s *testSerialSuite) TestIssue30289(c *C) { c.Assert(err.Error(), Matches, "issue30289 build return error") } -// Test invoke Close without invoking Open before for each operators. -func (s *testSerialSuite) TestUnreasonablyClose(c *C) { - defer testleak.AfterTest(c)() - - is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) - se, err := session.CreateSession4Test(s.store) - c.Assert(err, IsNil) - _, err = se.Execute(context.Background(), "use test") - c.Assert(err, IsNil) - // To enable the shuffleExec operator. 
- _, err = se.Execute(context.Background(), "set @@tidb_merge_join_concurrency=4") - c.Assert(err, IsNil) - - var opsNeedsCovered = []plannercore.PhysicalPlan{ - &plannercore.PhysicalHashJoin{}, - &plannercore.PhysicalMergeJoin{}, - &plannercore.PhysicalIndexJoin{}, - &plannercore.PhysicalIndexHashJoin{}, - &plannercore.PhysicalTableReader{}, - &plannercore.PhysicalIndexReader{}, - &plannercore.PhysicalIndexLookUpReader{}, - &plannercore.PhysicalIndexMergeReader{}, - &plannercore.PhysicalApply{}, - &plannercore.PhysicalHashAgg{}, - &plannercore.PhysicalStreamAgg{}, - &plannercore.PhysicalLimit{}, - &plannercore.PhysicalSort{}, - &plannercore.PhysicalTopN{}, - &plannercore.PhysicalCTE{}, - &plannercore.PhysicalCTETable{}, - &plannercore.PhysicalMaxOneRow{}, - &plannercore.PhysicalProjection{}, - &plannercore.PhysicalSelection{}, - &plannercore.PhysicalTableDual{}, - &plannercore.PhysicalWindow{}, - &plannercore.PhysicalShuffle{}, - &plannercore.PhysicalUnionAll{}, - } - executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") - - var opsNeedsCoveredMask uint64 = 1< t1.a) AS a from t as t1) t", - "select /*+ hash_agg() */ count(f) from t group by a", - "select /*+ stream_agg() */ count(f) from t group by a", - "select * from t order by a, f", - "select * from t order by a, f limit 1", - "select * from t limit 1", - "select (select t1.a from t t1 where t1.a > t2.a) as a from t t2;", - "select a + 1 from t", - "select count(*) a from t having a > 1", - "select * from t where a = 1.1", - "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", - "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", - "select sum(f) over (partition by f) from t", - "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d", - "select a from t union all select a from t", - } { - comment := Commentf("case:%v sql:%s", i, tc) - c.Assert(err, IsNil, comment) - stmt, err := s.ParseOneStmt(tc, "", "") - c.Assert(err, IsNil, comment) - - err = se.NewTxn(context.Background()) - c.Assert(err, IsNil, comment) - p, _, err := planner.Optimize(context.TODO(), se, stmt, is) - c.Assert(err, IsNil, comment) - // This for loop level traverses the plan tree to get which operators are covered. - for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; { - newChild := make([]plannercore.PhysicalPlan, 0, len(child)) - for _, ch := range child { - found := false - for k, t := range opsNeedsCovered { - if reflect.TypeOf(t) == reflect.TypeOf(ch) { - opsAlreadyCoveredMask |= 1 << k - found = true - break - } - } - c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) - switch x := ch.(type) { - case *plannercore.PhysicalCTE: - newChild = append(newChild, x.RecurPlan) - newChild = append(newChild, x.SeedPlan) - continue - case *plannercore.PhysicalShuffle: - newChild = append(newChild, x.DataSources...) - newChild = append(newChild, x.Tails...) - continue - } - newChild = append(newChild, ch.Children()...) 
- } - child = newChild - } - - e := executorBuilder.Build(p) - - func() { - defer func() { - r := recover() - buf := make([]byte, 4096) - stackSize := runtime.Stack(buf, false) - buf = buf[:stackSize] - c.Assert(r, IsNil, Commentf("case: %v\n sql: %s\n error stack: %v", i, tc, string(buf))) - }() - c.Assert(e.Close(), IsNil, comment) - }() - } - // The following code is used to make sure all the operators registered - // in opsNeedsCoveredMask are covered. - commentBuf := strings.Builder{} - if opsAlreadyCoveredMask != opsNeedsCoveredMask { - for i := range opsNeedsCovered { - if opsAlreadyCoveredMask&(1<= version80 { - return - } - // Check if tidb_analyze_version exists in mysql.GLOBAL_VARIABLES. - // If not, insert "tidb_analyze_version | 1" since this is the old behavior before we introduce this variable. - ctx := context.Background() - rs, err := s.ExecuteInternal(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?;", - mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion) - terror.MustNil(err) - req := rs.NewChunk(nil) - err = rs.Next(ctx, req) - terror.MustNil(err) - if req.NumRows() != 0 { - return - } - - mustExecute(s, "INSERT HIGH_PRIORITY IGNORE INTO %n.%n VALUES (%?, %?);", - mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion, 1) -} func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" diff --git a/session/session_test.go b/session/session_test.go index 7b5febe0d18e0..f2ecee0574f68 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5884,34 +5884,3 @@ func (s *testSessionSuite) TestSameNameObjectWithLocalTemporaryTable(c *C) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) } -func (s *testSessionSuite) TestWriteOnMultipleCachedTable(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists ct1, ct2") - tk.MustExec("create table ct1 (id int, c int)") - tk.MustExec("create table ct2 (id int, c int)") - tk.MustExec("alter table ct1 cache") - tk.MustExec("alter table ct2 cache") - tk.MustQuery("select * from ct1").Check(testkit.Rows()) - tk.MustQuery("select * from ct2").Check(testkit.Rows()) - - cached := false - for i := 0; i < 50; i++ { - if tk.HasPlan("select * from ct1", "Union") { - if tk.HasPlan("select * from ct2", "Union") { - cached = true - break - } - } - time.Sleep(100 * time.Millisecond) - } - c.Assert(cached, IsTrue) - - tk.MustExec("begin") - tk.MustExec("insert into ct1 values (3, 4)") - tk.MustExec("insert into ct2 values (5, 6)") - tk.MustExec("commit") - - tk.MustQuery("select * from ct1").Check(testkit.Rows("3 4")) - tk.MustQuery("select * from ct2").Check(testkit.Rows("5 6")) -} diff --git a/table/tables/cache.go b/table/tables/cache.go index 7e3eb7c40b9a9..a31f5d0780854 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -183,31 +183,57 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t } // AddRecord implements the AddRecord method for the table.Table interface. -func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) - return c.TableCommon.AddRecord(sctx, r, opts...) 
-} -func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) { - txnCtx := sctx.GetSessionVars().TxnCtx - if txnCtx.CachedTables == nil { - txnCtx.CachedTables = make(map[int64]interface{}) +func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { + txn, err := ctx.Txn(true) + if err != nil { + return nil, err } - if _, ok := txnCtx.CachedTables[tid]; !ok { - txnCtx.CachedTables[tid] = handle + now := txn.StartTS() + start := time.Now() + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) + if err != nil { + return nil, errors.Trace(err) } + ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) + return c.TableCommon.AddRecord(ctx, r, opts...) + } // UpdateRecord implements table.Table func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + + txn, err := sctx.Txn(true) + if err != nil { + return err + } + now := txn.StartTS() + start := time.Now() + err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now)) + if err != nil { + return errors.Trace(err) + } + sctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) + return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) } // RemoveRecord implements table.Table RemoveRecord interface. -func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) - return c.TableCommon.RemoveRecord(sctx, h, r) + +func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { + txn, err := ctx.Txn(true) + if err != nil { + return err + } + now := txn.StartTS() + start := time.Now() + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) + if err != nil { + return errors.Trace(err) + } + ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) + return c.TableCommon.RemoveRecord(ctx, h, r) + } func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() { diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index aeddd5b972ab2..0138a01ab94e1 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -67,7 +67,9 @@ type StateRemote interface { LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(ctx context.Context, tid int64) (uint64, error) + + LockForWrite(ctx context.Context, tid int64, lease uint64) error + // RenewLease attempt to renew the read / write lock on the table with the specified tableID RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) @@ -132,32 +134,33 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 return succ, err } -// LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. 
-func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) { + +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error { h.Lock() defer h.Unlock() - var ret uint64 for { - waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid) + waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts) if err != nil { - return 0, err + return err } if waitAndRetry == 0 { - ret = lease + break } time.Sleep(waitAndRetry) } - return ret, nil + + return nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (waitAndRetry time.Duration, ts uint64, err error) { +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) { + err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - ts = leaseFromTS(now) + // The lease is outdated, so lock is invalid, clear orphan lock of any kind. if now > lease { if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { @@ -218,69 +221,38 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease h.Lock() defer h.Unlock() - switch op { - case RenewReadLease: - return h.renewReadLease(ctx, tid, newLease) - case RenewWriteLease: - return h.renewWriteLease(ctx, tid, newLease) - } - return false, errors.New("wrong renew lease type") -} -func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { var succ bool - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) - if err != nil { - return errors.Trace(err) - } - if now >= oldLease { - // read lock had already expired, fail to renew - return nil - } - if lockType != CachedTableLockRead { - // Not read lock, fail to renew - return nil - } - - if newLease > oldLease { // lease should never decrease! - err = h.updateRow(ctx, tid, "READ", newLease) + if op == RenewReadLease { + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - } - succ = true - return nil - }) - return succ, err -} + if now >= oldLease { + // read lock had already expired, fail to renew + return nil + } + if lockType != CachedTableLockRead { + // Not read lock, fail to renew + return nil + } -func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { - var succ bool - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) - if err != nil { - return errors.Trace(err) - } - if now >= oldLease { - // write lock had already expired, fail to renew - return nil - } - if lockType != CachedTableLockWrite { - // Not write lock, fail to renew + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "READ", newLease) + if err != nil { + return errors.Trace(err) + } + } + succ = true return nil - } + }) + return succ, err + } + + // TODO: renew for write lease + return false, errors.New("not implement yet") - if newLease > oldLease { // lease should never decrease! 
- err = h.updateRow(ctx, tid, "WRITE", newLease) - if err != nil { - return errors.Trace(err) - } - } - succ = true - return nil - }) - return succ, err } func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index dc4e9272b1830..188598761d95b 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -27,7 +27,21 @@ import ( "github.com/tikv/client-go/v2/oracle" ) -// initRow add a new record into the cached table meta lock table. + +// CreateMetaLockForCachedTable initializes the cached table meta lock information. +func createMetaLockForCachedTable(h session.Session) error { + createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + + "`tid` int(11) NOT NULL DEFAULT 0," + + "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + + "`lease` bigint(20) NOT NULL DEFAULT 0," + + "`oldReadLease` bigint(20) NOT NULL DEFAULT 0," + + "PRIMARY KEY (`tid`))" + _, err := h.ExecuteInternal(context.Background(), createTable) + return err +} + +// InitRow add a new record into the cached table meta lock table. + func initRow(ctx context.Context, exec session.Session, tid int) error { _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", tid) return err @@ -43,6 +57,11 @@ func TestStateRemote(t *testing.T) { ctx := context.Background() h := tables.NewStateRemote(se) + err := createMetaLockForCachedTable(se) + require.NoError(t, err) + require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) + + // Check the initial value. require.NoError(t, initRow(ctx, se, 5)) lockType, lease, err := h.Load(ctx, 5) @@ -89,18 +108,21 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lease, leaseVal) // Check write lock. - writeLease, err := h.LockForWrite(ctx, 5) - require.NoError(t, err) + + leaseVal = oracle.GoTimeToTS(physicalTime.Add(700 * time.Millisecond)) + require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, writeLease, lease) - require.Greater(t, writeLease, leaseVal) + + require.Equal(t, lease, leaseVal) // Lock for write again - writeLease, err = h.LockForWrite(ctx, 5) - require.NoError(t, err) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(800 * time.Millisecond)) + require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + lockType, _, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) @@ -116,14 +138,11 @@ func TestStateRemote(t *testing.T) { require.NoError(t, err) require.False(t, succ) - // Renew write lease. - succ, err = h.RenewLease(ctx, 5, writeLease+1, tables.RenewWriteLease) + // But clear orphan write lock should success. 
+ time.Sleep(time.Second) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(2 * time.Second)) + succ, err = h.LockForRead(ctx, 5, leaseVal) require.NoError(t, err) require.True(t, succ) - lockType, lease, err = h.Load(ctx, 5) - require.NoError(t, err) - require.Equal(t, lockType, tables.CachedTableLockWrite) - require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, lease, writeLease+1) } diff --git a/testkit/testkit.go b/testkit/testkit.go index c99791efe369a..e6cb548a566c8 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -238,20 +238,6 @@ func (tk *TestKit) MustUseIndex(sql string, index string, args ...interface{}) b return false } -// MustUseIndex4ExplainFor checks if the result execution plan contains specific index(es). -func (tk *TestKit) MustUseIndex4ExplainFor(result *Result, index string) bool { - for i := range result.rows { - // It depends on whether we enable to collect the execution info. - if strings.Contains(result.rows[i][3], "index:"+index) { - return true - } - if strings.Contains(result.rows[i][4], "index:"+index) { - return true - } - } - return false -} - // CheckExecResult checks the affected rows and the insert id after executing MustExec. func (tk *TestKit) CheckExecResult(affectedRows, insertID int64) { tk.require.Equal(int64(tk.Session().AffectedRows()), affectedRows)
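
The core control flow this patch adds to `GoCreateTables` is: when a DDL batch size is configured, group the restored tables by database and submit them through the bulk `CreateTables` path, and only fall back to the old one-table-at-a-time path when the cluster rejects the batched job with `[ddl:8204]invalid ddl job`. Below is a minimal, self-contained sketch of that pattern; `TableInfo`, `batchCreate`, and `createOne` are hypothetical stand-ins for the BR/TiDB types and sessions used in the diff, and only the grouping and fallback logic mirrors the patch.

```go
// Hedged sketch of the batch-create-with-fallback flow from GoCreateTables.
// All names here are illustrative stand-ins, not the real BR/TiDB APIs.
package main

import (
	"fmt"
	"strings"
)

// TableInfo is a stand-in for *metautil.Table / *model.TableInfo.
type TableInfo struct {
	DB   string
	Name string
}

// batchCreate stands in for glue.BulkCreateTableSession.CreateTables:
// it receives tables grouped by database and creates each group in one DDL job.
// Here it simulates an older TiDB that does not know the batched job type.
func batchCreate(byDB map[string][]TableInfo) error {
	return fmt.Errorf("[ddl:8204]invalid ddl job type: none")
}

// createOne stands in for the sequential (pre-batch) create-table path.
func createOne(t TableInfo) error {
	fmt.Printf("created %s.%s\n", t.DB, t.Name)
	return nil
}

// createTables mirrors the patch's control flow: try the batched path when a
// batch size is configured, fall back to sequential creation only on ddl:8204.
func createTables(tables []TableInfo, batchSize int) error {
	if batchSize > 0 {
		// Group tables by database, as db.CreateTables does in the patch.
		byDB := make(map[string][]TableInfo)
		for _, t := range tables {
			byDB[t.DB] = append(byDB[t.DB], t)
		}
		err := batchCreate(byDB)
		if err == nil {
			return nil
		}
		// Only the "invalid ddl job" error (an older cluster without the
		// batched DDL) triggers the sequential fallback; anything else is fatal.
		if !strings.Contains(err.Error(), "[ddl:8204]invalid ddl job") {
			return err
		}
	}
	for _, t := range tables {
		if err := createOne(t); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	tables := []TableInfo{{"test", "t1"}, {"test", "t2"}, {"db2", "t3"}}
	if err := createTables(tables, 128); err != nil {
		fmt.Println("restore failed:", err)
	}
}
```

In the actual patch the fallback path runs `createOneTable` through a worker pool over `dbPool` rather than a plain loop, and the batched path is driven by `createTablesInWorkerPool` in slices of `batchDllSize`; the sketch above keeps only the decision logic.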