Skip to content

Commit

Permalink
*: support lowerBound and IterReserve for unionScan (#45315)
Browse files Browse the repository at this point in the history
close #45314
  • Loading branch information
Defined2014 authored Jul 18, 2023
1 parent d5b006d commit f1b9da1
Show file tree
Hide file tree
Showing 20 changed files with 223 additions and 116 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3863,8 +3863,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:wRqy8mHs5IocLn4pDvqrwGs4lc3wKhdDXxFyLi8kNbQ=",
version = "v2.0.8-0.20230711075855-e540aa3b9657",
sum = "h1:88oApJuTK/WiBnBw9cV+AgA20pNZVtVIyeh8fTm2ymo=",
version = "v2.0.8-0.20230714023607-2f119351bd5c",
)

go_repository(
Expand Down
2 changes: 1 addition & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func getRangeEndKey(ctx *JobContext, store kv.Storage, priority int, keyPrefix k
}
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
it, err := snap.IterReverse(endKey.Next())
it, err := snap.IterReverse(endKey.Next(), nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
98 changes: 53 additions & 45 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *Inde
for _, col := range idxReader.outputColumns {
outputOffset = append(outputOffset, col.Index)
}
if us.desc {
for i, j := 0, len(kvRanges)-1; i < j; i, j = i+1, j-1 {
kvRanges[i], kvRanges[j] = kvRanges[j], kvRanges[i]
}
}
return &memIndexReader{
ctx: us.Ctx(),
index: idxReader.index,
Expand Down Expand Up @@ -118,7 +123,7 @@ func (m *memIndexReader) getMemRows(ctx context.Context) ([][]types.Datum, error
}

mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, m.desc, func(key, value []byte) error {
data, err := m.decodeIndexKeyValue(key, value, tps)
if err != nil {
return err
Expand Down Expand Up @@ -147,11 +152,6 @@ func (m *memIndexReader) getMemRows(ctx context.Context) ([][]types.Datum, error
})
return m.addedRows, err
}

// TODO: After refine `IterReverse`, remove below logic and use `IterReverse` when do reverse scan.
if m.desc {
reverseDatumSlice(m.addedRows)
}
return m.addedRows, nil
}

Expand Down Expand Up @@ -233,6 +233,11 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, kvRanges []kv.K
}
cd := NewRowDecoder(us.Ctx(), us.Schema(), us.table.Meta())
rd := rowcodec.NewByteDecoder(colInfo, pkColIDs, defVal, us.Ctx().GetSessionVars().Location())
if us.desc {
for i, j := 0, len(kvRanges)-1; i < j; i, j = i+1, j-1 {
kvRanges[i], kvRanges[j] = kvRanges[j], kvRanges[i]
}
}
return &memTableReader{
ctx: us.Ctx(),
table: us.table.Meta(),
Expand All @@ -259,6 +264,7 @@ type txnMemBufferIter struct {
idx int
curr kv.Iterator

reverse bool
cd *rowcodec.ChunkDecoder
chk *chunk.Chunk
datumRow []types.Datum
Expand All @@ -269,13 +275,18 @@ func (iter *txnMemBufferIter) Next() ([]types.Datum, error) {
for iter.idx < len(iter.kvRanges) {
if iter.curr == nil {
rg := iter.kvRanges[iter.idx]
tmp := iter.txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
snapCacheIter, err := getSnapIter(iter.ctx, iter.cacheTable, rg)
var tmp kv.Iterator
if !iter.reverse {
tmp = iter.txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
} else {
tmp = iter.txn.GetMemBuffer().SnapshotIterReverse(rg.EndKey, rg.StartKey)
}
snapCacheIter, err := getSnapIter(iter.ctx, iter.cacheTable, rg, iter.reverse)
if err != nil {
return nil, err
}
if snapCacheIter != nil {
tmp, err = transaction.NewUnionIter(tmp, snapCacheIter, false)
tmp, err = transaction.NewUnionIter(tmp, snapCacheIter, iter.reverse)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,8 +363,8 @@ func (iter *txnMemBufferIter) next() ([]types.Datum, error) {
}

func (m *memTableReader) getMemRowsIter(ctx context.Context) (memRowsIter, error) {
// txnMemBufferIter not supports desc and keepOrder + partitionTable.
if m.desc || (m.keepOrder && m.table.GetPartitionInfo() != nil) {
// txnMemBufferIter not supports keepOrder + partitionTable.
if m.keepOrder && m.table.GetPartitionInfo() != nil {
data, err := m.getMemRows(ctx)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -376,6 +387,7 @@ func (m *memTableReader) getMemRowsIter(ctx context.Context) (memRowsIter, error
cd: m.buffer.cd,
chk: chunk.New(m.retFieldTypes, 1, 1),
datumRow: make([]types.Datum, len(m.retFieldTypes)),
reverse: m.desc,
}, nil
}

Expand All @@ -388,7 +400,7 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error
for i, col := range m.columns {
m.offsets[i] = m.colIDs[col.ID]
}
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, m.desc, func(key, value []byte) error {
var err error
resultRows, err = m.decodeRecordKeyValue(key, value, &resultRows)
if err != nil {
Expand Down Expand Up @@ -418,11 +430,6 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error
})
return m.addedRows, err
}

// TODO: After refine `IterReverse`, remove below logic and use `IterReverse` when do reverse scan.
if m.desc {
reverseDatumSlice(m.addedRows)
}
return m.addedRows, nil
}

Expand Down Expand Up @@ -511,7 +518,7 @@ func (m *memTableReader) getRowData(handle kv.Handle, value []byte) ([][]byte, e
// getMemRowsHandle is called when memIndexMergeReader.partialPlans[i] is TableScan.
func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) {
handles := make([]kv.Handle, 0, 16)
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, m.desc, func(key, value []byte) error {
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
return err
Expand All @@ -522,12 +529,6 @@ func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) {
if err != nil {
return nil, err
}

if m.desc {
for i, j := 0, len(handles)-1; i < j; i, j = i+1, j-1 {
handles[i], handles[j] = handles[j], handles[i]
}
}
return handles, nil
}

Expand All @@ -541,20 +542,25 @@ func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool {

type processKVFunc func(key, value []byte) error

func iterTxnMemBuffer(ctx sessionctx.Context, cacheTable kv.MemBuffer, kvRanges []kv.KeyRange, fn processKVFunc) error {
func iterTxnMemBuffer(ctx sessionctx.Context, cacheTable kv.MemBuffer, kvRanges []kv.KeyRange, reverse bool, fn processKVFunc) error {
txn, err := ctx.Txn(true)
if err != nil {
return err
}

for _, rg := range kvRanges {
iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
snapCacheIter, err := getSnapIter(ctx, cacheTable, rg)
var iter kv.Iterator
if !reverse {
iter = txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
} else {
iter = txn.GetMemBuffer().SnapshotIterReverse(rg.EndKey, rg.StartKey)
}
snapCacheIter, err := getSnapIter(ctx, cacheTable, rg, reverse)
if err != nil {
return err
}
if snapCacheIter != nil {
iter, err = transaction.NewUnionIter(iter, snapCacheIter, false)
iter, err = transaction.NewUnionIter(iter, snapCacheIter, reverse)
if err != nil {
return err
}
Expand All @@ -576,17 +582,25 @@ func iterTxnMemBuffer(ctx sessionctx.Context, cacheTable kv.MemBuffer, kvRanges
return nil
}

func getSnapIter(ctx sessionctx.Context, cacheTable kv.MemBuffer, rg kv.KeyRange) (kv.Iterator, error) {
var snapCacheIter kv.Iterator
func getSnapIter(ctx sessionctx.Context, cacheTable kv.MemBuffer, rg kv.KeyRange, reverse bool) (snapCacheIter kv.Iterator, err error) {
var cacheIter, snapIter kv.Iterator
tempTableData := ctx.GetSessionVars().TemporaryTableData
if tempTableData != nil {
snapIter, err := tempTableData.Iter(rg.StartKey, rg.EndKey)
if !reverse {
snapIter, err = tempTableData.Iter(rg.StartKey, rg.EndKey)
} else {
snapIter, err = tempTableData.IterReverse(rg.EndKey, rg.StartKey)
}
if err != nil {
return nil, err
}
snapCacheIter = snapIter
} else if cacheTable != nil {
cacheIter, err := cacheTable.Iter(rg.StartKey, rg.EndKey)
if !reverse {
cacheIter, err = cacheTable.Iter(rg.StartKey, rg.EndKey)
} else {
cacheIter, err = cacheTable.IterReverse(rg.EndKey, rg.StartKey)
}
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -595,15 +609,9 @@ func getSnapIter(ctx sessionctx.Context, cacheTable kv.MemBuffer, rg kv.KeyRange
return snapCacheIter, nil
}

func reverseDatumSlice(rows [][]types.Datum) {
for i, j := 0, len(rows)-1; i < j; i, j = i+1, j-1 {
rows[i], rows[j] = rows[j], rows[i]
}
}

func (m *memIndexReader) getMemRowsHandle() ([]kv.Handle, error) {
handles := make([]kv.Handle, 0, m.addedRowsLen)
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, m.desc, func(key, value []byte) error {
handle, err := tablecodec.DecodeIndexHandle(key, value, len(m.index.Columns))
if err != nil {
return err
Expand All @@ -626,12 +634,6 @@ func (m *memIndexReader) getMemRowsHandle() ([]kv.Handle, error) {
if err != nil {
return nil, err
}

if m.desc {
for i, j := 0, len(handles)-1; i < j; i, j = i+1, j-1 {
handles[i], handles[j] = handles[j], handles[i]
}
}
return handles, nil
}

Expand Down Expand Up @@ -731,6 +733,12 @@ func (m *memIndexLookUpReader) getMemRows(ctx context.Context) ([][]types.Datum,
return nil, nil
}

if m.desc {
for i, j := 0, len(tblKVRanges)-1; i < j; i, j = i+1, j-1 {
tblKVRanges[i], tblKVRanges[j] = tblKVRanges[j], tblKVRanges[i]
}
}

colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.ctx, m.table, m.columns)
memTblReader := &memTableReader{
ctx: m.ctx,
Expand Down
69 changes: 69 additions & 0 deletions executor/union_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,9 +613,78 @@ c6 datetime);`)
b.StopTimer()
}

func BenchmarkUnionScanIndexReadDescRead(b *testing.B) {
store := testkit.CreateMockStore(b)

tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")
tk.MustExec(`create table t(a int, b int, c int, primary key(a), index k(b))`)
tk.MustExec(`begin;`)
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d, %d)", i, i, i))
}

tk.HasPlan("select b from t use index(k) where b > 50 order by b desc", "IndexReader")

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// indexReader
tk.MustExec("select b from t use index(k) where b > 50 order by b desc")
}
b.StopTimer()
}

func BenchmarkUnionScanTableReadDescRead(b *testing.B) {
store := testkit.CreateMockStore(b)

tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")
tk.MustExec(`create table t(a int, b int, c int, primary key(a), index k(b))`)
tk.MustExec(`begin;`)
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d, %d)", i, i, i))
}

tk.HasPlan("select * from t where a > 50 order by a desc", "TableReader")

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// tableReader
tk.MustExec("select * from t where a > 50 order by a desc")
}
b.StopTimer()
}

func BenchmarkUnionScanIndexLookUpDescRead(b *testing.B) {
store := testkit.CreateMockStore(b)

tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")
tk.MustExec(`create table t(a int, b int, c int, primary key(a), index k(b))`)
tk.MustExec(`begin;`)
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d, %d)", i, i, i))
}

tk.HasPlan("select * from t use index(k) where b > 50 order by b desc", "IndexLookUp")

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// indexLookUp
tk.MustExec("select * from t use index(k) where b > 50 order by b desc")
}
b.StopTimer()
}

func TestBenchDaily(t *testing.T) {
benchdaily.Run(
executor.BenchmarkReadLastLinesOfHugeLine,
BenchmarkUnionScanRead,
BenchmarkUnionScanIndexReadDescRead,
BenchmarkUnionScanTableReadDescRead,
BenchmarkUnionScanIndexLookUpDescRead,
)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20230711075855-e540aa3b9657
github.com/tikv/client-go/v2 v2.0.8-0.20230714023607-2f119351bd5c
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -973,8 +973,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20230711075855-e540aa3b9657 h1:wRqy8mHs5IocLn4pDvqrwGs4lc3wKhdDXxFyLi8kNbQ=
github.com/tikv/client-go/v2 v2.0.8-0.20230711075855-e540aa3b9657/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88=
github.com/tikv/client-go/v2 v2.0.8-0.20230714023607-2f119351bd5c h1:88oApJuTK/WiBnBw9cV+AgA20pNZVtVIyeh8fTm2ymo=
github.com/tikv/client-go/v2 v2.0.8-0.20230714023607-2f119351bd5c/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88=
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935 h1:a5SATBxu/0Z6qNnz4KXDN91gDA06waaYcHM6dkb6lz4=
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935/go.mod h1:YmNkj9UT8IjwFov9k3oquH0UgIUHniUaQT3jXKgZYbM=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
Expand Down
6 changes: 3 additions & 3 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) {
return nil, nil
}

func (t *mockTxn) IterReverse(k Key) (Iterator, error) {
func (t *mockTxn) IterReverse(k Key, lowerBound Key) (Iterator, error) {
return nil, nil
}

Expand Down Expand Up @@ -299,8 +299,8 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
return s.store.Iter(k, upperBound)
}

func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
func (s *mockSnapshot) IterReverse(k Key, lowerBound Key) (Iterator, error) {
return s.store.IterReverse(k, lowerBound)
}

func (s *mockSnapshot) SetOption(opt int, val interface{}) {}
Expand Down
Loading

0 comments on commit f1b9da1

Please sign in to comment.