Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move Statement and RecordSet from ast to sqlexec package #7970

Merged
merged 2 commits into from
Oct 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (

"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)

// Node is the basic element of the AST.
Expand Down Expand Up @@ -130,22 +128,6 @@ type ResultField struct {
Referenced bool
}

// RecordSet is an abstract result set interface to help get data from Plan.
type RecordSet interface {
// Fields gets result fields.
Fields() []*ResultField

// Next reads records into chunk.
Next(ctx context.Context, chk *chunk.Chunk) error

// NewChunk creates a new chunk with initial capacity.
NewChunk() *chunk.Chunk

// Close closes the underlying iterator, call Next after Close will
// restart the iteration.
Close() error
}

// ResultSetNode interface has a ResultFields property, represents a Node that returns result set.
// Implementations include SelectStmt, SubqueryExpr, TableSource, TableName and Join.
type ResultSetNode interface {
Expand All @@ -159,28 +141,6 @@ type SensitiveStmtNode interface {
SecureText() string
}

// Statement is an interface for SQL execution.
// NOTE: all Statement implementations must be safe for
// concurrent using by multiple goroutines.
// If the Exec method requires any Execution domain local data,
// they must be held out of the implementing instance.
type Statement interface {
// OriginText gets the origin SQL text.
OriginText() string

// Exec executes SQL and gets a Recordset.
Exec(ctx context.Context) (RecordSet, error)

// IsPrepared returns whether this statement is prepared statement.
IsPrepared() bool

// IsReadOnly returns if the statement is read only. For example: SelectStmt without lock.
IsReadOnly() bool

// RebuildPlan rebuilds the plan of the statement.
RebuildPlan() (schemaVersion int64, err error)
}

// Visitor visits a Node.
type Visitor interface {
// Enter is called before children nodes are visited.
Expand Down
3 changes: 2 additions & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pkg/errors"
Expand Down Expand Up @@ -229,7 +230,7 @@ func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testI
type stateCase struct {
session session.Session
rawStmt ast.StmtNode
stmt ast.Statement
stmt sqlexec.Statement
expectedExecErr error
expectedCompileErr error
}
Expand Down
11 changes: 6 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand All @@ -45,7 +46,7 @@ type processinfoSetter interface {
SetProcessInfo(string, time.Time, byte)
}

// recordSet wraps an executor, implements ast.RecordSet interface
// recordSet wraps an executor, implements sqlexec.RecordSet interface
type recordSet struct {
fields []*ast.ResultField
executor Executor
Expand Down Expand Up @@ -122,7 +123,7 @@ func (a *recordSet) Close() error {
return errors.Trace(err)
}

// ExecStmt implements the ast.Statement interface, it builds a planner.Plan to an ast.Statement.
// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
Expand Down Expand Up @@ -183,8 +184,8 @@ func (a *ExecStmt) RebuildPlan() (int64, error) {

// Exec builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned ast.RecordSet Next method.
func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {
// result, execution is done after this function returns, in the returned sqlexec.RecordSet Next method.
func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
a.StartTime = time.Now()
sctx := a.Ctx
if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
Expand Down Expand Up @@ -247,7 +248,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {
}, nil
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (ast.RecordSet, error) {
func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand Down
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (e *DeallocateExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx sessionctx.Context, ID uint32, args ...interface{}) (ast.Statement, error) {
func CompileExecutePreparedStmt(ctx sessionctx.Context, ID uint32, args ...interface{}) (sqlexec.Statement, error) {
execStmt := &ast.ExecuteStmt{ExecID: ID}
if err := ResetContextOfStmt(ctx, execStmt); err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -344,7 +345,7 @@ func (tc *TiDBContext) GetSessionVars() *variable.SessionVars {
}

type tidbResultSet struct {
recordSet ast.RecordSet
recordSet sqlexec.RecordSet
columns []*ColumnInfo
rows []chunk.Row
closed bool
Expand Down
4 changes: 2 additions & 2 deletions session/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/sqlexec"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -83,7 +83,7 @@ func prepareJoinBenchData(se Session, colType string, valueFormat string, valueC
mustExecute(se, "commit")
}

func readResult(ctx context.Context, rs ast.RecordSet, count int) {
func readResult(ctx context.Context, rs sqlexec.RecordSet, count int) {
chk := rs.NewChunk()
for count > 0 {
err := rs.Next(ctx, chk)
Expand Down
27 changes: 14 additions & 13 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/util/charset"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-binlog"
"github.com/pkg/errors"
Expand All @@ -63,17 +64,17 @@ import (
// Session context
type Session interface {
sessionctx.Context
Status() uint16 // Flag of current status, such as autocommit.
LastInsertID() uint64 // LastInsertID is the last inserted auto_increment ID.
AffectedRows() uint64 // Affected rows by latest executed stmt.
Execute(context.Context, string) ([]ast.RecordSet, error) // Execute a sql statement.
String() string // String is used to debug.
Status() uint16 // Flag of current status, such as autocommit.
LastInsertID() uint64 // LastInsertID is the last inserted auto_increment ID.
AffectedRows() uint64 // Affected rows by latest executed stmt.
Execute(context.Context, string) ([]sqlexec.RecordSet, error) // Execute a sql statement.
String() string // String is used to debug.
CommitTxn(context.Context) error
RollbackTxn(context.Context) error
// PrepareStmt executes prepare statement in binary protocol.
PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error)
// ExecutePreparedStmt executes a prepared statement.
ExecutePreparedStmt(ctx context.Context, stmtID uint32, param ...interface{}) (ast.RecordSet, error)
ExecutePreparedStmt(ctx context.Context, stmtID uint32, param ...interface{}) (sqlexec.RecordSet, error)
DropPreparedStmt(stmtID uint32) error
SetClientCapability(uint32) // Set client capability flags.
SetConnectionID(uint64)
Expand All @@ -97,7 +98,7 @@ var (

type stmtRecord struct {
stmtID uint32
st ast.Statement
st sqlexec.Statement
stmtCtx *stmtctx.StatementContext
params []interface{}
}
Expand All @@ -108,7 +109,7 @@ type StmtHistory struct {
}

// Add appends a stmt to history list.
func (h *StmtHistory) Add(stmtID uint32, st ast.Statement, stmtCtx *stmtctx.StatementContext, params ...interface{}) {
func (h *StmtHistory) Add(stmtID uint32, st sqlexec.Statement, stmtCtx *stmtctx.StatementContext, params ...interface{}) {
s := &stmtRecord{
stmtID: stmtID,
st: st,
Expand Down Expand Up @@ -613,7 +614,7 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R
}
}

func drainRecordSet(ctx context.Context, se *session, rs ast.RecordSet) ([]chunk.Row, error) {
func drainRecordSet(ctx context.Context, se *session, rs sqlexec.RecordSet) ([]chunk.Row, error) {
var rows []chunk.Row
chk := rs.NewChunk()
for {
Expand Down Expand Up @@ -728,7 +729,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte) {
s.processInfo.Store(pi)
}

func (s *session) executeStatement(ctx context.Context, connID uint64, stmtNode ast.StmtNode, stmt ast.Statement, recordSets []ast.RecordSet) ([]ast.RecordSet, error) {
func (s *session) executeStatement(ctx context.Context, connID uint64, stmtNode ast.StmtNode, stmt sqlexec.Statement, recordSets []sqlexec.RecordSet) ([]sqlexec.RecordSet, error) {
s.SetValue(sessionctx.QueryString, stmt.OriginText())
if _, ok := stmtNode.(ast.DDLNode); ok {
s.SetValue(sessionctx.LastExecuteDDL, true)
Expand Down Expand Up @@ -757,15 +758,15 @@ func (s *session) executeStatement(ctx context.Context, connID uint64, stmtNode
return recordSets, nil
}

func (s *session) Execute(ctx context.Context, sql string) (recordSets []ast.RecordSet, err error) {
func (s *session) Execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) {
if recordSets, err = s.execute(ctx, sql); err != nil {
err = errors.Trace(err)
s.sessionVars.StmtCtx.AppendError(err)
}
return
}

func (s *session) execute(ctx context.Context, sql string) (recordSets []ast.RecordSet, err error) {
func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) {
s.PrepareTxnCtx(ctx)
connID := s.sessionVars.ConnectionID
err = s.loadCommonGlobalVariablesIfNeeded()
Expand Down Expand Up @@ -908,7 +909,7 @@ func checkArgs(args ...interface{}) error {
}

// ExecutePreparedStmt executes a prepared statement.
func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args ...interface{}) (ast.RecordSet, error) {
func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args ...interface{}) (sqlexec.RecordSet, error) {
err := checkArgs(args...)
if err != nil {
return nil, errors.Trace(err)
Expand Down
11 changes: 6 additions & 5 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand Down Expand Up @@ -134,16 +135,16 @@ func Parse(ctx sessionctx.Context, src string) ([]ast.StmtNode, error) {
}

// Compile is safe for concurrent use by multiple goroutines.
func Compile(ctx context.Context, sctx sessionctx.Context, stmtNode ast.StmtNode) (ast.Statement, error) {
func Compile(ctx context.Context, sctx sessionctx.Context, stmtNode ast.StmtNode) (sqlexec.Statement, error) {
compiler := executor.Compiler{Ctx: sctx}
stmt, err := compiler.Compile(ctx, stmtNode)
return stmt, errors.Trace(err)
}

// runStmt executes the ast.Statement and commit or rollback the current transaction.
func runStmt(ctx context.Context, sctx sessionctx.Context, s ast.Statement) (ast.RecordSet, error) {
// runStmt executes the sqlexec.Statement and commit or rollback the current transaction.
func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) (sqlexec.RecordSet, error) {
var err error
var rs ast.RecordSet
var rs sqlexec.RecordSet
se := sctx.(*session)
rs, err = s.Exec(ctx)
// All the history should be added here.
Expand Down Expand Up @@ -194,7 +195,7 @@ func GetHistory(ctx sessionctx.Context) *StmtHistory {
}

// GetRows4Test gets all the rows from a RecordSet, only used for test.
func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet) ([]chunk.Row, error) {
func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
if rs == nil {
return nil, nil
}
Expand Down
6 changes: 3 additions & 3 deletions session/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testleak"
"github.com/pkg/errors"
"golang.org/x/net/context"
Expand Down Expand Up @@ -195,7 +195,7 @@ func removeStore(c *C, dbPath string) {
os.RemoveAll(dbPath)
}

func exec(se Session, sql string, args ...interface{}) (ast.RecordSet, error) {
func exec(se Session, sql string, args ...interface{}) (sqlexec.RecordSet, error) {
ctx := context.Background()
if len(args) == 0 {
rs, err := se.Execute(ctx, sql)
Expand All @@ -215,7 +215,7 @@ func exec(se Session, sql string, args ...interface{}) (ast.RecordSet, error) {
return rs, nil
}

func mustExecSQL(c *C, se Session, sql string, args ...interface{}) ast.RecordSet {
func mustExecSQL(c *C, se Session, sql string, args ...interface{}) sqlexec.RecordSet {
rs, err := exec(se, sql, args...)
c.Assert(err, IsNil)
return rs
Expand Down
3 changes: 1 addition & 2 deletions statistics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package statistics
import (
"fmt"

"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
Expand Down Expand Up @@ -119,7 +118,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs []ast.RecordSet
var rs []sqlexec.RecordSet
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID))
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
Expand Down
3 changes: 2 additions & 1 deletion statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/pkg/errors"
"golang.org/x/net/context"
Expand Down Expand Up @@ -132,7 +133,7 @@ func (c *SampleCollector) collect(sc *stmtctx.StatementContext, d types.Datum) e
// Also, if primary key is handle, it will directly build histogram for it.
type SampleBuilder struct {
Sc *stmtctx.StatementContext
RecordSet ast.RecordSet
RecordSet sqlexec.RecordSet
ColLen int // ColLen is the number of columns need to be sampled.
PkBuilder *SortedBuilder
MaxBucketSize int64
Expand Down
4 changes: 2 additions & 2 deletions statistics/sample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/sqlexec"
)

var _ = Suite(&testSampleSuite{})

type testSampleSuite struct {
count int
rs ast.RecordSet
rs sqlexec.RecordSet
}

func (s *testSampleSuite) SetUpSuite(c *C) {
Expand Down
Loading