Skip to content

Commit

Permalink
*: fast path point select (#6937)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jul 30, 2018
1 parent 2fee494 commit 343cb84
Show file tree
Hide file tree
Showing 16 changed files with 704 additions and 39 deletions.
2 changes: 2 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plan.Plan
case *plan.PhysicalTableReader:
tableScan := v.TablePlans[0].(*plan.PhysicalTableScan)
return len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPoint(ctx.GetSessionVars().StmtCtx)
case *plan.PointGetPlan:
return true
default:
return false
}
Expand Down
10 changes: 6 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildExecute(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointGetPlan:
return b.buildPointGet(v)
case *plan.Insert:
return b.buildInsert(v)
case *plan.LoadData:
Expand Down Expand Up @@ -1030,7 +1032,7 @@ func (b *executorBuilder) buildProjection(v *plan.PhysicalProjection) Executor {
// If the calculation row count for this Projection operator is smaller
// than a Chunk size, we turn back to the un-parallel Projection
// implementation to reduce the goroutine overhead.
if v.StatsInfo().Count() < int64(b.ctx.GetSessionVars().MaxChunkSize) {
if int64(v.StatsCount()) < int64(b.ctx.GetSessionVars().MaxChunkSize) {
e.numWorkers = 0
}
return e
Expand Down Expand Up @@ -1483,7 +1485,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plan.PhysicalTableReader) (*
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(ts.Table.ID, ts.Hist, ts.StatsInfo().Count(), ts.Desc)
e.feedback = statistics.NewQueryFeedback(ts.Table.ID, ts.Hist, int64(ts.StatsCount()), ts.Desc)
}
collect := e.feedback.CollectFeedback(len(ts.Ranges))
e.dagPB.CollectRangeCounts = &collect
Expand Down Expand Up @@ -1540,7 +1542,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plan.PhysicalIndexReader) (*
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, is.StatsInfo().Count(), is.Desc)
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, int64(is.StatsCount()), is.Desc)
}
collect := e.feedback.CollectFeedback(len(is.Ranges))
e.dagPB.CollectRangeCounts = &collect
Expand Down Expand Up @@ -1609,7 +1611,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plan.PhysicalIndexLook
if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, is.StatsInfo().Count(), is.Desc)
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, int64(is.StatsCount()), is.Desc)
}
// do not collect the feedback for table request.
collectTable := false
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func isExpensiveQuery(p plan.Plan) bool {

func isPhysicalPlanExpensive(p plan.PhysicalPlan) bool {
expensiveRowThreshold := int64(config.GetGlobalConfig().Log.ExpensiveThreshold)
if p.StatsInfo().Count() > expensiveRowThreshold {
if int64(p.StatsCount()) > expensiveRowThreshold {
return true
}

Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ func (s *testSuite) TestAdapterStatement(c *C) {
c.Check(stmt.OriginText(), Equals, "create table test.t (a int)")
}

func (s *testSuite) TestPointGet(c *C) {
func (s *testSuite) TestIsPointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use mysql")
ctx := tk.Se.(sessionctx.Context)
Expand Down
200 changes: 200 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"golang.org/x/net/context"
)

func (b *executorBuilder) buildPointGet(p *plan.PointGetPlan) Executor {
return &PointGetExecutor{
ctx: b.ctx,
schema: p.Schema(),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: b.getStartTS(),
}
}

// PointGetExecutor executes point select query.
type PointGetExecutor struct {
ctx sessionctx.Context
schema *expression.Schema
tps []*types.FieldType
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
}

// Open implements the Executor interface.
func (e *PointGetExecutor) Open(context.Context) error {
return nil
}

// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
return nil
}

// Next implements the Executor interface.
func (e *PointGetExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
e.done = true
var err error
e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.startTS})
if err != nil {
return errors.Trace(err)
}
if e.idxInfo != nil {
idxKey, err1 := e.encodeIndexKey()
if err1 != nil {
return errors.Trace(err1)
}
handleVal, err1 := e.get(idxKey)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return errors.Trace(err1)
}
if len(handleVal) == 0 {
return nil
}
e.handle, err1 = tables.DecodeHandle(handleVal)
if err1 != nil {
return errors.Trace(err1)
}
}
key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle)
val, err := e.get(key)
if err != nil && !kv.ErrNotExist.Equal(err) {
return errors.Trace(err)
}
if len(val) == 0 {
if e.idxInfo != nil {
return kv.ErrNotExist.Gen("inconsistent extra index %s, handle %d not found in table",
e.idxInfo.Name.O, e.handle)
}
return nil
}
return e.decodeRowValToChunk(val, chk)
}

