Skip to content

Commit

Permalink
Merge branch 'master' into range-limit-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Oct 27, 2022
2 parents 978f4dc + 18698d1 commit 99694ad
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 16 deletions.
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ go_test(
flaky = True,
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
"//br/pkg/metautil",
"//br/pkg/restore",
"//br/pkg/storage",
Expand Down
6 changes: 5 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,14 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}
maxExecutionTime := getMaxExecutionTime(sctx)
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
if a.Ctx.GetSessionVars().StmtCtx.StmtType == "" {
a.Ctx.GetSessionVars().StmtCtx.StmtType = ast.GetStmtLabel(a.StmtNode)
}
// Since maxExecutionTime is used only for query statement, here we limit it affect scope.
if !a.IsReadOnly(a.Ctx.GetSessionVars()) {
maxExecutionTime = 0
}
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
}

failpoint.Inject("mockDelayInnerSessionExecute", func() {
Expand Down
4 changes: 3 additions & 1 deletion expression/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package expression

import (
"context"
"strings"
"sync"

Expand Down Expand Up @@ -104,7 +105,7 @@ func (c *extensionFuncClass) getFunction(ctx sessionctx.Context, args []Expressi
return nil, err
}
bf.tp.SetFlen(c.flen)
sig := &extensionFuncSig{bf, c.funcDef}
sig := &extensionFuncSig{context.TODO(), bf, c.funcDef}
return sig, nil
}

Expand Down Expand Up @@ -137,6 +138,7 @@ func (c *extensionFuncClass) checkPrivileges(ctx sessionctx.Context) error {
var _ extension.FunctionContext = &extensionFuncSig{}

type extensionFuncSig struct {
context.Context
baseBuiltinFunc
extension.FunctionDef
}
Expand Down
3 changes: 3 additions & 0 deletions extension/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"function.go",
"manifest.go",
"registry.go",
"session.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/extension",
Expand All @@ -15,7 +16,9 @@ go_library(
"//sessionctx/variable",
"//types",
"//util/chunk",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@io_etcd_go_etcd_client_v3//:client",
],
)

Expand Down
1 change: 1 addition & 0 deletions extension/extensionimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ go_library(
"//util/chunk",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@io_etcd_go_etcd_client_v3//:client",
],
)
19 changes: 18 additions & 1 deletion extension/extensionimpl/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
clientv3 "go.etcd.io/etcd/client/v3"
)

type bootstrapContext struct {
context.Context

sqlExecutor sqlexec.SQLExecutor
etcdCli *clientv3.Client
sessionPool extension.SessionPool
}

func (c *bootstrapContext) ExecuteSQL(ctx context.Context, sql string) (rows []chunk.Row, err error) {
Expand All @@ -51,6 +55,14 @@ func (c *bootstrapContext) ExecuteSQL(ctx context.Context, sql string) (rows []c
return sqlexec.DrainRecordSet(ctx, rs, 8)
}

func (c *bootstrapContext) EtcdClient() *clientv3.Client {
return c.etcdCli
}

func (c *bootstrapContext) SessionPool() extension.SessionPool {
return c.sessionPool
}

// Bootstrap bootstrap all extensions
func Bootstrap(ctx context.Context, do *domain.Domain) error {
extensions, err := extension.GetExtensions()
Expand All @@ -74,5 +86,10 @@ func Bootstrap(ctx context.Context, do *domain.Domain) error {
return errors.Errorf("type '%T' cannot be casted to 'sqlexec.SQLExecutor'", sctx)
}

return extensions.Bootstrap(&bootstrapContext{ctx, executor})
return extensions.Bootstrap(&bootstrapContext{
Context: ctx,
sessionPool: pool,
sqlExecutor: executor,
etcdCli: do.GetEtcdClient(),
})
}
3 changes: 3 additions & 0 deletions extension/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package extension

import (
"context"

"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

// FunctionContext is a interface to provide context to the custom function
type FunctionContext interface {
context.Context
EvalArgs(row chunk.Row) ([]types.Datum, error)
}

Expand Down
12 changes: 12 additions & 0 deletions extension/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@ package extension
import (
"context"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
clientv3 "go.etcd.io/etcd/client/v3"
)

// SessionPool is the pool for session
type SessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
}

// Option represents an option to initialize an extension
type Option func(m *Manifest)

Expand Down Expand Up @@ -66,6 +74,10 @@ type BootstrapContext interface {
context.Context
// ExecuteSQL is used to execute a sql
ExecuteSQL(ctx context.Context, sql string) ([]chunk.Row, error)
// EtcdClient returns the etcd client
EtcdClient() *clientv3.Client
// SessionPool returns the session pool of domain
SessionPool() SessionPool
}

// WithBootstrap specifies the bootstrap func of an extension
Expand Down
66 changes: 53 additions & 13 deletions planner/core/plan_cost_ver2.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tipb/go-tipb"
)

