Skip to content

Commit

Permalink
*: Make use of the upperBound of ticlient's kv_scan interface to ensu…
Browse files Browse the repository at this point in the history
…re no overbound scan will happen (#8081)
  • Loading branch information
MyonKeminta authored and zz-jason committed Nov 9, 2018
1 parent 29f14d4 commit 09c6bff
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 22 deletions.
8 changes: 2 additions & 6 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package ddl

import (
"bytes"
"encoding/hex"
"fmt"
"math"
Expand Down Expand Up @@ -154,7 +153,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error {
iter, err := txn.Iter(oldStartKey, nil)
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -164,10 +163,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
if !iter.Valid() {
break
}
finish = bytes.Compare(iter.Key(), r.EndKey) >= 0
if finish {
break
}
finish = false
dr.keys = append(dr.keys, iter.Key().Clone())
newStartKey = iter.Key().Next()

Expand Down
22 changes: 18 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
Expand Down Expand Up @@ -1207,16 +1207,30 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(priority)
if err != nil {
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Iter(firstKey, nil)
firstKey := t.RecordKey(startHandle)

// Calculate the exclusive upper bound
var upperBound kv.Key
if endIncluded {
if endHandle == math.MaxInt64 {
upperBound = t.RecordKey(endHandle).PrefixNext()
} else {
// PrefixNext is time costing. Try to avoid it if possible.
upperBound = t.RecordKey(endHandle + 1)
}
} else {
upperBound = t.RecordKey(endHandle)
}

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior
startHandle = math.MinInt64
endHandle = math.MaxInt64
// Get the start handle of this partition.
err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64,
err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
startHandle = h
return false, nil
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
s.eof = true
s.Close()
return nil
}
// Try to resolve the lock
if current.GetError() != nil {
// 'current' would be modified if the lock being resolved
Expand Down
2 changes: 1 addition & 1 deletion structure/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error {

func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error {
dataPrefix := t.hashDataKeyPrefix(key)
it, err := t.reader.Iter(dataPrefix, nil)
it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 5 additions & 3 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues

// Drop removes the KV index from store.
func (c *index) Drop(rm kv.RetrieverMutator) error {
it, err := rm.Iter(c.prefix, nil)
it, err := rm.Iter(c.prefix, c.prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -270,7 +270,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues
return nil, false, errors.Trace(err)
}

it, err := r.Iter(key, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(key, upperBound)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -284,7 +285,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues

// SeekFirst returns an iterator which points to the first entry of the KV index.
func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) {
it, err := r.Iter(c.prefix, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(c.prefix, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,8 @@ func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMu
// IterRecords implements table.Table IterRecords interface.
func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := ctx.Txn().Iter(startKey, nil)
prefix := t.RecordPrefix()
it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -798,7 +799,6 @@ func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
defaultVals := make([]types.Datum, len(cols))
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
Expand Down Expand Up @@ -912,7 +912,7 @@ func (t *tableCommon) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetS
// Seek implements table.Table Seek interface.
func (t *tableCommon) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) {
seekKey := tablecodec.EncodeRowKeyWithHandle(t.physicalTableID, h)
iter, err := ctx.Txn().Iter(seekKey, nil)
iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext())
if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) {
// No more records in the table, skip to the end.
return 0, false, nil
Expand Down
6 changes: 4 additions & 2 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h
// genExprs use to calculate generated column value.
func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc, genExprs map[model.TableColumnID]expression.Expression) error {
it, err := retriever.Iter(startKey, nil)
prefix := t.RecordPrefix()
keyUpperBound := prefix.PrefixNext()

it, err := retriever.Iter(startKey, keyUpperBound)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -651,7 +654,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab

log.Debugf("startKey:%q, key:%q, value:%q", startKey, it.Key(), it.Value())
rowDecoder := makeRowDecoder(t, cols, genExprs)
prefix := t.RecordPrefix()
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
// TODO: check valid lock
Expand Down
4 changes: 2 additions & 2 deletions util/prefix_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error {
iter, err := retriever.Iter(prefix, nil)
iter, err := retriever.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke
// DelKeyWithPrefix deletes keys with prefix.
func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error {
var keys []kv.Key
iter, err := rm.Iter(prefix, nil)
iter, err := rm.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 09c6bff

Please sign in to comment.