Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

telemetry: sync the concurrent map access of built-in func usage #32618

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,6 @@ func (se *session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
return nil
}

// GetBuiltinFunctionUsage returns the BuiltinFunctionUsage of current Context, which is not thread safe.
// Use primitive map type to prevent circular import. Should convert it to telemetry.BuiltinFunctionUsage before using.
func (se *session) GetBuiltinFunctionUsage() map[string]uint32 {
return make(map[string]uint32)
}

// GetStmtStats implements the sessionctx.Context interface.
func (se *session) GetStmtStats() *stmtstats.StatementStats {
return nil
Expand Down
4 changes: 3 additions & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,9 @@ func (er *expressionRewriter) newFunction(funcName string, retType *types.FieldT
return
}
if scalarFunc, ok := ret.(*expression.ScalarFunction); ok {
telemetry.BuiltinFunctionsUsage(er.b.ctx.GetBuiltinFunctionUsage()).Inc(scalarFunc.Function.PbCode().String())
if collector, ok := er.b.ctx.(telemetry.BuiltinFunctionsUsageCollector); ok {
collector.CollectBuiltinFunctionsUsage(scalarFunc.Function.PbCode().String())
}
}
return
}
Expand Down
12 changes: 6 additions & 6 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ type session struct {

cache [1]ast.StmtNode

builtinFunctionUsage telemetry.BuiltinFunctionsUsage
builtinFunctionUsage *telemetry.BuiltinFunctionsUsage
// allowed when tikv disk full happened.
diskFullOpt kvrpcpb.DiskFullOpt

Expand Down Expand Up @@ -2526,7 +2526,7 @@ func (s *session) Close() {
if s.idxUsageCollector != nil {
s.idxUsageCollector.Delete()
}
telemetry.GlobalBuiltinFunctionsUsage.Collect(s.GetBuiltinFunctionUsage())
telemetry.GlobalBuiltinFunctionsUsage.Merge(s.builtinFunctionUsage)
bindValue := s.Value(bindinfo.SessionBindInfoKeyType)
if bindValue != nil {
bindValue.(*bindinfo.SessionHandle).Close()
Expand Down Expand Up @@ -2928,7 +2928,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
ddlOwnerChecker: dom.DDL().OwnerManager(),
client: store.GetClient(),
mppClient: store.GetMPPClient(),
builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage),
builtinFunctionUsage: telemetry.NewBuiltinFunctionsUsage(),
stmtStats: stmtstats.CreateStatementStats(),
}
if plannercore.PreparedPlanCacheEnabled() {
Expand Down Expand Up @@ -2962,7 +2962,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
sessionVars: variable.NewSessionVars(),
client: store.GetClient(),
mppClient: store.GetMPPClient(),
builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage),
builtinFunctionUsage: telemetry.NewBuiltinFunctionsUsage(),
stmtStats: stmtstats.CreateStatementStats(),
}
if plannercore.PreparedPlanCacheEnabled() {
Expand Down Expand Up @@ -3415,8 +3415,8 @@ func (s *session) updateTelemetryMetric(es *executor.ExecStmt) {
}
}