func (e *PointGetExecutor) encodeIndexKey() ([]byte, error) {
for i := range e.idxVals {
colInfo := e.tblInfo.Columns[e.idxInfo.Columns[i].Offset]
casted, err := table.CastValue(e.ctx, e.idxVals[i], colInfo)
if err != nil {
return nil, errors.Trace(err)
}
e.idxVals[i] = casted
}
encodedIdxVals, err := codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, nil, e.idxVals...)
if err != nil {
return nil, errors.Trace(err)
}
return tablecodec.EncodeIndexSeekKey(e.tblInfo.ID, e.idxInfo.ID, encodedIdxVals), nil
}

func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) {
txn := e.ctx.Txn()
if txn != nil && txn.Valid() && !txn.IsReadOnly() {
return txn.Get(key)
}
return e.snapshot.Get(key)
}

func (e *PointGetExecutor) decodeRowValToChunk(rowVal []byte, chk *chunk.Chunk) error {
colIDs := make(map[int64]int, e.schema.Len())
for i, col := range e.schema.Columns {
colIDs[col.ID] = i
}
colVals, err := tablecodec.CutRowNew(rowVal, colIDs)
if err != nil {
return errors.Trace(err)
}
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
for id, offset := range colIDs {
if e.tblInfo.PKIsHandle && mysql.HasPriKeyFlag(e.schema.Columns[offset].RetType.Flag) {
chk.AppendInt64(offset, e.handle)
continue
}
if id == model.ExtraHandleID {
chk.AppendInt64(offset, e.handle)
continue
}
colVal := colVals[offset]
if len(colVal) == 0 {
colInfo := getColInfoByID(e.tblInfo, id)
d, err1 := table.GetColOriginDefaultValue(e.ctx, colInfo)
if err1 != nil {
return errors.Trace(err1)
}
chk.AppendDatum(offset, &d)
continue
}
_, err = decoder.DecodeOne(colVals[offset], offset, e.schema.Columns[offset].RetType)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
for _, col := range tbl.Columns {
if col.ID == colID {
return col
}
}
return nil
}

// Schema implements the Executor interface.
func (e *PointGetExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *PointGetExecutor) retTypes() []*types.FieldType {
if e.tps == nil {
e.tps = make([]*types.FieldType, e.schema.Len())
for i := range e.schema.Columns {
e.tps[i] = e.schema.Columns[i].RetType
}
}
return e.tps
}

func (e *PointGetExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunkWithCapacity(e.retTypes(), 1)
}
38 changes: 38 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testkit"
)

func (s *testSuite) TestPointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table point (id int primary key, c int, d varchar(10), unique c_d (c, d))")
tk.MustExec("insert point values (1, 1, 'a')")
tk.MustExec("insert point values (2, 2, 'b')")
tk.MustQuery("select * from point where id = 1 and c = 0").Check(testkit.Rows())
tk.MustQuery("select * from point where id < 0 and c = 1 and d = 'b'").Check(testkit.Rows())
result, err := tk.Exec("select id as ident from point where id = 1")
c.Assert(err, IsNil)
fields := result.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "ident")

