diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index 45d5f1fa334a4..b5236127f5ee1 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -455,6 +455,9 @@ type CSVConfig struct {
 	TrimLastSep     bool   `toml:"trim-last-separator" json:"trim-last-separator"`
 	NotNull         bool   `toml:"not-null" json:"not-null"`
 	BackslashEscape bool   `toml:"backslash-escape" json:"backslash-escape"`
+	// hide these options from the Lightning configuration file; they can only be used by LOAD DATA
+	// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
+	StartingBy string `toml:"-" json:"-"`
 }
 
 type MydumperRuntime struct {
diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go
index b7d6c6fc21903..26fb65a493183 100644
--- a/br/pkg/lightning/mydump/csv_parser.go
+++ b/br/pkg/lightning/mydump/csv_parser.go
@@ -47,9 +47,10 @@ type CSVParser struct {
 	blockParser
 	cfg *config.CSVConfig
 
-	comma   []byte
-	quote   []byte
-	newLine []byte
+	comma      []byte
+	quote      []byte
+	newLine    []byte
+	startingBy []byte
 
 	charsetConvertor *CharsetConvertor
 	// These variables are used with IndexAnyByte to search a byte slice for the
@@ -120,6 +121,12 @@ func NewCSVParser(
 	}
 	unquoteStopSet = append(unquoteStopSet, newLineStopSet...)
 
+	if len(cfg.StartingBy) > 0 {
+		if strings.Contains(cfg.StartingBy, terminator) {
+			return nil, errors.New("starting-by cannot contain (line) terminator")
+		}
+	}
+
 	escFlavor := backslashEscapeFlavorNone
 	if cfg.BackslashEscape {
 		escFlavor = backslashEscapeFlavorMySQL
@@ -138,6 +145,7 @@ func NewCSVParser(
 		comma:            []byte(separator),
 		quote:            []byte(delimiter),
 		newLine:          []byte(terminator),
+		startingBy:       []byte(cfg.StartingBy),
 		escFlavor:        escFlavor,
 		quoteByteSet:     makeByteSet(quoteStopSet),
 		unquoteByteSet:   makeByteSet(unquoteStopSet),
@@ -370,11 +378,43 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) {
 
 	isEmptyLine := true
 	whitespaceLine := true
+	foundStartingByThisLine := false
 	prevToken := csvTokenNewLine
 	var firstToken csvToken
 
 outside:
 	for {
+		// we should drop
+		// 1. the whole line if it does not contain startingBy
+		// 2. any characters before startingBy
+		// since we have checked that startingBy does not contain the terminator,
+		// we can split at the terminator and check whether the substring contains
+		// startingBy. Even if the terminator is inside a quoted field, which means
+		// it's not the end of a line, the substring can still be dropped by rule 2.
+		if len(parser.startingBy) > 0 && !foundStartingByThisLine {
+			oldPos := parser.pos
+			content, _, err := parser.ReadUntilTerminator()
+			if err != nil {
+				if errors.Cause(err) != io.EOF {
+					return nil, err
+				}
+				if len(content) == 0 {
+					return nil, err
+				}
+				// if we reached EOF, we should still check whether the content
+				// contains startingBy, put it back, and parse it.
+			}
+			idx := bytes.Index(content, parser.startingBy)
+			if idx == -1 {
+				continue
+			}
+			foundStartingByThisLine = true
+			content = content[idx+len(parser.startingBy):]
+			content = append(content, parser.newLine...)
+			parser.buf = append(content, parser.buf...)
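+			// parser.pos currently points past the terminator consumed by
+			// ReadUntilTerminator; move it back to just after startingBy so the
+			// offsets recorded for this row stay consistent with the file.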
+			parser.pos = oldPos + int64(idx+len(parser.startingBy))
+		}
+
 		content, firstByte, err := parser.readUntil(&parser.unquoteByteSet)
 
 		if len(content) > 0 {
@@ -415,6 +455,7 @@ outside:
 			}
 			whitespaceLine = false
 		case csvTokenNewLine:
+			foundStartingByThisLine = false
 			// new line = end of record (ignore empty lines)
 			prevToken = firstToken
 			if isEmptyLine {
@@ -578,17 +619,21 @@ func (parser *CSVParser) ReadColumns() error {
 }
 
 // ReadUntilTerminator seeks the file until the terminator token is found, and
-// returns the file offset beyond the terminator.
-// This function is used in strict-format dividing a CSV file.
-func (parser *CSVParser) ReadUntilTerminator() (int64, error) {
+// returns
+// - the content before the terminator
+// - the file offset beyond the terminator
+// - error
+// Note that the terminator may appear inside a quoted field, in which case it
+// does not mark the end of a line; the caller should handle this case.
+func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) {
 	for {
-		_, firstByte, err := parser.readUntil(&parser.newLineByteSet)
+		content, firstByte, err := parser.readUntil(&parser.newLineByteSet)
 		if err != nil {
-			return 0, err
+			return content, 0, err
 		}
 		parser.skipBytes(1)
 		if ok, err := parser.tryReadNewLine(firstByte); ok || err != nil {
-			return parser.pos, err
+			return content, parser.pos, err
 		}
 	}
 }
diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go
index da06c15ed39d9..adb057679b3a4 100644
--- a/br/pkg/lightning/mydump/csv_parser_test.go
+++ b/br/pkg/lightning/mydump/csv_parser_test.go
@@ -55,6 +55,35 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int
 	}
 }
 
+func runTestCasesCSVIgnoreNLines(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase, ignoreNLines int) {
+	for _, tc := range cases {
+		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
+		assert.NoError(t, err)
+		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor)
+		assert.NoError(t, err)
+
+		for ignoreNLines > 0 {
+			// IGNORE N LINES finds the (line) terminator directly, without checking whether it's inside quotes
+			_, _, err = parser.ReadUntilTerminator()
+			if errors.Cause(err) == io.EOF {
+				assert.Len(t, tc.expected, 0, "input = %q", tc.input)
+				return
+			}
+			assert.NoError(t, err)
+			ignoreNLines--
+		}
+
+		for i, row := range tc.expected {
+			comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1)
+			e := parser.ReadRow()
+			assert.NoErrorf(t, e, "input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))
+			assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
+			assert.Equal(t, row, parser.LastRow().Row, comment)
+		}
+		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input)
+	}
+}
+
 func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) {
 	for _, tc := range cases {
 		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
@@ -935,6 +964,211 @@ func TestTerminator(t *testing.T) {
 	runTestCasesCSV(t, &cfg, 1, testCases)
 }
 
+func TestStartingBy(t *testing.T) {
+	cfg := config.MydumperRuntime{
+		CSV: config.CSVConfig{
+			Separator:  ",",
+			Delimiter:  `"`,
+			Terminator: "\n",
+			StartingBy: "xxx",
+		},
+	}
+	testCases := []testCase{
+		{
+			input: `xxx"abc",1
+something xxx"def",2
+"ghi",3`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
+				{types.NewStringDatum("def"), types.NewStringDatum("2")},
+			},
+		},
+	}
+	runTestCasesCSV(t, &cfg, 1, testCases)
+
+	testCases = []testCase{
+		{
+			input: `xxxabc,1
+something xxxdef,2
+ghi,3
+"bad syntax"aaa`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
+				{types.NewStringDatum("def"), types.NewStringDatum("2")},
+			},
+		},
+	}
+	runTestCasesCSV(t, &cfg, 1, testCases)
+
+	// test that special characters appear before StartingBy, and that StartingBy only takes effect once per line
+
+	testCases = []testCase{
+		{
+			input: `xxx"abc",1
+something xxxdef,2
+"ghi",3
+"yyy"xxx"yyy",4
+"yyy",5,xxxyyy,5
+qwe,zzzxxxyyy,6
+"yyyxxx"yyyxxx",7
+yyy",5,xxxxxx,8
+`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
+				{types.NewStringDatum("def"), types.NewStringDatum("2")},
+				{types.NewStringDatum("yyy"), types.NewStringDatum("4")},
+				{types.NewStringDatum("yyy"), types.NewStringDatum("5")},
+				{types.NewStringDatum("yyy"), types.NewStringDatum("6")},
+				{types.NewStringDatum("yyyxxx"), types.NewStringDatum("7")},
+				{types.NewStringDatum("xxx"), types.NewStringDatum("8")},
+			},
+		},
+	}
+	runTestCasesCSV(t, &cfg, 1, testCases)
+
+	// test StartingBy that contains special characters
+
+	cfg = config.MydumperRuntime{
+		CSV: config.CSVConfig{
+			Separator:  ",",
+			Delimiter:  `"`,
+			Terminator: "\n",
+			StartingBy: "x,xx",
+		},
+	}
+	testCases = []testCase{
+		{
+			input: `x,xx"abc",1
+something x,xxdef,2
+"ghi",3
+"yyy"xxx"yyy",4
+"yyy",5,xxxyyy,5
+qwe,zzzxxxyyy,6
+"yyyxxx"yyyxxx",7
+yyy",5,xx,xxxx,8`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
+				{types.NewStringDatum("def"), types.NewStringDatum("2")},
+				{types.NewStringDatum("xx"), types.NewStringDatum("8")},
+			},
+		},
+	}
+	runTestCasesCSV(t, &cfg, 1, testCases)
+
+	cfg = config.MydumperRuntime{
+		CSV: config.CSVConfig{
+			Separator:  ",",
+			Delimiter:  `"`,
+			Terminator: "\n",
+			StartingBy: `x"xx`,
+		},
+	}
+	testCases = []testCase{
+		{
+			input: `x"xx"abc",1
+something x"xxdef,2
+"ghi",3
+"yyy"xxx"yyy",4
+"yyy",5,xxxyyy,5
+qwe,zzzxxxyyy,6
+"yyyxxx"yyyxxx",7
+yyy",5,xx"xxxx,8
+`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("abc"), types.NewStringDatum("1")},
+				{types.NewStringDatum("def"), types.NewStringDatum("2")},
+				{types.NewStringDatum("xx"), types.NewStringDatum("8")},
+			},
+		},
+	}
+	runTestCasesCSV(t, &cfg, 1, testCases)
+
+	cfg = config.MydumperRuntime{
+		CSV: config.CSVConfig{
+			Separator:  ",",
+			Delimiter:  `"`,
+			Terminator: "\n",
+			StartingBy: "x\nxx",
+		},
+	}
+	_, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil)
+	require.ErrorContains(t, err, "starting-by cannot contain (line) terminator")
+}
+
+func TestCallerCanIgnoreNLines(t *testing.T) {
+	cfg := config.MydumperRuntime{
+		CSV: config.CSVConfig{
+			Separator:  ",",
+			Delimiter:  `"`,
+			Terminator: "\n",
+		},
+	}
+	testCases := []testCase{
+		{
+			input: `1,1
+2,2
+3,3`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("3"), types.NewStringDatum("3")},
+			},
+		},
+	}
+	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
+
+	testCases = []testCase{
+		{
+			input: `"bad syntax"1
+"b",2
+"c",3`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("c"), types.NewStringDatum("3")},
+			},
+		},
+	}
+	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
+
+	cfg = config.MydumperRuntime{
+		CSV: config.CSVConfig{
+			Separator:  ",",
+			Delimiter:  `"`,
+			Terminator: "\n",
+		},
+	}
+	testCases = []testCase{
+		{
+			input: `1,1
+2,2
+3,3`,
+			expected: [][]types.Datum{},
+		},
+	}
+	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 100)
+
+	// test that IGNORE N LINES finds the (line) terminator directly, without checking whether it's inside quotes
+
+	cfg = config.MydumperRuntime{
+		CSV: config.CSVConfig{
+			Separator:  ",",
+			Delimiter:  `"`,
+			Terminator: "\n",
+		},
+	}
+	testCases = []testCase{
+		{
+			input: `"a
+",1
+"b
+",2
+"c",3`,
+			expected: [][]types.Datum{
+				{types.NewStringDatum("b\n"), types.NewStringDatum("2")},
+				{types.NewStringDatum("c"), types.NewStringDatum("3")},
+			},
+		},
+	}
+	runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
+}
+
 func TestCharsetConversion(t *testing.T) {
 	cfg := config.MydumperRuntime{
 		CSV: config.CSVConfig{
diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index da3b4d0af1a53..aba71f666be2e 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -431,7 +431,7 @@ func SplitLargeFile(
 		if err = parser.SetPos(endOffset, prevRowIDMax); err != nil {
 			return 0, nil, nil, err
 		}
-		pos, err := parser.ReadUntilTerminator()
+		_, pos, err := parser.ReadUntilTerminator()
 		if err != nil {
 			if !errors.ErrorEqual(err, io.EOF) {
 				return 0, nil, nil, err
diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go
index 9da67370b9b09..5811a386f137d 100644
--- a/executor/seqtest/prepared_test.go
+++ b/executor/seqtest/prepared_test.go
@@ -806,3 +806,63 @@ func TestIssue38323(t *testing.T) {
 	tk.MustExec("set @a = 1;")
 	tk.MustQuery("execute stmt using @a, @a").Check(tk.MustQuery("explain select * from t where 1 = id and 1 = k group by id, k").Rows())
 }
+
+func TestSetPlanCacheLimitSwitch(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+
+	tk.MustQuery("select @@session.tidb_enable_plan_cache_for_param_limit").Check(testkit.Rows("1"))
+	tk.MustQuery("select @@global.tidb_enable_plan_cache_for_param_limit").Check(testkit.Rows("1"))
+
+	tk.MustExec("set @@session.tidb_enable_plan_cache_for_param_limit = OFF;")
+	tk.MustQuery("select @@session.tidb_enable_plan_cache_for_param_limit").Check(testkit.Rows("0"))
+
+	tk.MustExec("set @@session.tidb_enable_plan_cache_for_param_limit = 1;")
+	tk.MustQuery("select @@session.tidb_enable_plan_cache_for_param_limit").Check(testkit.Rows("1"))
+
+	tk.MustExec("set @@global.tidb_enable_plan_cache_for_param_limit = off;")
+	tk.MustQuery("select @@global.tidb_enable_plan_cache_for_param_limit").Check(testkit.Rows("0"))
+
+	tk.MustExec("set @@global.tidb_enable_plan_cache_for_param_limit = ON;")
+	tk.MustQuery("select @@global.tidb_enable_plan_cache_for_param_limit").Check(testkit.Rows("1"))
+}
+
+func TestPlanCacheLimitSwitchEffective(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a int, key(a))")
+
+	checkIfCached := func(res string) {
+		tk.MustExec("set @a = 1")
+		tk.MustExec("execute stmt using @a")
+		tk.MustExec("execute stmt using @a")
+		tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows(res))
+	}
+
+	// before prepare
+	tk.MustExec("set @@session.tidb_enable_plan_cache_for_param_limit = OFF")
+	tk.MustExec("prepare stmt from 'select * from t limit ?'")
+	tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query has 'limit ?'
is un-cacheable")) + checkIfCached("0") + tk.MustExec("deallocate prepare stmt") + + // after prepare + tk.MustExec("set @@session.tidb_enable_plan_cache_for_param_limit = ON") + tk.MustExec("prepare stmt from 'select * from t limit ?'") + tk.MustExec("set @@session.tidb_enable_plan_cache_for_param_limit = OFF") + checkIfCached("0") + tk.MustExec("execute stmt using @a") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: the switch 'tidb_enable_plan_cache_for_param_limit' is off")) + tk.MustExec("deallocate prepare stmt") + + // after execute + tk.MustExec("set @@session.tidb_enable_plan_cache_for_param_limit = ON") + tk.MustExec("prepare stmt from 'select * from t limit ?'") + checkIfCached("1") + tk.MustExec("set @@session.tidb_enable_plan_cache_for_param_limit = OFF") + checkIfCached("0") + tk.MustExec("deallocate prepare stmt") +} diff --git a/metrics/grafana/tidb_runtime.json b/metrics/grafana/tidb_runtime.json index 0324f6851de98..d17b1d90a104e 100644 --- a/metrics/grafana/tidb_runtime.json +++ b/metrics/grafana/tidb_runtime.json @@ -1384,6 +1384,131 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "the time goroutines have spent in the scheduler", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 57 + }, + "id": 30, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "null as zero", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.999, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "999", + "refId": "A", + "step": 10 + }, + { + "expr": "histogram_quantile(0.9999, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "9999", + "refId": "B", + "step": 10 + }, + { + "expr": "histogram_quantile(0.99999, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "99999", + "refId": "C", + "step": 10 + }, + { + "expr": "histogram_quantile(0.999999, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "999999", + "refId": "D", + "step": 10 + }, + { + "expr": "histogram_quantile(1, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "max", + "refId": "E", + "step": 10 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": 
null,
+        "title": "goroutine scheduler latency",
+        "tooltip": {
+          "msResolution": false,
+          "shared": true,
+          "sort": 0,
+          "value_type": "cumulative"
+        },
+        "type": "graph",
+        "xaxis": {
+          "buckets": null,
+          "mode": "time",
+          "name": null,
+          "show": true,
+          "values": []
+        },
+        "yaxes": [
+          {
+            "format": "s",
+            "label": null,
+            "logBase": 1,
+            "max": null,
+            "min": "0",
+            "show": true
+          },
+          {
+            "format": "short",
+            "label": null,
+            "logBase": 1,
+            "max": null,
+            "min": null,
+            "show": true
+          }
+        ],
+        "yaxis": {
+          "align": false,
+          "alignLevel": null
+        }
       }
     ],
     "repeat": "instance",
diff --git a/planner/core/plan_cache.go b/planner/core/plan_cache.go
index 98b49c1bcc452..dbbed82a32e8f 100644
--- a/planner/core/plan_cache.go
+++ b/planner/core/plan_cache.go
@@ -287,7 +287,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
 
 	// check whether this plan is cacheable.
 	if stmtCtx.UseCache {
-		checkPlanCacheability(sctx, p, len(paramTypes))
+		checkPlanCacheability(sctx, p, len(paramTypes), len(limitParams))
 	}
 
 	// put this plan into the plan cache.
@@ -312,7 +312,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
 }
 
 // checkPlanCacheability checks whether this plan is cacheable and set to skip plan cache if it's uncacheable.
-func checkPlanCacheability(sctx sessionctx.Context, p Plan, paramNum int) {
+func checkPlanCacheability(sctx sessionctx.Context, p Plan, paramNum int, limitParamNum int) {
 	stmtCtx := sctx.GetSessionVars().StmtCtx
 	var pp PhysicalPlan
 	switch x := p.(type) {
@@ -347,6 +347,11 @@ func checkPlanCacheability(sctx sessionctx.Context, p Plan, paramNum int) {
 		stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: the plan with IndexMerge accessing Multi-Valued Index is un-cacheable"))
 		return
 	}
+
+	// before caching a plan with a parameterized LIMIT, check the switch
+	if limitParamNum != 0 && !sctx.GetSessionVars().EnablePlanCacheForParamLimit {
+		stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: the switch 'tidb_enable_plan_cache_for_param_limit' is off"))
+	}
 }
 
 // RebuildPlan4CachedPlan will rebuild this plan under current user parameters.
diff --git a/planner/core/plan_cache_lru.go b/planner/core/plan_cache_lru.go
index ac3c90b269056..7379ec5411a37 100644
--- a/planner/core/plan_cache_lru.go
+++ b/planner/core/plan_cache_lru.go
@@ -258,14 +258,22 @@ func (l *LRUPlanCache) memoryControl() {
 func (l *LRUPlanCache) pickFromBucket(bucket map[*list.Element]struct{}, matchOpts *planCacheMatchOpts) (*list.Element, bool) {
 	for k := range bucket {
 		plan := k.Value.(*planCacheEntry).PlanValue.(*PlanCacheValue)
+		// check param types' compatibility
 		ok1 := plan.matchOpts.paramTypes.CheckTypesCompatibility4PC(matchOpts.paramTypes)
 		if !ok1 {
 			continue
 		}
+
+		// check whether the limit offset and count parameters are equal, and whether the switch is enabled
 		ok2 := checkUint64SliceIfEqual(plan.matchOpts.limitOffsetAndCount, matchOpts.limitOffsetAndCount)
-		if ok2 {
-			return k, true
+		if !ok2 {
+			continue
+		}
+		if len(plan.matchOpts.limitOffsetAndCount) > 0 && !l.sctx.GetSessionVars().EnablePlanCacheForParamLimit {
+			// offset and count matched, but this is a plan with a parameterized limit and the switch is disabled
+			continue
 		}
+		return k, true
 	}
 	return nil, false
 }
diff --git a/planner/core/plan_cache_lru_test.go b/planner/core/plan_cache_lru_test.go
index 11b145ef4c372..e3d6c43c9310b 100644
--- a/planner/core/plan_cache_lru_test.go
+++ b/planner/core/plan_cache_lru_test.go
@@ -47,11 +47,13 @@ func randomPlanCacheValue(types []*types.FieldType) *PlanCacheValue {
 
 func TestLRUPCPut(t *testing.T) {
 	// test initialize
-	lruA := NewLRUPlanCache(0, 0, 0, MockContext())
+	mockCtx := MockContext()
+	mockCtx.GetSessionVars().EnablePlanCacheForParamLimit = true
+	lruA := NewLRUPlanCache(0, 0, 0, mockCtx)
 	require.Equal(t, lruA.capacity, uint(100))
 
 	maxMemDroppedKv := make(map[kvcache.Key]kvcache.Value)
-	lru := NewLRUPlanCache(3, 0, 0, MockContext())
+	lru := NewLRUPlanCache(3, 0, 0, mockCtx)
 	lru.onEvict = func(key kvcache.Key, value kvcache.Value) {
 		maxMemDroppedKv[key] = value
 	}
@@ -131,7 +133,9 @@ func TestLRUPCPut(t *testing.T) {
 }
 
 func TestLRUPCGet(t *testing.T) {
-	lru := NewLRUPlanCache(3, 0, 0, MockContext())
+	mockCtx := MockContext()
+	mockCtx.GetSessionVars().EnablePlanCacheForParamLimit = true
+	lru := NewLRUPlanCache(3, 0, 0, mockCtx)
 
 	keys := make([]*planCacheKey, 5)
 	vals := make([]*PlanCacheValue, 5)
@@ -185,7 +189,9 @@ func TestLRUPCGet(t *testing.T) {
 }
 
 func TestLRUPCDelete(t *testing.T) {
-	lru := NewLRUPlanCache(3, 0, 0, MockContext())
+	mockCtx := MockContext()
+	mockCtx.GetSessionVars().EnablePlanCacheForParamLimit = true
+	lru := NewLRUPlanCache(3, 0, 0, mockCtx)
 
 	keys := make([]*planCacheKey, 3)
 	vals := make([]*PlanCacheValue, 3)
diff --git a/planner/core/plan_cacheable_checker.go b/planner/core/plan_cacheable_checker.go
index 2ff9e51823ee2..3da0f285cd9bf 100644
--- a/planner/core/plan_cacheable_checker.go
+++ b/planner/core/plan_cacheable_checker.go
@@ -135,22 +135,21 @@ func (checker *cacheableChecker) Enter(in ast.Node) (out ast.Node, skipChildren
 			return in, true
 		}
 	}
-	// todo: these comment is used to add switch in the later pr
-	//case *ast.Limit:
-	//	if node.Count != nil {
-	//		if _, isParamMarker := node.Count.(*driver.ParamMarkerExpr); isParamMarker {
-	//			checker.cacheable = false
-	//			checker.reason = "query has 'limit ?' is un-cacheable"
-	//			return in, true
-	//		}
-	//	}
-	//	if node.Offset != nil {
-	//		if _, isParamMarker := node.Offset.(*driver.ParamMarkerExpr); isParamMarker {
-	//			checker.cacheable = false
-	//			checker.reason = "query has 'limit ?, 10' is un-cacheable"
-	//			return in, true
-	//		}
-	//	}
+	case *ast.Limit:
+		if node.Count != nil {
+			if _, isParamMarker := node.Count.(*driver.ParamMarkerExpr); isParamMarker && !checker.sctx.GetSessionVars().EnablePlanCacheForParamLimit {
+				checker.cacheable = false
+				checker.reason = "query has 'limit ?'
is un-cacheable" - // return in, true - // } - // } - // if node.Offset != nil { - // if _, isParamMarker := node.Offset.(*driver.ParamMarkerExpr); isParamMarker { - // checker.cacheable = false - // checker.reason = "query has 'limit ?, 10' is un-cacheable" - // return in, true - // } - // } + case *ast.Limit: + if node.Count != nil { + if _, isParamMarker := node.Count.(*driver.ParamMarkerExpr); isParamMarker && !checker.sctx.GetSessionVars().EnablePlanCacheForParamLimit { + checker.cacheable = false + checker.reason = "query has 'limit ?' is un-cacheable" + return in, true + } + } + if node.Offset != nil { + if _, isParamMarker := node.Offset.(*driver.ParamMarkerExpr); isParamMarker && !checker.sctx.GetSessionVars().EnablePlanCacheForParamLimit { + checker.cacheable = false + checker.reason = "query has 'limit ?, 10' is un-cacheable" + return in, true + } + } case *ast.FrameBound: if _, ok := node.Expr.(*driver.ParamMarkerExpr); ok { checker.cacheable = false diff --git a/planner/core/plan_cacheable_checker_test.go b/planner/core/plan_cacheable_checker_test.go index 7d417e377888f..fc09b7b536530 100644 --- a/planner/core/plan_cacheable_checker_test.go +++ b/planner/core/plan_cacheable_checker_test.go @@ -26,11 +26,14 @@ import ( "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/testkit" driver "github.com/pingcap/tidb/types/parser_driver" + "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" ) func TestCacheable(t *testing.T) { store := testkit.CreateMockStore(t) + mockCtx := mock.NewContext() + mockCtx.GetSessionVars().EnablePlanCacheForParamLimit = true tk := testkit.NewTestKit(t, store) @@ -87,7 +90,8 @@ func TestCacheable(t *testing.T) { TableRefs: tableRefsClause, Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ := core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) limitStmt = &ast.Limit{ Offset: &driver.ParamMarkerExpr{}, @@ -96,14 +100,16 @@ func TestCacheable(t *testing.T) { TableRefs: tableRefsClause, Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) limitStmt = &ast.Limit{} stmt = &ast.DeleteStmt{ TableRefs: tableRefsClause, Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) stmt.(*ast.DeleteStmt).TableHints = append(stmt.(*ast.DeleteStmt).TableHints, &ast.TableOptimizerHint{ HintName: model.NewCIStr(core.HintIgnorePlanCache), @@ -139,7 +145,8 @@ func TestCacheable(t *testing.T) { TableRefs: tableRefsClause, Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) limitStmt = &ast.Limit{ Offset: &driver.ParamMarkerExpr{}, @@ -148,14 +155,16 @@ func TestCacheable(t *testing.T) { TableRefs: tableRefsClause, Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) limitStmt = &ast.Limit{} stmt = &ast.UpdateStmt{ TableRefs: tableRefsClause, Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) stmt.(*ast.UpdateStmt).TableHints = append(stmt.(*ast.UpdateStmt).TableHints, &ast.TableOptimizerHint{ HintName: model.NewCIStr(core.HintIgnorePlanCache), @@ -188,7 +197,8 @@ func TestCacheable(t *testing.T) { stmt = &ast.SelectStmt{ Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = 
core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) limitStmt = &ast.Limit{ Offset: &driver.ParamMarkerExpr{}, @@ -196,13 +206,15 @@ func TestCacheable(t *testing.T) { stmt = &ast.SelectStmt{ Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) limitStmt = &ast.Limit{} stmt = &ast.SelectStmt{ Limit: limitStmt, } - require.True(t, core.Cacheable(stmt, is)) + c, _ = core.CacheableWithCtx(mockCtx, stmt, is) + require.True(t, c) paramExpr := &driver.ParamMarkerExpr{} orderByClause := &ast.OrderByClause{Items: []*ast.ByItem{{Expr: paramExpr}}} diff --git a/planner/core/rule_generate_column_substitute.go b/planner/core/rule_generate_column_substitute.go index c796f99af62c5..88039392bf1a3 100644 --- a/planner/core/rule_generate_column_substitute.go +++ b/planner/core/rule_generate_column_substitute.go @@ -15,12 +15,11 @@ package core import ( + "bytes" "context" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" ) @@ -37,13 +36,13 @@ type ExprColumnMap map[expression.Expression]*expression.Column // For example: select a+1 from t order by a+1, with a virtual generate column c as (a+1) and // an index on c. We need to replace a+1 with c so that we can use the index on c. // See also https://dev.mysql.com/doc/refman/8.0/en/generated-column-index-optimizations.html -func (gc *gcSubstituter) optimize(ctx context.Context, lp LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, error) { +func (gc *gcSubstituter) optimize(ctx context.Context, lp LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, error) { exprToColumn := make(ExprColumnMap) collectGenerateColumn(lp, exprToColumn) if len(exprToColumn) == 0 { return lp, nil } - return gc.substitute(ctx, lp, exprToColumn), nil + return gc.substitute(ctx, lp, exprToColumn, opt), nil } // collectGenerateColumn collect the generate column and save them to a map from their expressions to themselves. 
@@ -74,18 +73,33 @@ func collectGenerateColumn(lp LogicalPlan, exprToColumn ExprColumnMap) { } } -func tryToSubstituteExpr(expr *expression.Expression, sctx sessionctx.Context, candidateExpr expression.Expression, tp types.EvalType, schema *expression.Schema, col *expression.Column) { - if (*expr).Equal(sctx, candidateExpr) && candidateExpr.GetType().EvalType() == tp && +func tryToSubstituteExpr(expr *expression.Expression, lp LogicalPlan, candidateExpr expression.Expression, tp types.EvalType, schema *expression.Schema, col *expression.Column, opt *logicalOptimizeOp) { + if (*expr).Equal(lp.SCtx(), candidateExpr) && candidateExpr.GetType().EvalType() == tp && schema.ColumnIndex(col) != -1 { *expr = col + appendSubstituteColumnStep(lp, candidateExpr, col, opt) } } -func substituteExpression(cond expression.Expression, sctx *stmtctx.StatementContext, sessionCtx sessionctx.Context, exprToColumn ExprColumnMap, schema *expression.Schema) { +func appendSubstituteColumnStep(lp LogicalPlan, candidateExpr expression.Expression, col *expression.Column, opt *logicalOptimizeOp) { + reason := func() string { return "" } + action := func() string { + buffer := bytes.NewBufferString("expression:") + buffer.WriteString(candidateExpr.String()) + buffer.WriteString(" substituted by") + buffer.WriteString(" column:") + buffer.WriteString(col.String()) + return buffer.String() + } + opt.appendStepToCurrent(lp.ID(), lp.TP(), reason, action) +} + +func substituteExpression(cond expression.Expression, lp LogicalPlan, exprToColumn ExprColumnMap, schema *expression.Schema, opt *logicalOptimizeOp) { sf, ok := cond.(*expression.ScalarFunction) if !ok { return } + sctx := lp.SCtx().GetSessionVars().StmtCtx defer func() { // If the argument is not changed, hash code doesn't need to recount again. // But we always do it to keep the code simple and stupid. 
@@ -96,10 +110,10 @@ func substituteExpression(cond expression.Expression, sctx *stmtctx.StatementCon switch sf.FuncName.L { case ast.EQ, ast.LT, ast.LE, ast.GT, ast.GE: for candidateExpr, column := range exprToColumn { - tryToSubstituteExpr(&sf.GetArgs()[1], sessionCtx, candidateExpr, sf.GetArgs()[0].GetType().EvalType(), schema, column) + tryToSubstituteExpr(&sf.GetArgs()[1], lp, candidateExpr, sf.GetArgs()[0].GetType().EvalType(), schema, column, opt) } for candidateExpr, column := range exprToColumn { - tryToSubstituteExpr(&sf.GetArgs()[0], sessionCtx, candidateExpr, sf.GetArgs()[1].GetType().EvalType(), schema, column) + tryToSubstituteExpr(&sf.GetArgs()[0], lp, candidateExpr, sf.GetArgs()[1].GetType().EvalType(), schema, column, opt) } case ast.In: expr = &sf.GetArgs()[0] @@ -115,43 +129,42 @@ func substituteExpression(cond expression.Expression, sctx *stmtctx.StatementCon } if canSubstitute { for candidateExpr, column := range exprToColumn { - tryToSubstituteExpr(expr, sessionCtx, candidateExpr, tp, schema, column) + tryToSubstituteExpr(expr, lp, candidateExpr, tp, schema, column, opt) } } case ast.Like: expr = &sf.GetArgs()[0] tp = sf.GetArgs()[1].GetType().EvalType() for candidateExpr, column := range exprToColumn { - tryToSubstituteExpr(expr, sessionCtx, candidateExpr, tp, schema, column) + tryToSubstituteExpr(expr, lp, candidateExpr, tp, schema, column, opt) } case ast.LogicOr, ast.LogicAnd: - substituteExpression(sf.GetArgs()[0], sctx, sessionCtx, exprToColumn, schema) - substituteExpression(sf.GetArgs()[1], sctx, sessionCtx, exprToColumn, schema) + substituteExpression(sf.GetArgs()[0], lp, exprToColumn, schema, opt) + substituteExpression(sf.GetArgs()[1], lp, exprToColumn, schema, opt) case ast.UnaryNot: - substituteExpression(sf.GetArgs()[0], sctx, sessionCtx, exprToColumn, schema) + substituteExpression(sf.GetArgs()[0], lp, exprToColumn, schema, opt) } } -func (gc *gcSubstituter) substitute(ctx context.Context, lp LogicalPlan, exprToColumn ExprColumnMap) LogicalPlan { - sctx := lp.SCtx().GetSessionVars().StmtCtx +func (gc *gcSubstituter) substitute(ctx context.Context, lp LogicalPlan, exprToColumn ExprColumnMap, opt *logicalOptimizeOp) LogicalPlan { var tp types.EvalType switch x := lp.(type) { case *LogicalSelection: for _, cond := range x.Conditions { - substituteExpression(cond, sctx, lp.SCtx(), exprToColumn, x.Schema()) + substituteExpression(cond, lp, exprToColumn, x.Schema(), opt) } case *LogicalProjection: for i := range x.Exprs { tp = x.Exprs[i].GetType().EvalType() for candidateExpr, column := range exprToColumn { - tryToSubstituteExpr(&x.Exprs[i], lp.SCtx(), candidateExpr, tp, x.children[0].Schema(), column) + tryToSubstituteExpr(&x.Exprs[i], lp, candidateExpr, tp, x.children[0].Schema(), column, opt) } } case *LogicalSort: for i := range x.ByItems { tp = x.ByItems[i].Expr.GetType().EvalType() for candidateExpr, column := range exprToColumn { - tryToSubstituteExpr(&x.ByItems[i].Expr, lp.SCtx(), candidateExpr, tp, x.Schema(), column) + tryToSubstituteExpr(&x.ByItems[i].Expr, lp, candidateExpr, tp, x.Schema(), column, opt) } } case *LogicalAggregation: @@ -162,6 +175,7 @@ func (gc *gcSubstituter) substitute(ctx context.Context, lp LogicalPlan, exprToC if aggFunc.Args[i].Equal(lp.SCtx(), candidateExpr) && candidateExpr.GetType().EvalType() == tp && x.Schema().ColumnIndex(column) != -1 { aggFunc.Args[i] = column + appendSubstituteColumnStep(lp, candidateExpr, column, opt) } } } @@ -172,12 +186,13 @@ func (gc *gcSubstituter) substitute(ctx context.Context, lp 
LogicalPlan, exprToC
 				if x.GroupByItems[i].Equal(lp.SCtx(), candidateExpr) &&
 					candidateExpr.GetType().EvalType() == tp &&
 					x.Schema().ColumnIndex(column) != -1 {
 					x.GroupByItems[i] = column
+					appendSubstituteColumnStep(lp, candidateExpr, column, opt)
 				}
 			}
 		}
 	}
 	for _, child := range lp.Children() {
-		gc.substitute(ctx, child, exprToColumn)
+		gc.substitute(ctx, child, exprToColumn, opt)
 	}
 	return lp
 }
diff --git a/server/http_handler.go b/server/http_handler.go
index 6044f82861386..2855186b9cdfa 100644
--- a/server/http_handler.go
+++ b/server/http_handler.go
@@ -206,7 +206,7 @@ func (t *tikvHandlerTool) getHandle(tb table.PhysicalTable, params map[string]st
 	return handle, nil
 }
 
-func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values, idxCols []*model.ColumnInfo, handle kv.Handle) (*helper.MvccKV, error) {
+func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values, idxCols []*model.ColumnInfo, handle kv.Handle) ([]*helper.MvccKV, error) {
 	sc := new(stmtctx.StatementContext)
 	// HTTP request is not a database session, set timezone to UTC directly here.
 	// See https://github.com/pingcap/tidb/blob/master/docs/tidb_http_api.md for more details.
@@ -227,7 +227,18 @@ func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values,
 	if err != nil {
 		return nil, err
 	}
-	return &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), RegionID: regionID, Value: data}, err
+	idxData := &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), RegionID: regionID, Value: data}
+	tablecodec.IndexKey2TempIndexKey(idx.Meta().ID, encodedKey)
+	data, err = t.GetMvccByEncodedKey(encodedKey)
+	if err != nil {
+		return nil, err
+	}
+	regionID, err = t.getRegionIDByKey(encodedKey)
+	if err != nil {
+		return nil, err
+	}
+	tempIdxData := &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), RegionID: regionID, Value: data}
+	return []*helper.MvccKV{idxData, tempIdxData}, err
 }
 
 // formValue2DatumRow converts URL query string to a Datum Row.
diff --git a/server/http_handler_test.go b/server/http_handler_test.go
index 7d2b8c3867bf5..c28967abf06ee 100644
--- a/server/http_handler_test.go
+++ b/server/http_handler_test.go
@@ -539,16 +539,16 @@ partition by range (a)
 func decodeKeyMvcc(closer io.ReadCloser, t *testing.T, valid bool) {
 	decoder := json.NewDecoder(closer)
-	var data helper.MvccKV
+	var data []helper.MvccKV
 	err := decoder.Decode(&data)
 	require.NoError(t, err)
 	if valid {
-		require.NotNil(t, data.Value.Info)
-		require.Greater(t, len(data.Value.Info.Writes), 0)
+		require.NotNil(t, data[0].Value.Info)
+		require.Greater(t, len(data[0].Value.Info.Writes), 0)
 	} else {
-		require.Nil(t, data.Value.Info.Lock)
-		require.Nil(t, data.Value.Info.Writes)
-		require.Nil(t, data.Value.Info.Values)
+		require.Nil(t, data[0].Value.Info.Lock)
+		require.Nil(t, data[0].Value.Info.Writes)
+		require.Nil(t, data[0].Value.Info.Values)
 	}
 }
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 0b6e7ef0cb0d0..eb6eeadc72d79 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -1253,6 +1253,9 @@ type SessionVars struct {
 	// PreparedPlanCacheMonitor indicates whether to enable prepared plan cache monitor.
 	EnablePreparedPlanCacheMemoryMonitor bool
 
+	// EnablePlanCacheForParamLimit controls whether prepared statements with a parameterized LIMIT can be cached
+	EnablePlanCacheForParamLimit bool
+
 	// EnableNonPreparedPlanCache indicates whether to enable non-prepared plan cache.
 	EnableNonPreparedPlanCache bool
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 6a872c6e18fd2..b339ac39140cc 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -2284,6 +2284,10 @@ var defaultSysVars = []*SysVar{
 		s.PessimisticTransactionAggressiveLocking = TiDBOptOn(val)
 		return nil
 	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePlanCacheForParamLimit, Value: BoolToOnOff(DefTiDBEnablePlanCacheForParamLimit), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
+		s.EnablePlanCacheForParamLimit = TiDBOptOn(val)
+		return nil
+	}},
 }
 
 // FeedbackProbability points to the FeedbackProbability in statistics package.
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index c147fdda69ba7..c86795937d544 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -796,6 +796,9 @@ const (
 	// TiDBPessimisticTransactionAggressiveLocking controls whether aggressive locking for pessimistic transaction
 	// is enabled.
 	TiDBPessimisticTransactionAggressiveLocking = "tidb_pessimistic_txn_aggressive_locking"
+
+	// TiDBEnablePlanCacheForParamLimit controls whether a prepared statement with a parameterized LIMIT can be cached
+	TiDBEnablePlanCacheForParamLimit = "tidb_enable_plan_cache_for_param_limit"
 )
 
 // TiDB vars that have only global scope
@@ -1167,6 +1170,7 @@ const (
 	DefTiDBTTLDeleteWorkerCount                    = 4
 	DefTiDBEnableResourceControl                   = false
 	DefTiDBPessimisticTransactionAggressiveLocking = false
+	DefTiDBEnablePlanCacheForParamLimit            = true
 )
 
 // Process global variables.