-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: Support read operation for local temporary table #26353
Changes from 9 commits
e804944
1e77d38
4f1100f
521f5e4
7c90163
3967043
b08f1c1
733aa2f
e569e42
180c731
a658c0c
1954958
eb73e47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
// Copyright 2021 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package executor | ||
|
||
import ( | ||
"bytes" | ||
|
||
"github.com/pingcap/tidb/kv" | ||
) | ||
|
||
// UnionIter implements kv.Iterator | ||
type UnionIter struct { | ||
dirtyIt kv.Iterator | ||
snapshotIt kv.Iterator | ||
|
||
dirtyValid bool | ||
snapshotValid bool | ||
|
||
curIsDirty bool | ||
isValid bool | ||
reverse bool | ||
} | ||
|
||
// NewUnionIter returns a union iterator for BufferStore. | ||
func NewUnionIter(dirtyIt kv.Iterator, snapshotIt kv.Iterator, reverse bool) (*UnionIter, error) { | ||
it := &UnionIter{ | ||
dirtyIt: dirtyIt, | ||
snapshotIt: snapshotIt, | ||
dirtyValid: dirtyIt.Valid(), | ||
snapshotValid: snapshotIt.Valid(), | ||
reverse: reverse, | ||
} | ||
err := it.updateCur() | ||
if err != nil { | ||
return nil, err | ||
} | ||
return it, nil | ||
} | ||
|
||
// dirtyNext makes iter.dirtyIt go and update valid status. | ||
func (iter *UnionIter) dirtyNext() error { | ||
err := iter.dirtyIt.Next() | ||
iter.dirtyValid = iter.dirtyIt.Valid() | ||
return err | ||
} | ||
|
||
// snapshotNext makes iter.snapshotIt go and update valid status. | ||
func (iter *UnionIter) snapshotNext() error { | ||
err := iter.snapshotIt.Next() | ||
iter.snapshotValid = iter.snapshotIt.Valid() | ||
return err | ||
} | ||
|
||
func (iter *UnionIter) updateCur() error { | ||
iter.isValid = true | ||
for { | ||
if !iter.dirtyValid && !iter.snapshotValid { | ||
iter.isValid = false | ||
break | ||
} | ||
|
||
if !iter.dirtyValid { | ||
iter.curIsDirty = false | ||
break | ||
} | ||
|
||
if !iter.snapshotValid { | ||
iter.curIsDirty = true | ||
break | ||
} | ||
|
||
// both valid | ||
if iter.snapshotValid && iter.dirtyValid { | ||
snapshotKey := iter.snapshotIt.Key() | ||
dirtyKey := iter.dirtyIt.Key() | ||
cmp := bytes.Compare(dirtyKey, snapshotKey) | ||
if iter.reverse { | ||
cmp = -cmp | ||
} | ||
// if equal, means both have value | ||
if cmp == 0 { | ||
if err := iter.snapshotNext(); err != nil { | ||
return err | ||
} | ||
iter.curIsDirty = true | ||
break | ||
} else if cmp > 0 { | ||
// record from snapshot comes first | ||
iter.curIsDirty = false | ||
break | ||
} else { | ||
// record from dirty comes first | ||
iter.curIsDirty = true | ||
break | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// Next implements the Iterator Next interface. | ||
func (iter *UnionIter) Next() error { | ||
var err error | ||
if !iter.curIsDirty { | ||
err = iter.snapshotNext() | ||
} else { | ||
err = iter.dirtyNext() | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
err = iter.updateCur() | ||
return err | ||
} | ||
|
||
// Value implements the Iterator Value interface. | ||
// Multi columns | ||
func (iter *UnionIter) Value() []byte { | ||
if !iter.curIsDirty { | ||
return iter.snapshotIt.Value() | ||
} | ||
return iter.dirtyIt.Value() | ||
} | ||
|
||
// Key implements the Iterator Key interface. | ||
func (iter *UnionIter) Key() kv.Key { | ||
if !iter.curIsDirty { | ||
return iter.snapshotIt.Key() | ||
} | ||
return iter.dirtyIt.Key() | ||
} | ||
|
||
// Valid implements the Iterator Valid interface. | ||
func (iter *UnionIter) Valid() bool { | ||
return iter.isValid | ||
} | ||
|
||
// Close implements the Iterator Close interface. | ||
func (iter *UnionIter) Close() { | ||
if iter.snapshotIt != nil { | ||
iter.snapshotIt.Close() | ||
iter.snapshotIt = nil | ||
} | ||
if iter.dirtyIt != nil { | ||
iter.dirtyIt.Close() | ||
iter.dirtyIt = nil | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
// Copyright 2021 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package executor_test | ||
|
||
import ( | ||
. "github.com/pingcap/check" | ||
"github.com/pingcap/tidb/executor" | ||
"github.com/pingcap/tidb/kv" | ||
) | ||
|
||
var _ = Suite(&testUnionIterSuit{}) | ||
|
||
type testUnionIterSuit struct { | ||
} | ||
|
||
func (s *testUnionIterSuit) TestUnionIter(c *C) { | ||
// test iter normal cases, snap iter become invalid before dirty iter | ||
snapRecords := []*mockRecord{ | ||
r("k01", "v1"), | ||
r("k03", "v3"), | ||
r("k06", "v6"), | ||
r("k10", "v10"), | ||
r("k12", "v12"), | ||
r("k15", "v15"), | ||
r("k16", "v16"), | ||
} | ||
|
||
dirtyRecords := []*mockRecord{ | ||
r("k03", "x3"), | ||
r("k05", "x5"), | ||
r("k07", "x7"), | ||
r("k08", "x8"), | ||
} | ||
|
||
assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{ | ||
r("k01", "v1"), | ||
r("k03", "x3"), | ||
r("k05", "x5"), | ||
r("k06", "v6"), | ||
r("k07", "x7"), | ||
r("k08", "x8"), | ||
r("k10", "v10"), | ||
r("k12", "v12"), | ||
r("k15", "v15"), | ||
r("k16", "v16"), | ||
}) | ||
|
||
// test iter normal cases, dirty iter become invalid before snap iter | ||
dirtyRecords = []*mockRecord{ | ||
r("k03", "x3"), | ||
r("k05", "x5"), | ||
r("k07", "x7"), | ||
r("k08", "x8"), | ||
r("k17", "x17"), | ||
r("k18", "x18"), | ||
} | ||
|
||
assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{ | ||
r("k01", "v1"), | ||
r("k03", "x3"), | ||
r("k05", "x5"), | ||
r("k06", "v6"), | ||
r("k07", "x7"), | ||
r("k08", "x8"), | ||
r("k10", "v10"), | ||
r("k12", "v12"), | ||
r("k15", "v15"), | ||
r("k16", "v16"), | ||
r("k17", "x17"), | ||
r("k18", "x18"), | ||
}) | ||
} | ||
|
||
func assertUnionIter(c *C, dirtyRecords, snapRecords, expected []*mockRecord) { | ||
iter, err := executor.NewUnionIter(newMockIter(dirtyRecords), newMockIter(snapRecords), false) | ||
c.Assert(err, IsNil) | ||
assertIter(c, iter, expected) | ||
|
||
// assert reverse is true | ||
iter, err = executor.NewUnionIter(newMockIter(reverseRecords(dirtyRecords)), newMockIter(reverseRecords(snapRecords)), true) | ||
c.Assert(err, IsNil) | ||
assertIter(c, iter, reverseRecords(expected)) | ||
} | ||
|
||
func assertIter(c *C, iter kv.Iterator, expected []*mockRecord) { | ||
records := make([]*mockRecord, 0, len(expected)) | ||
for iter.Valid() { | ||
records = append(records, &mockRecord{iter.Key(), iter.Value()}) | ||
err := iter.Next() | ||
c.Assert(err, IsNil) | ||
} | ||
c.Assert(len(records), Equals, len(expected)) | ||
for idx, record := range records { | ||
c.Assert(record.key, BytesEquals, expected[idx].key) | ||
c.Assert(record.value, BytesEquals, expected[idx].value) | ||
} | ||
} | ||
|
||
func reverseRecords(records []*mockRecord) []*mockRecord { | ||
reversed := make([]*mockRecord, 0) | ||
for i := range records { | ||
reversed = append(reversed, records[len(records)-i-1]) | ||
} | ||
return reversed | ||
} | ||
|
||
type mockRecord struct { | ||
key []byte | ||
value []byte | ||
} | ||
|
||
func r(key, value string) *mockRecord { | ||
bKey := []byte(key) | ||
bValue := []byte(value) | ||
if value == "nil" { | ||
bValue = nil | ||
} | ||
|
||
return &mockRecord{bKey, bValue} | ||
} | ||
|
||
type mockIter struct { | ||
data []*mockRecord | ||
cur int | ||
} | ||
|
||
func newMockIter(records []*mockRecord) *mockIter { | ||
return &mockIter{ | ||
records, | ||
0, | ||
} | ||
} | ||
|
||
func (m *mockIter) Valid() bool { | ||
return m.cur >= 0 && m.cur < len(m.data) | ||
} | ||
|
||
func (m *mockIter) Key() kv.Key { | ||
return m.data[m.cur].key | ||
} | ||
|
||
func (m *mockIter) Value() []byte { | ||
return m.data[m.cur].value | ||
} | ||
|
||
func (m *mockIter) Next() error { | ||
if m.Valid() { | ||
m.cur += 1 | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (m *mockIter) Close() { | ||
m.cur = -1 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4084,7 +4084,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as | |
|
||
var result LogicalPlan = ds | ||
dirty := tableHasDirtyContent(b.ctx, tableInfo) | ||
if dirty { | ||
if dirty || tableInfo.TempTableType == model.TempTableLocal { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may need to update some comments. Such as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset()) | ||
us.SetChildren(ds) | ||
result = us | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
"github.com/pingcap/errors" | ||||||||||||||||||||||||||||
"github.com/pingcap/parser/ast" | ||||||||||||||||||||||||||||
"github.com/pingcap/parser/model" | ||||||||||||||||||||||||||||
"github.com/pingcap/parser/mysql" | ||||||||||||||||||||||||||||
"github.com/pingcap/tidb/expression" | ||||||||||||||||||||||||||||
"github.com/pingcap/tidb/planner/property" | ||||||||||||||||||||||||||||
|
@@ -327,7 +328,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * | |||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn { | ||||||||||||||||||||||||||||
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal { | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why skip index merge? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. UnionScan for index merge is just a TODO, so we must skip is because local temporary table always use UnionScan. Lines 308 to 320 in cca097d
|
||||||||||||||||||||||||||||
err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil) | ||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||
return nil, err | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnionIter
is moved to the client-go repo and is now internalhttps://github.com/tikv/client-go/blob/master/internal/unionstore/union_iter.go