Skip to content

Commit

Permalink
ddl: show more jobs in the tidb_mdl_view (#40860)
Browse files Browse the repository at this point in the history
close #40838
  • Loading branch information
YangKeao authored Feb 3, 2023
1 parent 1606b05 commit 15bac9e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 31 deletions.
78 changes: 49 additions & 29 deletions infoschema/cluster_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,42 +816,62 @@ func (s *clusterTablesSuite) newTestKitWithRoot(t *testing.T) *testkit.TestKit {
}

func TestMDLView(t *testing.T) {
// setup suite
s := new(clusterTablesSuite)
s.store, s.dom = testkit.CreateMockStoreAndDomain(t)
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()

tk := s.newTestKitWithRoot(t)
tkDDL := s.newTestKitWithRoot(t)
tk3 := s.newTestKitWithRoot(t)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("insert into t values(1);")
testCases := []struct {
name string
createTable string
ddl string
queryInTxn []string
sqlDigest string
}{
{"add column", "create table t(a int)", "alter table test.t add column b int", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"},
{"change column in 1 step", "create table t(a int)", "alter table test.t change column a b int", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"},
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
// setup suite
s := new(clusterTablesSuite)
s.store, s.dom = testkit.CreateMockStoreAndDomain(t)
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()

tk.MustExec("begin")
tk.MustQuery("select 1;")
tk.MustQuery("select * from t;")
tk := s.newTestKitWithRoot(t)
tkDDL := s.newTestKitWithRoot(t)
tk3 := s.newTestKitWithRoot(t)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec(c.createTable)

tk.MustExec("begin")
for _, q := range c.queryInTxn {
tk.MustQuery(q)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec("alter table test.t add column b int;")
wg.Done()
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec(c.ddl)
wg.Done()
}()

time.Sleep(200 * time.Millisecond)
time.Sleep(200 * time.Millisecond)

s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", tk3.Session().GetSessionManager())
defer s.rpcserver.Stop()
s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", tk3.Session().GetSessionManager())
defer s.rpcserver.Stop()

tk3.MustQuery("select DB_NAME, QUERY, SQL_DIGESTS from mysql.tidb_mdl_view").Check(testkit.Rows("test alter table test.t add column b int; [\"begin\",\"select ? ;\",\"select * from `t` ;\"]"))
tk3.MustQuery("select DB_NAME, QUERY, SQL_DIGESTS from mysql.tidb_mdl_view").Check(testkit.Rows(
strings.Join([]string{
"test",
c.ddl,
c.sqlDigest,
}, " "),
))

tk.MustExec("commit")
tk.MustExec("commit")

wg.Wait()
wg.Wait()
})
}
}

func TestCreateBindingFromHistory(t *testing.T) {
Expand Down
26 changes: 24 additions & 2 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,19 @@ const (
);`
// CreateMDLView is a view about metadata locks.
CreateMDLView = `CREATE OR REPLACE VIEW mysql.tidb_mdl_view as (
select JOB_ID, DB_NAME, TABLE_NAME, QUERY, SESSION_ID, TxnStart, TIDB_DECODE_SQL_DIGESTS(ALL_SQL_DIGESTS, 4096) AS SQL_DIGESTS from information_schema.ddl_jobs, information_schema.CLUSTER_TIDB_TRX, information_schema.CLUSTER_PROCESSLIST where ddl_jobs.STATE = 'running' and find_in_set(ddl_jobs.table_id, CLUSTER_TIDB_TRX.RELATED_TABLE_IDS) and CLUSTER_TIDB_TRX.SESSION_ID=CLUSTER_PROCESSLIST.ID
SELECT job_id,
db_name,
table_name,
query,
session_id,
txnstart,
tidb_decode_sql_digests(all_sql_digests, 4096) AS SQL_DIGESTS
FROM information_schema.ddl_jobs,
information_schema.cluster_tidb_trx,
information_schema.cluster_processlist
WHERE (ddl_jobs.state != 'synced' and ddl_jobs.state != 'cancelled')
AND Find_in_set(ddl_jobs.table_id, cluster_tidb_trx.related_table_ids)
AND cluster_tidb_trx.session_id = cluster_processlist.id
);`

// CreatePlanReplayerStatusTable is a table about plan replayer status
Expand Down Expand Up @@ -779,11 +791,13 @@ const (
version110 = 110
// version111 adds the table tidb_ttl_task and tidb_ttl_job_history
version111 = 111
// version112 modifies the view tidb_mdl_view
version112 = 112
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version111
var currentBootstrapVersion int64 = version112

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -902,6 +916,7 @@ var (
upgradeToVer109,
upgradeToVer110,
upgradeToVer111,
upgradeToVer112,
}
)

Expand Down Expand Up @@ -2262,6 +2277,13 @@ func upgradeToVer111(s Session, ver int64) {
doReentrantDDL(s, CreateTTLJobHistory)
}

func upgradeToVer112(s Session, ver int64) {
if ver >= version112 {
return
}
doReentrantDDL(s, CreateMDLView)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down

0 comments on commit 15bac9e

Please sign in to comment.