tk.MustExec("CREATE TABLE tab3(pk INTEGER PRIMARY KEY, col0 INTEGER, col1 FLOAT, col2 TEXT, col3 INTEGER, col4 FLOAT, col5 TEXT);")
tk.MustExec("CREATE UNIQUE INDEX idx_tab3_0 ON tab3 (col4);")
tk.MustExec("INSERT INTO tab3 VALUES(0,854,111.96,'mguub',711,966.36,'snwlo');")
tk.MustQuery("SELECT ALL * FROM tab3 WHERE col4 = 85;").Check(testkit.Rows())
}
7 changes: 6 additions & 1 deletion expression/simple_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func ParseSimpleExpr(ctx sessionctx.Context, exprStr string, tableInfo *model.Ta
return nil, errors.Trace(err)
}
expr := stmts[0].(*ast.SelectStmt).Fields.Fields[0].Expr
rewriter := &simpleRewriter{tbl: tableInfo, ctx: ctx}
return RewriteSimpleExpr(ctx, tableInfo, expr)
}

// RewriteSimpleExpr rewrites simple ast.ExprNode to expression.Expression.
func RewriteSimpleExpr(ctx sessionctx.Context, tbl *model.TableInfo, expr ast.ExprNode) (Expression, error) {
rewriter := &simpleRewriter{tbl: tbl, ctx: ctx}
expr.Accept(rewriter)
if rewriter.err != nil {
return nil, errors.Trace(rewriter.err)
Expand Down
2 changes: 1 addition & 1 deletion plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, taskType, indent string
// operator id, task type, operator info, and the estemated row count.
func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent string, isLastChild bool) {
operatorInfo := p.ExplainInfo()
count := string(strconv.AppendFloat([]byte{}, p.StatsInfo().count, 'f', 2, 64))
count := string(strconv.AppendFloat([]byte{}, p.statsInfo().count, 'f', 2, 64))
row := []string{e.prettyIdentifier(p.ExplainID(), indent, isLastChild), count, taskType, operatorInfo}
e.Rows = append(e.Rows, row)
}
Expand Down
4 changes: 4 additions & 0 deletions plan/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type logicalOptRule interface {
// Optimize does optimization and creates a Plan.
// The node must be prepared first.
func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (Plan, error) {
fp := tryFastPlan(ctx, node)
if fp != nil {
return fp, nil
}
ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
builder := &planBuilder{
Expand Down
13 changes: 8 additions & 5 deletions plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type Plan interface {

context() sessionctx.Context

// StatsInfo will return the statsInfo for this plan.
StatsInfo() *statsInfo
// statsInfo will return the statsInfo for this plan.
statsInfo() *statsInfo
}

// taskType is the type of execution task.
Expand Down Expand Up @@ -91,7 +91,7 @@ func (p *requiredProp) enforceProperty(tsk task, ctx sessionctx.Context) task {
}
tsk = finishCopTask(ctx, tsk)
sortReqProp := &requiredProp{taskTp: rootTaskType, cols: p.cols, expectedCnt: math.MaxFloat64}
sort := PhysicalSort{ByItems: make([]*ByItems, 0, len(p.cols))}.init(ctx, tsk.plan().StatsInfo(), sortReqProp)
sort := PhysicalSort{ByItems: make([]*ByItems, 0, len(p.cols))}.init(ctx, tsk.plan().statsInfo(), sortReqProp)
for _, col := range p.cols {
sort.ByItems = append(sort.ByItems, &ByItems{col, p.desc})
}
Expand Down Expand Up @@ -229,6 +229,9 @@ type PhysicalPlan interface {
// getChildReqProps gets the required property by child index.
getChildReqProps(idx int) *requiredProp

// StatsCount returns the count of statsInfo for this plan.
StatsCount() float64

// Get all the children.
Children() []PhysicalPlan

Expand Down Expand Up @@ -349,8 +352,8 @@ func (p *basePlan) ID() int {
return p.id
}

// StatsInfo implements the Plan interface.
func (p *basePlan) StatsInfo() *statsInfo {
// statsInfo implements the Plan interface.
func (p *basePlan) statsInfo() *statsInfo {
return p.stats
}

Expand Down
Loading

0 comments on commit 343cb84

Please sign in to comment.