Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle: support fallback for dump historical stats #40889

Merged
merged 14 commits into from
Feb 1, 2023
30 changes: 30 additions & 0 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,33 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jsTable.Partitions["p0"])
require.NotNil(t, jsTable.Partitions["global"])
}

func TestDumpHistoricalStatsFallback(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 0")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6)
)`)
// dump historical stats
tk.MustExec("analyze table t")
is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
require.NotNil(t, tbl)

// dump historical stats
hsWorker := dom.GetHistoricalStatsWorker()
tblID := hsWorker.GetOneHistoricalStatsTable()
// assert no historical stats task generated
require.Equal(t, tblID, int64(-1))
tk.MustExec("set global tidb_enable_historical_stats = 1")
h := dom.StatsHandle()
jt, err := h.DumpHistoricalStatsBySnapshot("test", tbl.Meta(), oracle.GoTimeToTS(time.Now()))
require.NoError(t, err)
require.NotNil(t, jt)
require.False(t, jt.IsHistoricalStats)
}
176 changes: 141 additions & 35 deletions server/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package server

import (
"database/sql"
"encoding/json"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -125,33 +124,99 @@ func TestDumpStatsAPI(t *testing.T) {
_, err = fp1.Write(js)
require.NoError(t, err)
checkData(t, path1, client)

testDumpPartitionTableStats(t, client, statsHandler)
}

func testDumpPartitionTableStats(t *testing.T, client *testServerClient, handler *StatsHandler) {
preparePartitionData(t, client, handler)
check := func(dumpStats bool) {
expectedLen := 1
if dumpStats {
expectedLen = 2
}
url := fmt.Sprintf("/stats/dump/test/test2?dumpPartitionStats=%v", dumpStats)
resp0, err := client.fetchStatus(url)
require.NoError(t, err)
defer func() {
resp0.Body.Close()
}()
b, err := io.ReadAll(resp0.Body)
require.NoError(t, err)
jsonTable := &handle.JSONTable{}
err = json.Unmarshal(b, jsonTable)
func TestDumpPartitionTableStats(t *testing.T) {
store := testkit.CreateMockStore(t)

driver := NewTiDBDriver(store)
client := newTestServerClient()
cfg := newTestConfig()
cfg.Port = client.port
cfg.Status.StatusPort = client.statusPort
cfg.Status.ReportStatus = true
cfg.Socket = fmt.Sprintf("/tmp/tidb-mock-%d.sock", time.Now().UnixNano())

server, err := NewServer(cfg, driver)
require.NoError(t, err)
defer server.Close()

client.port = getPortFromTCPAddr(server.listener.Addr())
client.statusPort = getPortFromTCPAddr(server.statusListener.Addr())
go func() {
err := server.Run()
require.NoError(t, err)
require.NotNil(t, jsonTable.Partitions["global"])
require.Len(t, jsonTable.Partitions, expectedLen)
}
check(false)
check(true)
}()
client.waitUntilServerOnline()

dom, err := session.GetDomain(store)
require.NoError(t, err)
statsHandler := &StatsHandler{dom}

preparePartitionData(t, client, statsHandler)
tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test2"))
require.NoError(t, err)
err = dom.GetHistoricalStatsWorker().DumpHistoricalStats(tableInfo.Meta().ID, dom.StatsHandle())
require.NoError(t, err)

router := mux.NewRouter()
router.Handle("/stats/dump/{db}/{table}", statsHandler)

resp0, err := client.fetchStatus("/stats/dump/tidb/test2")
require.NoError(t, err)
defer func() {
require.NoError(t, resp0.Body.Close())
}()

path := "/tmp/stats2.json"
fp, err := os.Create(path)
require.NoError(t, err)
require.NotNil(t, fp)
defer func() {
require.NoError(t, fp.Close())
require.NoError(t, os.Remove(path))
}()

js, err := io.ReadAll(resp0.Body)
require.NoError(t, err)
_, err = fp.Write(js)
require.NoError(t, err)
checkPartitionTableData(t, path, client)

// sleep for 1 seconds to ensure the existence of tidb.test2
time.Sleep(time.Second)
timeBeforeDropStats := time.Now()
snapshot := timeBeforeDropStats.Format("20060102150405")
prepare4DumpPartitionTableHistoryStats(t, client)

// test dump history stats
resp1, err := client.fetchStatus("/stats/dump/tidb/test2")
require.NoError(t, err)
defer func() {
require.NoError(t, resp1.Body.Close())
}()
js, err = io.ReadAll(resp1.Body)
require.NoError(t, err)

path1 := "/tmp/stats2_history.json"
fp1, err := os.Create(path1)
require.NoError(t, err)
require.NotNil(t, fp1)
defer func() {
require.NoError(t, fp1.Close())
require.NoError(t, os.Remove(path1))
}()

resp2, err := client.fetchStatus("/stats/dump/tidb/test2/" + snapshot)
require.NoError(t, err)
defer func() {
require.NoError(t, resp2.Body.Close())
}()
js, err = io.ReadAll(resp2.Body)
require.NoError(t, err)
_, err = fp1.Write(js)
require.NoError(t, err)
checkPartitionTableData(t, path1, client)
}

func prepareData(t *testing.T, client *testServerClient, statHandle *StatsHandler) {
Expand All @@ -172,8 +237,8 @@ func prepareData(t *testing.T, client *testServerClient, statHandle *StatsHandle
tk.MustExec("create index c on test (a, b)")
tk.MustExec("insert test values (1, 's')")
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table test")
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("analyze table test")
tk.MustExec("insert into test(a,b) values (1, 'v'),(3, 'vvv'),(5, 'vv')")
is := statHandle.do.InfoSchema()
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
Expand All @@ -189,15 +254,22 @@ func preparePartitionData(t *testing.T, client *testServerClient, statHandle *St
}()
h := statHandle.do.StatsHandle()
tk := testkit.NewDBTestKit(t, db)
tk.MustExec("create database tidb")
tk.MustExec("use tidb")
tk.MustExec("create table test2(a int) PARTITION BY RANGE ( a ) (PARTITION p0 VALUES LESS THAN (6))")
tk.MustExec("insert into test2 (a) values (1)")
err = h.HandleDDLEvent(<-h.DDLEventCh())
require.NoError(t, err)
tk.MustExec("insert into test2 (a) values (1),(2),(3),(4)")
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("analyze table test2")
tk.MustExec("insert into test2 (a) values (1),(2),(3),(4)")
is := statHandle.do.InfoSchema()
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
require.NoError(t, h.Update(is))
}

func prepare4DumpHistoryStats(t *testing.T, client *testServerClient) {
func prepare4DumpPartitionTableHistoryStats(t *testing.T, client *testServerClient) {
db, err := sql.Open("mysql", client.getDSN())
require.NoError(t, err, "Error connecting")
defer func() {
Expand All @@ -206,15 +278,20 @@ func prepare4DumpHistoryStats(t *testing.T, client *testServerClient) {
}()

tk := testkit.NewDBTestKit(t, db)
tk.MustExec("use tidb")
tk.MustExec("drop table tidb.test2")
tk.MustExec("create table test2 (a int) PARTITION BY RANGE ( a ) (PARTITION p0 VALUES LESS THAN (6))")
}

safePointName := "tikv_gc_safe_point"
safePointValue := "20060102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)
func prepare4DumpHistoryStats(t *testing.T, client *testServerClient) {
db, err := sql.Open("mysql", client.getDSN())
require.NoError(t, err, "Error connecting")
defer func() {
err := db.Close()
require.NoError(t, err)
}()

tk := testkit.NewDBTestKit(t, db)
tk.MustExec("drop table tidb.test")
tk.MustExec("create table tidb.test (a int, b varchar(20))")
}
Expand Down Expand Up @@ -281,3 +358,32 @@ func checkData(t *testing.T, path string, client *testServerClient) {
require.Equal(t, int64(4), count)
require.NoError(t, rows.Close())
}

func checkPartitionTableData(t *testing.T, path string, client *testServerClient) {
db, err := sql.Open("mysql", client.getDSN(func(config *mysql.Config) {
config.AllowAllFiles = true
config.Params["sql_mode"] = "''"
}))
require.NoError(t, err, "Error connecting")
tk := testkit.NewDBTestKit(t, db)
defer func() {
err := db.Close()
require.NoError(t, err)
}()

tk.MustExec("use tidb")
tk.MustExec("drop stats test2")
tk.MustExec(fmt.Sprintf("load stats '%s'", path))
rows := tk.MustQuery("show stats_meta")
require.True(t, rows.Next(), "unexpected data")
var dbName, tableName string
var modifyCount, count int64
var other interface{}
err = rows.Scan(&dbName, &tableName, &other, &other, &modifyCount, &count)
require.NoError(t, err)
require.Equal(t, "tidb", dbName)
require.Equal(t, "test2", tableName)
require.Equal(t, int64(4), modifyCount)
require.Equal(t, int64(8), count)
require.NoError(t, rows.Close())
}
Loading