Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exec: access the table_storage_stats need privilege #26298

Merged
merged 12 commits into from
Jul 19, 2021
18 changes: 16 additions & 2 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,19 +1672,33 @@ func (e *tableStorageStatsRetriever) initialize(sctx sessionctx.Context) error {
}
}

// Privilege checker.
checker := func(db, table string) bool {
if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
return pm.RequestVerification(sctx.GetSessionVars().ActiveRoles, db, table, "", mysql.AllPrivMask)
}
return true
}

// Extract the tables to the initialTable.
for _, DB := range databases {
// The user didn't specified the table, extract all tables of this db to initialTable.
if len(tables) == 0 {
tbs := is.SchemaTables(model.NewCIStr(DB))
for _, tb := range tbs {
e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
// For every db.table, check it's privileges.
if checker(DB, tb.Meta().Name.L) {
e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
}
}
} else {
// The user specified the table, extract the specified tables of this db to initialTable.
for tb := range tables {
if tb, err := is.TableByName(model.NewCIStr(DB), model.NewCIStr(tb)); err == nil {
e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
// For every db.table, check it's privileges.
if checker(DB, tb.Meta().Name.L) {
e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
}
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,29 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
"test 2",
))
c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 24)

// More tests about the privileges.
tk.MustExec("create user 'testuser'@'localhost'")
tk.MustExec("create user 'testuser2'@'localhost'")
tk1 := testkit.NewTestKit(c, store)
defer tk1.MustExec("drop user 'testuser'@'localhost'")
defer tk1.MustExec("drop user 'testuser2'@'localhost'")

tk.MustExec("grant all privileges on *.* to 'testuser2'@'localhost'")
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: "testuser",
Hostname: "localhost",
}, nil, nil), Equals, true)

// User has no access to this schema, so the result set is empty.
tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema'").Check(testkit.Rows("0"))

c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: "testuser2",
Hostname: "localhost",
}, nil, nil), Equals, true)

tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("24"))
}

func (s *testInfoschemaTableSuite) TestSequences(c *C) {
Expand Down