Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add succ filed to slow log and fix shallow copy problem when parse slow log file. (#11417) #11421

Merged
merged 2 commits into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
144 changes: 50 additions & 94 deletions infoschema/slow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -58,7 +57,8 @@ var slowQueryCols = []columnInfo{
{variable.SlowLogCopWaitP90, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopWaitMax, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogMemMax, mysql.TypeLonglong, 20, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeVarchar, 4096, 0, nil, nil},
{variable.SlowLogSucc, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
}

func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
Expand Down Expand Up @@ -137,24 +137,34 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, err
}

func getOneLine(reader *bufio.Reader) ([]byte, error) {
var resByte []byte
lineByte, isPrefix, err := reader.ReadLine()
if isPrefix {
// Need to read more data.
resByte = make([]byte, len(lineByte), len(lineByte)*2)
} else {
resByte = make([]byte, len(lineByte))
}
// Use copy here to avoid shallow copy problem.
copy(resByte, lineByte)
if err != nil {
return lineByte, err
return resByte, err
}

var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
lineByte = append(lineByte, tempLine...)
resByte = append(resByte, tempLine...)

// Use the max value of max_allowed_packet to check the single line length.
if len(lineByte) > int(variable.MaxOfMaxAllowedPacket) {
return lineByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
if err != nil {
return lineByte, err
return resByte, err
}
}
return lineByte, err
return resByte, err
}

type slowQueryTuple struct {
Expand All @@ -181,77 +191,41 @@ type slowQueryTuple struct {
p90WaitTime float64
maxWaitTime float64
memMax int64
succ bool
sql string
}

func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string) error {
value = stringutil.Copy(value)
var err error
switch field {
case variable.SlowLogTimeStr:
t, err := ParseTime(value)
st.time, err = ParseTime(value)
if err != nil {
return err
break
}
if t.Location() != tz {
t = t.In(tz)
if st.time.Location() != tz {
st.time = st.time.In(tz)
}
st.time = t
case variable.SlowLogTxnStartTSStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.txnStartTs = num
st.txnStartTs, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogUserStr:
st.user = value
case variable.SlowLogConnIDStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.connID = num
st.connID, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogQueryTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.queryTime = num
st.queryTime, err = strconv.ParseFloat(value, 64)
case execdetails.ProcessTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.processTime = num
st.processTime, err = strconv.ParseFloat(value, 64)
case execdetails.WaitTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.waitTime = num
st.waitTime, err = strconv.ParseFloat(value, 64)
case execdetails.BackoffTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.backOffTime = num
st.backOffTime, err = strconv.ParseFloat(value, 64)
case execdetails.RequestCountStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.requestCount = num
st.requestCount, err = strconv.ParseUint(value, 10, 64)
case execdetails.TotalKeysStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.totalKeys = num
st.totalKeys, err = strconv.ParseUint(value, 10, 64)
case execdetails.ProcessKeysStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.processKeys = num
st.processKeys, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogDBStr:
st.db = value
case variable.SlowLogIndexIDsStr:
Expand All @@ -263,50 +237,27 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string)
case variable.SlowLogStatsInfoStr:
st.statsInfo = value
case variable.SlowLogCopProcAvg:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.avgProcessTime = num
st.avgProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopProcP90:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.p90ProcessTime = num
st.p90ProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopProcMax:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.maxProcessTime = num
st.maxProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitAvg:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.avgWaitTime = num
st.avgWaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitP90:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.p90WaitTime = num
st.p90WaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitMax:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.maxWaitTime = num
st.maxWaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogMemMax:
num, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.memMax = num
st.memMax, err = strconv.ParseInt(value, 10, 64)
case variable.SlowLogSucc:
st.succ, err = strconv.ParseBool(value)
case variable.SlowLogQuerySQLStr:
st.sql = value
}
if err != nil {
return errors.Wrap(err, "parse slow log failed `"+field+"` error")
}
return nil
}

Expand Down Expand Up @@ -339,6 +290,11 @@ func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
record = append(record, types.NewFloat64Datum(st.p90WaitTime))
record = append(record, types.NewFloat64Datum(st.maxWaitTime))
record = append(record, types.NewIntDatum(st.memMax))
if st.succ {
record = append(record, types.NewIntDatum(1))
} else {
record = append(record, types.NewIntDatum(0))
}
record = append(record, types.NewStringDatum(st.sql))
return record
}
Expand Down
16 changes: 15 additions & 1 deletion infoschema/slow_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *testSuite) TestParseSlowLogFile(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8
# Mem_max: 70724
# Succ: false
select * from t;`)
reader := bufio.NewReader(slowLog)
loc, err := time.LoadLocation("Asia/Shanghai")
Expand All @@ -53,7 +54,8 @@ select * from t;`)
}
recordString += str
}
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,0.05,0.6,0.8,70724,select * from t;"
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,0.05,0.6,0.8,70724,0,select * from t;"