func getPlanCost(p PhysicalPlan, taskType property.TaskType, option *PlanCostOption) (float64, error) {
Expand Down Expand Up @@ -329,9 +331,7 @@ func (p *PhysicalSort) getPlanCostVer2(taskType property.TaskType, option *PlanC
memQuota > 0 && // mem-quota is set
rowSize*rows > float64(memQuota) // exceed the mem-quota

sortCPUCost := newCostVer2(option, cpuFactor,
rows*math.Log2(rows)*float64(len(p.ByItems))*cpuFactor.Value,
"sortCPU(%v*log2(%v)*%v*%v)", rows, rows, len(p.ByItems), cpuFactor)
sortCPUCost := orderCostVer2(option, rows, rows, p.ByItems, cpuFactor)

var sortMemCost, sortDiskCost costVer2
if !spill {
Expand Down Expand Up @@ -373,9 +373,7 @@ func (p *PhysicalTopN) getPlanCostVer2(taskType property.TaskType, option *PlanC
cpuFactor := getTaskCPUFactorVer2(p, taskType)
memFactor := getTaskMemFactorVer2(p, taskType)

topNCPUCost := newCostVer2(option, cpuFactor,
rows*math.Log2(N)*float64(len(p.ByItems))*cpuFactor.Value,
"topCPU(%v*%v*%v*%v)", rows, math.Log2(N), len(p.ByItems), cpuFactor)
topNCPUCost := orderCostVer2(option, rows, N, p.ByItems, cpuFactor)
topNMemCost := newCostVer2(option, memFactor,
N*rowSize*memFactor.Value,
"topMem(%v*%v*%v)", N, rowSize, memFactor)
Expand Down Expand Up @@ -491,7 +489,8 @@ func (p *PhysicalHashJoin) getPlanCostVer2(taskType property.TaskType, option *P
buildRows := getCardinality(build, option.CostFlag)
probeRows := getCardinality(probe, option.CostFlag)
buildRowSize := getAvgRowSize(build.Stats(), build.Schema())
concurrency := float64(p.Concurrency)
tidbConcurrency := float64(p.Concurrency)
mppConcurrency := float64(3) // TODO: remove this empirical value
cpuFactor := getTaskCPUFactorVer2(p, taskType)
memFactor := getTaskMemFactorVer2(p, taskType)

Expand All @@ -510,8 +509,13 @@ func (p *PhysicalHashJoin) getPlanCostVer2(taskType property.TaskType, option *P
return zeroCostVer2, err
}

p.planCostVer2 = sumCostVer2(buildChildCost, probeChildCost, buildHashCost, buildFilterCost,
divCostVer2(sumCostVer2(probeFilterCost, probeHashCost), concurrency))
if taskType == property.MppTaskType { // BCast or Shuffle Join, use mppConcurrency
p.planCostVer2 = sumCostVer2(buildChildCost, probeChildCost,
divCostVer2(sumCostVer2(buildHashCost, buildFilterCost, probeHashCost, probeFilterCost), mppConcurrency))
} else { // TiDB HashJoin
p.planCostVer2 = sumCostVer2(buildChildCost, probeChildCost, buildHashCost, buildFilterCost,
divCostVer2(sumCostVer2(probeFilterCost, probeHashCost), tidbConcurrency))
}
p.planCostInit = true
return p.planCostVer2.label(p), nil
}
Expand Down Expand Up @@ -630,8 +634,16 @@ func (p *PhysicalExchangeReceiver) getPlanCostVer2(taskType property.TaskType, o
rows := getCardinality(p, option.CostFlag)
rowSize := getAvgRowSize(p.stats, p.Schema())
netFactor := getTaskNetFactorVer2(p, taskType)
isBCast := false
if sender, ok := p.children[0].(*PhysicalExchangeSender); ok {
isBCast = sender.ExchangeType == tipb.ExchangeType_Broadcast
}
numNode := float64(3) // TODO: remove this empirical value

netCost := netCostVer2(option, rows, rowSize, netFactor)
if isBCast {
netCost = mulCostVer2(netCost, numNode)
}
childCost, err := p.children[0].getPlanCostVer2(taskType, option)
if err != nil {
return zeroCostVer2, err
Expand Down Expand Up @@ -695,9 +707,10 @@ func netCostVer2(option *PlanCostOption, rows, rowSize float64, netFactor costVe
}

func filterCostVer2(option *PlanCostOption, rows float64, filters []expression.Expression, cpuFactor costVer2Factor) costVer2 {
numFuncs := numFunctions(filters)
return newCostVer2(option, cpuFactor,
rows*float64(len(filters))*cpuFactor.Value,
"cpu(%v*filters(%v)*%v)", rows, len(filters), cpuFactor)
rows*float64(numFuncs)*cpuFactor.Value,
"cpu(%v*filters(%v)*%v)", rows, numFuncs, cpuFactor)
}

func aggCostVer2(option *PlanCostOption, rows float64, aggFuncs []*aggregation.AggFuncDesc, cpuFactor costVer2Factor) costVer2 {
Expand All @@ -708,9 +721,36 @@ func aggCostVer2(option *PlanCostOption, rows float64, aggFuncs []*aggregation.A
}

func groupCostVer2(option *PlanCostOption, rows float64, groupItems []expression.Expression, cpuFactor costVer2Factor) costVer2 {
numFuncs := numFunctions(groupItems)
return newCostVer2(option, cpuFactor,
rows*float64(len(groupItems))*cpuFactor.Value,
"group(%v*cols(%v)*%v)", rows, len(groupItems), cpuFactor)
rows*float64(numFuncs)*cpuFactor.Value,
"group(%v*cols(%v)*%v)", rows, numFuncs, cpuFactor)
}

func numFunctions(exprs []expression.Expression) int {
num := 0
for _, e := range exprs {
if _, ok := e.(*expression.ScalarFunction); ok {
num++
}
}
return num
}

func orderCostVer2(option *PlanCostOption, rows, N float64, byItems []*util.ByItems, cpuFactor costVer2Factor) costVer2 {
numFuncs := 0
for _, byItem := range byItems {
if _, ok := byItem.Expr.(*expression.ScalarFunction); ok {
numFuncs++
}
}
exprCost := newCostVer2(option, cpuFactor,
rows*float64(numFuncs)*cpuFactor.Value,
"exprCPU(%v*%v*%v)", rows, numFuncs, cpuFactor)
orderCost := newCostVer2(option, cpuFactor,
rows*math.Log2(N)*cpuFactor.Value,
"orderCPU(%v*log(%v)*%v)", rows, N, cpuFactor)
return sumCostVer2(exprCost, orderCost)
}

func hashBuildCostVer2(option *PlanCostOption, buildRows, buildRowSize float64, keys []expression.Expression, cpuFactor, memFactor costVer2Factor) costVer2 {
Expand Down
2 changes: 2 additions & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//errno",
"//executor",
"//expression",
"//extension",
"//infoschema",
"//kv",
"//meta",
Expand Down Expand Up @@ -151,6 +152,7 @@ go_test(
"//errno",
"//executor",
"//expression",
"//extension",
"//infoschema",
"//kv",
"//meta",
Expand Down
5 changes: 5 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,11 @@ func TestConnExecutionTimeout(t *testing.T) {

err = cc.handleQuery(context.Background(), "select /*+ MAX_EXECUTION_TIME(100)*/ * FROM testTable2 WHERE SLEEP(1);")
require.NoError(t, err)

tk.MustExec("set @@max_execution_time = 500;")

err = cc.handleQuery(context.Background(), "alter table testTable2 add index idx(age);")
require.NoError(t, err)
}

func TestShutDown(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//errno",
"//executor",
"//expression",
"//extension",
"//extension/extensionimpl",
"//infoschema",
"//kv",
Expand Down
1 change: 1 addition & 0 deletions sessionctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/sessionctx",
visibility = ["//visibility:public"],
deps = [
"//extension",
"//kv",
"//metrics",
"//parser/model",
Expand Down
1 change: 1 addition & 0 deletions testkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//kv",
"//parser/ast",
"//parser/terror",
"//planner/core",
"//session",
"//session/txninfo",
"//sessionctx/variable",
Expand Down
1 change: 1 addition & 0 deletions util/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
importpath = "github.com/pingcap/tidb/util/mock",
visibility = ["//visibility:public"],
deps = [
"//extension",
"//kv",
"//parser/ast",
"//parser/model",
Expand Down

0 comments on commit 99694ad

Please sign in to comment.