func (s *session) GetBuiltinFunctionUsage() map[string]uint32 {
return s.builtinFunctionUsage
func (s *session) CollectBuiltinFunctionsUsage(funcName string) {
s.builtinFunctionUsage.Inc(funcName)
}

func (s *session) getSnapshotInterceptor() kv.SnapshotInterceptor {
Expand Down
3 changes: 0 additions & 3 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,6 @@ type Context interface {
StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64)
// GetTxnWriteThroughputSLI returns the TxnWriteThroughputSLI.
GetTxnWriteThroughputSLI() *sli.TxnWriteThroughputSLI
// GetBuiltinFunctionUsage returns the BuiltinFunctionUsage of current Context, which is not thread safe.
// Use primitive map type to prevent circular import. Should convert it to telemetry.BuiltinFunctionUsage before using.
GetBuiltinFunctionUsage() map[string]uint32
// GetStmtStats returns stmtstats.StatementStats owned by implementation.
GetStmtStats() *stmtstats.StatementStats
}
Expand Down
85 changes: 47 additions & 38 deletions telemetry/data_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,59 +92,70 @@ type tiFlashUsageData struct {
ExchangePushDown uint64 `json:"exchangePushDown"`
}

// builtinFunctionsUsageCollector collects builtin functions usage information and dump it into windowData.
type builtinFunctionsUsageCollector struct {
// BuiltinFunctionsUsage is a map from ScalarFuncSig_name(string) to usage count(uint32)
type BuiltinFunctionsUsage struct {
usages map[string]uint32
sync.Mutex

// Should acquire lock to access this
usageData BuiltinFunctionsUsage
}

// Merge BuiltinFunctionsUsage data
func (b *builtinFunctionsUsageCollector) Collect(usageData BuiltinFunctionsUsage) {
// TODO(leiysky): use multi-worker to collect the usage information so we can make this asynchronous
b.Lock()
defer b.Unlock()
b.usageData.Merge(usageData)
}

// Dump BuiltinFunctionsUsage data
func (b *builtinFunctionsUsageCollector) Dump() map[string]uint32 {
b.Lock()
ret := b.usageData
b.usageData = make(map[string]uint32)
b.Unlock()

return ret
// NewBuiltinFunctionsUsage creates a BuiltinFunctionsUsage.
func NewBuiltinFunctionsUsage() *BuiltinFunctionsUsage {
return &BuiltinFunctionsUsage{
usages: make(map[string]uint32),
}
}

// BuiltinFunctionsUsage is a map from ScalarFuncSig_name(string) to usage count(uint32)
type BuiltinFunctionsUsage map[string]uint32

// Inc will increase the usage count of scalar function by 1
func (b BuiltinFunctionsUsage) Inc(scalarFuncSigName string) {
v, ok := b[scalarFuncSigName]
func (b *BuiltinFunctionsUsage) Inc(funcName string) {
b.Lock()
defer b.Unlock()
v, ok := b.usages[funcName]
if !ok {
b[scalarFuncSigName] = 1
b.usages[funcName] = 1
} else {
b[scalarFuncSigName] = v + 1
b.usages[funcName] = v + 1
}
}

// Merge BuiltinFunctionsUsage data
func (b BuiltinFunctionsUsage) Merge(usageData BuiltinFunctionsUsage) {
for k, v := range usageData {
prev, ok := b[k]
func mergeUsageMap(m1, m2 map[string]uint32) {
for k, v := range m2 {
prev, ok := m1[k]
if !ok {
b[k] = v
m1[k] = v
} else {
b[k] = prev + v
m1[k] = prev + v
}
}
}

// Merge BuiltinFunctionsUsage data
func (b *BuiltinFunctionsUsage) Merge(usageData *BuiltinFunctionsUsage) {
if usageData == nil {
return
}
b.Lock()
defer b.Unlock()
usageData.Lock()
defer usageData.Unlock()
mergeUsageMap(b.usages, usageData.usages)
}

// Dump BuiltinFunctionsUsage data
func (b *BuiltinFunctionsUsage) Dump() map[string]uint32 {
b.Lock()
defer b.Unlock()
ret := b.usages
b.usages = make(map[string]uint32)
return ret
}

// GlobalBuiltinFunctionsUsage is used to collect builtin functions usage information
var GlobalBuiltinFunctionsUsage = &builtinFunctionsUsageCollector{usageData: make(BuiltinFunctionsUsage)}
var GlobalBuiltinFunctionsUsage = NewBuiltinFunctionsUsage()

// BuiltinFunctionsUsageCollector is used to collect the usage of scalar function for telemetry.
type BuiltinFunctionsUsageCollector interface {
CollectBuiltinFunctionsUsage(funcName string)
}

var (
rotatedSubWindows []*windowData
Expand Down Expand Up @@ -300,9 +311,7 @@ func getWindowData() []*windowData {
thisWindow.SQLUsage.SQLTotal = rotatedSubWindows[i].SQLUsage.SQLTotal - startWindow.SQLUsage.SQLTotal
thisWindow.SQLUsage.SQLType = calDeltaSQLTypeMap(rotatedSubWindows[i].SQLUsage.SQLType, startWindow.SQLUsage.SQLType)

mergedBuiltinFunctionsUsage := BuiltinFunctionsUsage(thisWindow.BuiltinFunctionsUsage)
mergedBuiltinFunctionsUsage.Merge(BuiltinFunctionsUsage(rotatedSubWindows[i].BuiltinFunctionsUsage))
thisWindow.BuiltinFunctionsUsage = mergedBuiltinFunctionsUsage
mergeUsageMap(thisWindow.BuiltinFunctionsUsage, rotatedSubWindows[i].BuiltinFunctionsUsage)
aggregatedSubWindows++
i++
}
Expand Down
17 changes: 17 additions & 0 deletions telemetry/data_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,20 @@ func TestBuiltinFunctionsUsage(t *testing.T) {
usage = telemetry.GlobalBuiltinFunctionsUsage.Dump()
require.Equal(t, map[string]uint32{"PlusInt": 1, "MinusInt": 1}, usage)
}

// https://github.com/pingcap/tidb/issues/32459.
func TestBuiltinFunctionsUsageJoinViews(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create view vw_dict as " +
"select a.table_schema, a.table_name as name, a.column_name, " +
"a.column_type, a.column_default, a.is_nullable, b.column_comment from " +
"information_schema.columns a left join information_schema.columns b on " +
"(a.table_name = b.table_name and a.column_name = b.column_name and b.table_schema = 'accountdb') " +
"where (a.table_schema = 'query_account') order by a.table_name, a.ordinal_position;")
tk.MustExec("create table t (a int, b int);")
tk.MustExec("select * from vw_dict where name = 't'")
}
5 changes: 0 additions & 5 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ func (c *Context) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
return nil
}

// GetBuiltinFunctionUsage implements sessionctx.Context GetBuiltinFunctionUsage interface.
func (c *Context) GetBuiltinFunctionUsage() map[string]uint32 {
return make(map[string]uint32)
}

// GetGlobalSysVar implements GlobalVarAccessor GetGlobalSysVar interface.
func (c *Context) GetGlobalSysVar(ctx sessionctx.Context, name string) (string, error) {
v := variable.GetSysVar(name)
Expand Down