c.Assert(expectRecordString, Equals, recordString)

// fix sql contain '# ' bug
Expand All @@ -67,6 +69,7 @@ select a# from t;
# Is_internal: true
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Stats: t1:1,t2:2
# Succ: false
select * from t;
`)
reader = bufio.NewReader(slowLog)
Expand Down Expand Up @@ -110,6 +113,17 @@ select * from t;
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)

// Add parse error check.
slowLog = bytes.NewBufferString(
`# Time: 2019-04-28T15:24:04.309074+08:00
# Succ: abc
select * from t;
`)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "parse slow log failed `Succ` error: strconv.ParseBool: parsing \"abc\": invalid syntax")
}

func (s *testSuite) TestSlowLogParseTime(c *C) {
Expand Down
24 changes: 21 additions & 3 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,16 +502,34 @@ func (s *testSuite) TestSlowQuery(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8
# Mem_max: 70724
# Succ: true
select * from t_slim;`))
c.Assert(f.Close(), IsNil)
c.Assert(f.Sync(), IsNil)
c.Assert(err, IsNil)

tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", slowLogFileName))
tk.MustExec("set time_zone = '+08:00';")
re := tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|",
"2019-02-12 19:33:56.571953|406315658548871171|root@127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|select * from t_slim;"))
"2019-02-12 19:33:56.571953|406315658548871171|root@127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|1|select * from t_slim;"))
tk.MustExec("set time_zone = '+00:00';")
re = tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root@127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|select * from t_slim;"))
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root@127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|0.05|0.6|0.8|70724|1|select * from t_slim;"))

// Test for long query.
_, err = f.Write([]byte(`
# Time: 2019-02-13T19:33:56.571953+08:00
`))
c.Assert(err, IsNil)
sql := "select * from "
for len(sql) < 5000 {
sql += "abcdefghijklmnopqrstuvwxyz_1234567890_qwertyuiopasdfghjklzxcvbnm"
}
sql += ";"
_, err = f.Write([]byte(sql))
c.Assert(err, IsNil)
c.Assert(f.Close(), IsNil)
re = tk.MustQuery("select query from information_schema.slow_query order by time desc limit 1")
rows := re.Rows()
c.Assert(rows[0][0], Equals, sql)
}
6 changes: 5 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,8 @@ const (
SlowLogCopWaitMax = "Cop_wait_max"
// SlowLogMemMax is the max number bytes of memory used in this statement.
SlowLogMemMax = "Mem_max"
// SlowLogSucc is used to indicate whether this sql execute successfully.
SlowLogSucc = "Succ"
)

// SlowLogFormat uses for formatting slow log.
Expand All @@ -852,9 +854,10 @@ const (
// # Cop_process: Avg_time: 1s P90_time: 2s Max_time: 3s Max_addr: 10.6.131.78
// # Cop_wait: Avg_time: 10ms P90_time: 20ms Max_time: 30ms Max_Addr: 10.6.131.79
// # Memory_max: 4096
// # Succ: true
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string,
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, sql string) string {
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, succ bool, sql string) string {
var buf bytes.Buffer
execDetailStr := execDetail.String()
buf.WriteString(SlowLogPrefixStr + SlowLogTxnStartTSStr + SlowLogSpaceMarkStr + strconv.FormatUint(txnTS, 10) + "\n")
Expand Down Expand Up @@ -912,6 +915,7 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe
if memMax > 0 {
buf.WriteString(SlowLogPrefixStr + SlowLogMemMax + SlowLogSpaceMarkStr + strconv.FormatInt(memMax, 10) + "\n")
}
buf.WriteString(SlowLogPrefixStr + SlowLogSucc + SlowLogSpaceMarkStr + strconv.FormatBool(succ) + "\n")
if len(sql) == 0 {
sql = ";"
}
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03
# Mem_max: 2333
# Succ: true
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, true, sql)
c.Assert(logString, Equals, resultString)
}