Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Support read operation for local temporary table #26353

Merged
merged 13 commits into from
Jul 20, 2021
Merged
14 changes: 14 additions & 0 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,22 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process
if err != nil {
return err
}

tempTableData := ctx.GetSessionVars().TemporaryTableData
for _, rg := range kvRanges {
iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
if tempTableData != nil {
snapIter, err := tempTableData.Iter(rg.StartKey, rg.EndKey)
if err != nil {
return err
}

iter, err = NewUnionIter(iter, snapIter, false)
if err != nil {
return err
}
}

for ; iter.Valid(); err = iter.Next() {
if err != nil {
return err
Expand Down
159 changes: 159 additions & 0 deletions executor/union_iter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"bytes"

"github.com/pingcap/tidb/kv"
)

// UnionIter implements kv.Iterator
type UnionIter struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnionIter is moved to the client-go repo and is now internal
https://github.com/tikv/client-go/blob/master/internal/unionstore/union_iter.go

dirtyIt kv.Iterator
snapshotIt kv.Iterator

dirtyValid bool
snapshotValid bool

curIsDirty bool
isValid bool
reverse bool
}

// NewUnionIter returns a union iterator for BufferStore.
func NewUnionIter(dirtyIt kv.Iterator, snapshotIt kv.Iterator, reverse bool) (*UnionIter, error) {
it := &UnionIter{
dirtyIt: dirtyIt,
snapshotIt: snapshotIt,
dirtyValid: dirtyIt.Valid(),
snapshotValid: snapshotIt.Valid(),
reverse: reverse,
}
err := it.updateCur()
if err != nil {
return nil, err
}
return it, nil
}

// dirtyNext makes iter.dirtyIt go and update valid status.
func (iter *UnionIter) dirtyNext() error {
err := iter.dirtyIt.Next()
iter.dirtyValid = iter.dirtyIt.Valid()
return err
}

// snapshotNext makes iter.snapshotIt go and update valid status.
func (iter *UnionIter) snapshotNext() error {
err := iter.snapshotIt.Next()
iter.snapshotValid = iter.snapshotIt.Valid()
return err
}

func (iter *UnionIter) updateCur() error {
iter.isValid = true
for {
if !iter.dirtyValid && !iter.snapshotValid {
iter.isValid = false
break
}

if !iter.dirtyValid {
iter.curIsDirty = false
break
}

if !iter.snapshotValid {
iter.curIsDirty = true
break
}

// both valid
if iter.snapshotValid && iter.dirtyValid {
snapshotKey := iter.snapshotIt.Key()
dirtyKey := iter.dirtyIt.Key()
cmp := bytes.Compare(dirtyKey, snapshotKey)
if iter.reverse {
cmp = -cmp
}
// if equal, means both have value
if cmp == 0 {
if err := iter.snapshotNext(); err != nil {
return err
}
iter.curIsDirty = true
break
} else if cmp > 0 {
// record from snapshot comes first
iter.curIsDirty = false
break
} else {
// record from dirty comes first
iter.curIsDirty = true
break
}
}
}
return nil
}

// Next implements the Iterator Next interface.
func (iter *UnionIter) Next() error {
var err error
if !iter.curIsDirty {
err = iter.snapshotNext()
} else {
err = iter.dirtyNext()
}
if err != nil {
return err
}
err = iter.updateCur()
return err
}

// Value implements the Iterator Value interface.
// Multi columns
func (iter *UnionIter) Value() []byte {
if !iter.curIsDirty {
return iter.snapshotIt.Value()
}
return iter.dirtyIt.Value()
}

// Key implements the Iterator Key interface.
func (iter *UnionIter) Key() kv.Key {
if !iter.curIsDirty {
return iter.snapshotIt.Key()
}
return iter.dirtyIt.Key()
}

// Valid implements the Iterator Valid interface.
func (iter *UnionIter) Valid() bool {
return iter.isValid
}

// Close implements the Iterator Close interface.
func (iter *UnionIter) Close() {
if iter.snapshotIt != nil {
iter.snapshotIt.Close()
iter.snapshotIt = nil
}
if iter.dirtyIt != nil {
iter.dirtyIt.Close()
iter.dirtyIt = nil
}
}
167 changes: 167 additions & 0 deletions executor/union_iter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
)

var _ = Suite(&testUnionIterSuit{})

type testUnionIterSuit struct {
}

func (s *testUnionIterSuit) TestUnionIter(c *C) {
// test iter normal cases, snap iter become invalid before dirty iter
snapRecords := []*mockRecord{
r("k01", "v1"),
r("k03", "v3"),
r("k06", "v6"),
r("k10", "v10"),
r("k12", "v12"),
r("k15", "v15"),
r("k16", "v16"),
}

dirtyRecords := []*mockRecord{
r("k03", "x3"),
r("k05", "x5"),
r("k07", "x7"),
r("k08", "x8"),
}

assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{
r("k01", "v1"),
r("k03", "x3"),
r("k05", "x5"),
r("k06", "v6"),
r("k07", "x7"),
r("k08", "x8"),
r("k10", "v10"),
r("k12", "v12"),
r("k15", "v15"),
r("k16", "v16"),
})

