From e8049442e3e3a7432354637b10777ccaede64156 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 19 Jul 2021 18:50:18 +0800 Subject: [PATCH 1/6] executor: Add union scan support for local temporary table --- executor/mem_reader.go | 11 ++ executor/union_iter.go | 159 +++++++++++++++++++++++++ executor/union_iter_test.go | 171 +++++++++++++++++++++++++++ planner/core/logical_plan_builder.go | 2 +- planner/core/stats.go | 4 +- session/session_test.go | 92 ++++++++++++++ 6 files changed, 437 insertions(+), 2 deletions(-) create mode 100644 executor/union_iter.go create mode 100644 executor/union_iter_test.go diff --git a/executor/mem_reader.go b/executor/mem_reader.go index d6c9fb49fef0c..88459bbda1fbb 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -322,8 +322,19 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process if err != nil { return err } + + tempTableData := ctx.GetSessionVars().TemporaryTableData for _, rg := range kvRanges { iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey) + if tempTableData != nil { + snapIter, err := tempTableData.Iter(rg.StartKey, rg.EndKey) + if err != nil { + return err + } + + iter, err = NewUnionIter(iter, snapIter, false) + } + for ; iter.Valid(); err = iter.Next() { if err != nil { return err diff --git a/executor/union_iter.go b/executor/union_iter.go new file mode 100644 index 0000000000000..d490a8f70c387 --- /dev/null +++ b/executor/union_iter.go @@ -0,0 +1,159 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "bytes" + + "github.com/pingcap/tidb/kv" +) + +// UnionIter implements kv.Iterator +type UnionIter struct { + dirtyIt kv.Iterator + snapshotIt kv.Iterator + + dirtyValid bool + snapshotValid bool + + curIsDirty bool + isValid bool + reverse bool +} + +// NewUnionIter returns a union iterator for BufferStore. +func NewUnionIter(dirtyIt kv.Iterator, snapshotIt kv.Iterator, reverse bool) (*UnionIter, error) { + it := &UnionIter{ + dirtyIt: dirtyIt, + snapshotIt: snapshotIt, + dirtyValid: dirtyIt.Valid(), + snapshotValid: snapshotIt.Valid(), + reverse: reverse, + } + err := it.updateCur() + if err != nil { + return nil, err + } + return it, nil +} + +// dirtyNext makes iter.dirtyIt go and update valid status. +func (iter *UnionIter) dirtyNext() error { + err := iter.dirtyIt.Next() + iter.dirtyValid = iter.dirtyIt.Valid() + return err +} + +// snapshotNext makes iter.snapshotIt go and update valid status. +func (iter *UnionIter) snapshotNext() error { + err := iter.snapshotIt.Next() + iter.snapshotValid = iter.snapshotIt.Valid() + return err +} + +func (iter *UnionIter) updateCur() error { + iter.isValid = true + for { + if !iter.dirtyValid && !iter.snapshotValid { + iter.isValid = false + break + } + + if !iter.dirtyValid { + iter.curIsDirty = false + break + } + + if !iter.snapshotValid { + iter.curIsDirty = true + break + } + + // both valid + if iter.snapshotValid && iter.dirtyValid { + snapshotKey := iter.snapshotIt.Key() + dirtyKey := iter.dirtyIt.Key() + cmp := bytes.Compare(dirtyKey, snapshotKey) + if iter.reverse { + cmp = -cmp + } + // if equal, means both have value + if cmp == 0 { + if err := iter.snapshotNext(); err != nil { + return err + } + iter.curIsDirty = true + break + } else if cmp > 0 { + // record from snapshot comes first + iter.curIsDirty = false + break + } else { + // record from dirty comes first + iter.curIsDirty = true + break + } + } + } + return nil +} + +// Next implements the Iterator Next interface. +func (iter *UnionIter) Next() error { + var err error + if !iter.curIsDirty { + err = iter.snapshotNext() + } else { + err = iter.dirtyNext() + } + if err != nil { + return err + } + err = iter.updateCur() + return err +} + +// Value implements the Iterator Value interface. +// Multi columns +func (iter *UnionIter) Value() []byte { + if !iter.curIsDirty { + return iter.snapshotIt.Value() + } + return iter.dirtyIt.Value() +} + +// Key implements the Iterator Key interface. +func (iter *UnionIter) Key() kv.Key { + if !iter.curIsDirty { + return iter.snapshotIt.Key() + } + return iter.dirtyIt.Key() +} + +// Valid implements the Iterator Valid interface. +func (iter *UnionIter) Valid() bool { + return iter.isValid +} + +// Close implements the Iterator Close interface. +func (iter *UnionIter) Close() { + if iter.snapshotIt != nil { + iter.snapshotIt.Close() + iter.snapshotIt = nil + } + if iter.dirtyIt != nil { + iter.dirtyIt.Close() + iter.dirtyIt = nil + } +} diff --git a/executor/union_iter_test.go b/executor/union_iter_test.go new file mode 100644 index 0000000000000..06d4ed69091b2 --- /dev/null +++ b/executor/union_iter_test.go @@ -0,0 +1,171 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + . "github.com/pingcap/check" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/kv" +) + +var _ = Suite(&testUnionIterSuit{}) + +type testUnionIterSuit struct { +} + +func (s *testUnionIterSuit) TestUnionIter(c *C) { + // test iter normal cases, snap iter become invalid before dirty iter + snapRecords := []*mockRecord{ + r("k01", "v1"), + r("k03", "v3"), + r("k06", "v6"), + r("k10", "v10"), + r("k12", "v12"), + r("k15", "v15"), + r("k16", "v16"), + } + + dirtyRecords := []*mockRecord{ + r("k03", "x3"), + r("k05", "x5"), + r("k07", "x7"), + r("k08", "x8"), + } + + assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{ + r("k01", "v1"), + r("k03", "x3"), + r("k05", "x5"), + r("k06", "v6"), + r("k07", "x7"), + r("k08", "x8"), + r("k10", "v10"), + r("k12", "v12"), + r("k15", "v15"), + r("k16", "v16"), + }) + + // test iter normal cases, dirty iter become invalid before snap iter + dirtyRecords = []*mockRecord{ + r("k03", "x3"), + r("k05", "x5"), + r("k07", "x7"), + r("k08", "x8"), + r("k17", "x17"), + r("k18", "x18"), + } + + assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{ + r("k01", "v1"), + r("k03", "x3"), + r("k05", "x5"), + r("k06", "v6"), + r("k07", "x7"), + r("k08", "x8"), + r("k10", "v10"), + r("k12", "v12"), + r("k15", "v15"), + r("k16", "v16"), + r("k17", "x17"), + r("k18", "x18"), + }) +} + +func assertUnionIter(c *C, dirtyRecords, snapRecords, expected []*mockRecord) { + iter, err := executor.NewUnionIter(newMockIter(dirtyRecords), newMockIter(snapRecords), false) + c.Assert(err, IsNil) + assertIter(c, iter, expected) + + // assert reverse is true + iter, err = executor.NewUnionIter(newMockIter(reverseRecords(dirtyRecords)), newMockIter(reverseRecords(snapRecords)), true) + c.Assert(err, IsNil) + assertIter(c, iter, reverseRecords(expected)) +} + +func assertIter(c *C, iter kv.Iterator, expected []*mockRecord) { + records := make([]*mockRecord, 0, len(expected)) + for iter.Valid() { + records = append(records, &mockRecord{iter.Key(), iter.Value()}) + err := iter.Next() + c.Assert(err, IsNil) + } + c.Assert(len(records), Equals, len(expected)) + for idx, record := range records { + c.Assert(record.key, BytesEquals, expected[idx].key) + c.Assert(record.value, BytesEquals, expected[idx].value) + } +} + +func reverseRecords(records []*mockRecord) []*mockRecord { + reversed := make([]*mockRecord, 0) + for i := range records { + reversed = append(reversed, records[len(records)-i-1]) + } + return reversed +} + +type mockRecord struct { + key []byte + value []byte +} + +func r(key, value string) *mockRecord { + bKey := []byte(key) + bValue := []byte(value) + if value == "nil" { + bValue = nil + } + + return &mockRecord{bKey, bValue} +} + +type mockIter struct { + data []*mockRecord + cur int +} + +func newMockIter(records []*mockRecord) *mockIter { + return &mockIter{ + records, + 0, + } +} + +func (m *mockIter) Valid() bool { + return m.cur >= 0 && m.cur < len(m.data) +} + +func (m *mockIter) Key() kv.Key { + return m.data[m.cur].key +} + +func (m *mockIter) Value() []byte { + return m.data[m.cur].value +} + +func (m *mockIter) Next() error { + if m.Valid() { + m.cur += 1 + } + + return nil +} + +func (m *mockIter) Close() { + m.cur = -1 +} + +func (m *mockIter) reset() { + m.cur = 0 +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index bcda8eb5dcd37..38f4bbbeb8e02 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4085,7 +4085,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as var result LogicalPlan = ds dirty := tableHasDirtyContent(b.ctx, tableInfo) - if dirty { + if dirty || tableInfo.TempTableType == model.TempTableLocal { us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset()) us.SetChildren(ds) result = us diff --git a/planner/core/stats.go b/planner/core/stats.go index 2fc0e73f5c704..cba344cfb3aae 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -17,6 +17,8 @@ import ( "math" "sort" + "github.com/pingcap/parser/model" + "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" @@ -326,7 +328,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * } } } - if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn { + if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType == model.TempTableNone { err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil) if err != nil { return nil, err diff --git a/session/session_test.go b/session/session_test.go index 6800ccd3277ed..0d9e0d3e7b1d1 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5057,3 +5057,95 @@ func (s *testSessionSuite) TestLocalTemporaryTableBatchPointGet(c *C) { tk.MustQuery("select * from tmp1 where id in (1, 4)").Check(testkit.Rows("1 11 101")) tk.MustQuery("select * from tmp1 where u in (11, 14)").Check(testkit.Rows("1 11 101")) } + +func (s *testSessionSuite) TestLocalTemporaryTableScan(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@tidb_enable_noop_functions=1") + tk.MustExec("use test") + tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)") + tk.MustExec("insert into tmp1 values" + + "(1, 101, 1001), (3, 113, 1003), (5, 105, 1005), (7, 117, 1007), (9, 109, 1009)," + + "(10, 110, 1010), (12, 112, 1012), (14, 114, 1014), (16, 116, 1016), (18, 118, 1018)", + ) + + assertSelectAsUnModified := func() { + // For TableReader + tk.MustQuery("select * from tmp1 where id>3 order by id").Check(testkit.Rows( + "5 105 1005", "7 117 1007", "9 109 1009", + "10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018", + )) + + // For IndexLookUpReader + tk.MustQuery("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u").Check(testkit.Rows( + "5 105 1005", "9 109 1009", "10 110 1010", + "12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018", + )) + tk.MustQuery("show warnings").Check(testkit.Rows()) + + // For IndexReader + tk.MustQuery("select /*+ use_index(tmp1, u) */ id,u from tmp1 where u>101 order by id").Check(testkit.Rows( + "3 113", "5 105", "7 117", "9 109", "10 110", + "12 112", "14 114", "16 116", "18 118", + )) + tk.MustQuery("show warnings").Check(testkit.Rows()) + + // For IndexMerge, temporary table should not use index merge + tk.MustQuery("select /*+ use_index_merge(tmp1, primary, u) */ * from tmp1 where id>5 or u>110 order by u").Check(testkit.Rows( + "9 109 1009", "10 110 1010", + "12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018", + )) + + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 IndexMerge is inapplicable or disabled")) + } + + doModify := func() { + tk.MustExec("insert into tmp1 values(2, 100, 1002)") + tk.MustExec("insert into tmp1 values(4, 104, 1004)") + tk.MustExec("insert into tmp1 values(11, 111, 1011)") + tk.MustExec("update tmp1 set v=9999 where id=7") + tk.MustExec("update tmp1 set u=132 where id=12") + tk.MustExec("delete from tmp1 where id=16") + } + + assertSelectAsModified := func() { + // For TableReader + tk.MustQuery("select * from tmp1 where id>3 order by id").Check(testkit.Rows( + "4 104 1004", "5 105 1005", "7 117 9999", "9 109 1009", + "10 110 1010", "11 111 1011", "12 132 1012", "14 114 1014", "18 118 1018", + )) + + // For IndexLookUpReader + tk.MustQuery("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u").Check(testkit.Rows( + "4 104 1004", "5 105 1005", "9 109 1009", "10 110 1010", "11 111 1011", + "3 113 1003", "14 114 1014", "7 117 9999", "18 118 1018", "12 132 1012", + )) + tk.MustQuery("show warnings").Check(testkit.Rows()) + + // For IndexReader + tk.MustQuery("select /*+ use_index(tmp1, u) */ id,u from tmp1 where u>101 order by id").Check(testkit.Rows( + "3 113", "4 104", "5 105", "7 117", "9 109", + "10 110", "11 111", "12 132", "14 114", "18 118", + )) + tk.MustQuery("show warnings").Check(testkit.Rows()) + + // For IndexMerge, temporary table should not use index merge + tk.MustQuery("select /*+ use_index_merge(tmp1, primary, u) */ * from tmp1 where id>5 or u>110 order by u").Check(testkit.Rows( + "9 109 1009", "10 110 1010", "11 111 1011", + "3 113 1003", "14 114 1014", "7 117 9999", "18 118 1018", "12 132 1012", + )) + + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 IndexMerge is inapplicable or disabled")) + } + + assertSelectAsUnModified() + tk.MustExec("begin") + assertSelectAsUnModified() + doModify() + tk.MustExec("rollback") + assertSelectAsUnModified() + tk.MustExec("begin") + doModify() + assertSelectAsModified() + tk.MustExec("commit") + assertSelectAsModified() +} From 4f1100f013462118d9980ec8e352b1c0f620fe2c Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 19 Jul 2021 19:03:54 +0800 Subject: [PATCH 2/6] modify some code --- planner/core/stats.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/planner/core/stats.go b/planner/core/stats.go index cba344cfb3aae..406782aa44c83 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -17,10 +17,9 @@ import ( "math" "sort" - "github.com/pingcap/parser/model" - "github.com/pingcap/errors" "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/planner/property" @@ -328,7 +327,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * } } } - if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType == model.TempTableNone { + if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal { err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil) if err != nil { return nil, err From 7c901636d9b55e71e3bd543314d9c8f1f6248a0d Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 19 Jul 2021 19:11:43 +0800 Subject: [PATCH 3/6] remove unused method --- executor/union_iter_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/executor/union_iter_test.go b/executor/union_iter_test.go index 06d4ed69091b2..1f51f2fa8d51f 100644 --- a/executor/union_iter_test.go +++ b/executor/union_iter_test.go @@ -165,7 +165,3 @@ func (m *mockIter) Next() error { func (m *mockIter) Close() { m.cur = -1 } - -func (m *mockIter) reset() { - m.cur = 0 -} From b08f1c18093125992e29ff4d9c4fb9c94ce3bfbe Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 19 Jul 2021 19:22:24 +0800 Subject: [PATCH 4/6] fix some code --- executor/mem_reader.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 88459bbda1fbb..5dcfedc76188e 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -333,6 +333,9 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process } iter, err = NewUnionIter(iter, snapIter, false) + if err != nil { + return err + } } for ; iter.Valid(); err = iter.Next() { From 733aa2fa0f5204ae9aa60e75bec8311a8bc11a34 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 20 Jul 2021 17:35:03 +0800 Subject: [PATCH 5/6] add comments --- planner/core/logical_plans.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 7c02b3526b5bf..b9c22239c7e22 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -460,7 +460,7 @@ type LogicalMemTable struct { QueryTimeRange QueryTimeRange } -// LogicalUnionScan is only used in non read-only txn. +// LogicalUnionScan is used in non read-only txn or for scanning a local temporary table whose snapshot data is located in memory type LogicalUnionScan struct { baseLogicalPlan From a658c0c5742124a5fb6cee4bc2a47df92341303a Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 20 Jul 2021 17:53:10 +0800 Subject: [PATCH 6/6] modify comments --- planner/core/logical_plans.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 8f8d346ed407e..3865332458582 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -460,7 +460,7 @@ type LogicalMemTable struct { QueryTimeRange QueryTimeRange } -// LogicalUnionScan is used in non read-only txn or for scanning a local temporary table whose snapshot data is located in memory +// LogicalUnionScan is used in non read-only txn or for scanning a local temporary table whose snapshot data is located in memory. type LogicalUnionScan struct { baseLogicalPlan