Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make use of the upperBound of ticlient's kv_scan interface to ensure no overbound scan will happen (#8081) #8310

Merged
merged 6 commits into from
Nov 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ddl

import (
"math"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -304,7 +305,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
for {
startTime := time.Now()
handles = handles[:0]
err = iterateSnapshotRows(d.store, t, version, seekHandle,
err = iterateSnapshotRows(d.store, t, version, seekHandle, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
handles = append(handles, h)
if len(handles) == defaultBatchCnt {
Expand Down
8 changes: 2 additions & 6 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package ddl

import (
"bytes"
"encoding/hex"
"fmt"
"math"
Expand Down Expand Up @@ -155,7 +154,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.d.store, false, func(txn kv.Transaction) error {
iter, err := txn.Iter(oldStartKey, nil)
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -165,10 +164,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
if !iter.Valid() {
break
}
finish = bytes.Compare(iter.Key(), r.EndKey) >= 0
if finish {
break
}
finish = false
dr.keys = append(dr.keys, iter.Key().Clone())
newStartKey = iter.Key().Next()

Expand Down
22 changes: 18 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := time.Now()
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
Expand Down Expand Up @@ -1000,16 +1000,30 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(kv.PriorityLow)
if err != nil {
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Iter(firstKey, nil)
firstKey := t.RecordKey(startHandle)

// Calculate the exclusive upper bound
var upperBound kv.Key
if endIncluded {
if endHandle == math.MaxInt64 {
upperBound = t.RecordKey(endHandle).PrefixNext()
} else {
// PrefixNext is time costing. Try to avoid it if possible.
upperBound = t.RecordKey(endHandle + 1)
}
} else {
upperBound = t.RecordKey(endHandle)
}

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (d *ddl) getReorgInfo(t *meta.Meta, job *model.Job, tbl table.Table) (*reor
}

// Get the first handle of this table.
err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64,
err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
info.Handle = h
return false, nil
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (s *Scanner) Next() error {
continue
}
}
current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
s.eof = true
s.Close()
return nil
}
if err := s.resolveCurrentLock(bo); err != nil {
s.Close()
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion structure/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error {

func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error {
dataPrefix := t.hashDataKeyPrefix(key)
it, err := t.reader.Iter(dataPrefix, nil)
it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 5 additions & 3 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues

// Drop removes the KV index from store.
func (c *index) Drop(rm kv.RetrieverMutator) error {
it, err := rm.Iter(c.prefix, nil)
it, err := rm.Iter(c.prefix, c.prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -261,7 +261,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues
if err != nil {
return nil, false, errors.Trace(err)
}
it, err := r.Iter(key, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(key, upperBound)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -275,7 +276,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues

// SeekFirst returns an iterator which points to the first entry of the KV index.
func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) {
it, err := r.Iter(c.prefix, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(c.prefix, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ func (t *Table) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator,
// IterRecords implements table.Table IterRecords interface.
func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := ctx.Txn().Iter(startKey, nil)
prefix := t.RecordPrefix()
it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -717,7 +718,6 @@ func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*tab
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
defaultVals := make([]types.Datum, len(cols))
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
Expand Down Expand Up @@ -831,7 +831,7 @@ func (t *Table) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetStep bo
// Seek implements table.Table Seek interface.
func (t *Table) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) {
seekKey := tablecodec.EncodeRowKeyWithHandle(t.ID, h)
iter, err := ctx.Txn().Iter(seekKey, nil)
iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext())
if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) {
// No more records in the table, skip to the end.
return 0, false, nil
Expand Down
6 changes: 4 additions & 2 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h

func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := retriever.Iter(startKey, nil)
prefix := t.RecordPrefix()
keyUpperBound := prefix.PrefixNext()

it, err := retriever.Iter(startKey, keyUpperBound)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -557,7 +560,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
// TODO: check valid lock
Expand Down
4 changes: 2 additions & 2 deletions util/prefix_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error {
iter, err := retriever.Iter(prefix, nil)
iter, err := retriever.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke
// DelKeyWithPrefix deletes keys with prefix.
func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error {
var keys []kv.Key
iter, err := rm.Iter(prefix, nil)
iter, err := rm.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down