// test iter normal cases, dirty iter become invalid before snap iter
dirtyRecords = []*mockRecord{
r("k03", "x3"),
r("k05", "x5"),
r("k07", "x7"),
r("k08", "x8"),
r("k17", "x17"),
r("k18", "x18"),
}

assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{
r("k01", "v1"),
r("k03", "x3"),
r("k05", "x5"),
r("k06", "v6"),
r("k07", "x7"),
r("k08", "x8"),
r("k10", "v10"),
r("k12", "v12"),
r("k15", "v15"),
r("k16", "v16"),
r("k17", "x17"),
r("k18", "x18"),
})
}

func assertUnionIter(c *C, dirtyRecords, snapRecords, expected []*mockRecord) {
iter, err := executor.NewUnionIter(newMockIter(dirtyRecords), newMockIter(snapRecords), false)
c.Assert(err, IsNil)
assertIter(c, iter, expected)

// assert reverse is true
iter, err = executor.NewUnionIter(newMockIter(reverseRecords(dirtyRecords)), newMockIter(reverseRecords(snapRecords)), true)
c.Assert(err, IsNil)
assertIter(c, iter, reverseRecords(expected))
}

func assertIter(c *C, iter kv.Iterator, expected []*mockRecord) {
records := make([]*mockRecord, 0, len(expected))
for iter.Valid() {
records = append(records, &mockRecord{iter.Key(), iter.Value()})
err := iter.Next()
c.Assert(err, IsNil)
}
c.Assert(len(records), Equals, len(expected))
for idx, record := range records {
c.Assert(record.key, BytesEquals, expected[idx].key)
c.Assert(record.value, BytesEquals, expected[idx].value)
}
}

func reverseRecords(records []*mockRecord) []*mockRecord {
reversed := make([]*mockRecord, 0)
for i := range records {
reversed = append(reversed, records[len(records)-i-1])
}
return reversed
}

type mockRecord struct {
key []byte
value []byte
}

func r(key, value string) *mockRecord {
bKey := []byte(key)
bValue := []byte(value)
if value == "nil" {
bValue = nil
}

return &mockRecord{bKey, bValue}
}

type mockIter struct {
data []*mockRecord
cur int
}

func newMockIter(records []*mockRecord) *mockIter {
return &mockIter{
records,
0,
}
}

func (m *mockIter) Valid() bool {
return m.cur >= 0 && m.cur < len(m.data)
}

func (m *mockIter) Key() kv.Key {
return m.data[m.cur].key
}

func (m *mockIter) Value() []byte {
return m.data[m.cur].value
}

func (m *mockIter) Next() error {
if m.Valid() {
m.cur += 1
}

return nil
}

func (m *mockIter) Close() {
m.cur = -1
}
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4084,7 +4084,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as

var result LogicalPlan = ds
dirty := tableHasDirtyContent(b.ctx, tableInfo)
if dirty {
if dirty || tableInfo.TempTableType == model.TempTableLocal {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may need to update some comments. Such as LogicalUnionScan: "LogicalUnionScan is only used in non read-only txn.".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset())
us.SetChildren(ds)
result = us
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ type LogicalMemTable struct {
QueryTimeRange QueryTimeRange
}

// LogicalUnionScan is only used in non read-only txn.
// LogicalUnionScan is used in non read-only txn or for scanning a local temporary table whose snapshot data is located in memory
type LogicalUnionScan struct {
baseLogicalPlan

Expand Down
3 changes: 2 additions & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/property"
Expand Down Expand Up @@ -327,7 +328,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
}
}
}
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn {
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why skip index merge?

Copy link
Collaborator Author

@lcwangchao lcwangchao Jul 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnionScan for index merge is just a TODO, so we must skip is because local temporary table always use UnionScan.

tidb/planner/core/stats.go

Lines 308 to 320 in cca097d

// TODO: implement UnionScan + IndexMerge
isReadOnlyTxn := true
txn, err := ds.ctx.Txn(false)
if err != nil {
return nil, err
}
if txn.Valid() && !txn.IsReadOnly() {
isReadOnlyTxn = false
}
// Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case.
isPossibleIdxMerge := len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1
sessionAndStmtPermission := (ds.ctx.GetSessionVars().GetEnableIndexMerge() || len(ds.indexMergeHints) > 0) && !ds.ctx.GetSessionVars().StmtCtx.NoIndexMergeHint
// If there is an index path, we current do not consider `IndexMergePath`.

err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil)
if err != nil {
return nil, err
Expand Down
Loading