
*: Speed up parse slow-log when query slow_query #15371 (#19139) (#20556)
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: jyz0309 <45495947@qq.com>
ti-srebot authored Oct 21, 2020
1 parent b1d20a2 commit ba9fe30
Showing 2 changed files with 220 additions and 101 deletions.
255 changes: 164 additions & 91 deletions executor/slow_query.go
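In outline: the slow-log reader previously parsed entries one line at a time on a single goroutine. This commit makes getOneLine a shared helper, adds getBatchLog to read whole slow-log entries in batches of 64, and parses each batch in its own goroutine, capped by the session's DistSQLScanConcurrency; parsed batches are streamed to the reader over parsedSlowLogCh, whose buffer grows from 1 to 100 entries.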
@@ -16,15 +16,18 @@ package executor
 import (
 	"bufio"
 	"context"
+	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/parser/auth"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
@@ -64,7 +67,6 @@ func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
 		}
 		e.initializeAsyncParsing(ctx, sctx)
 	}
-
 	rows, retrieved, err := e.dataForSlowLog(ctx)
 	if err != nil {
 		return nil, err
@@ -126,16 +128,8 @@ func (e *slowQueryRetriever) parseDataForSlowLog(ctx context.Context, sctx sessionctx.Context) {
 		close(e.parsedSlowLogCh)
 		return
 	}
 
 	reader := bufio.NewReader(e.files[0].file)
-	for e.fileIdx < len(e.files) {
-		rows, err := e.parseSlowLog(sctx, reader, 1024)
-		select {
-		case <-ctx.Done():
-			break
-		case e.parsedSlowLogCh <- parsedSlowLog{rows, err}:
-		}
-	}
+	e.parseSlowLog(ctx, sctx, reader, 64)
	close(e.parsedSlowLogCh)
 }
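This hunk removes the per-file read loop: instead of synchronously parsing up to 1024 rows at a time and pushing each chunk through a select, parseDataForSlowLog now delegates to the concurrent parseSlowLog introduced below, with a batch size of 64 entries, and closes parsedSlowLogCh once all files are consumed. A minimal sketch of this producer shape, outside TiDB (all names here are illustrative, not from the patch):

	package main

	import "fmt"

	// parsedBatch plays the role of parsedSlowLog: a chunk of results plus an error.
	type parsedBatch struct {
		rows []string
		err  error
	}

	// produce parses input in fixed-size batches and streams them to out,
	// closing out to signal that all data has been delivered.
	func produce(lines []string, batchSize int, out chan<- parsedBatch) {
		defer close(out)
		for i := 0; i < len(lines); i += batchSize {
			end := i + batchSize
			if end > len(lines) {
				end = len(lines)
			}
			out <- parsedBatch{rows: lines[i:end]}
		}
	}

	func main() {
		ch := make(chan parsedBatch, 4) // a buffer lets the producer run ahead of the reader
		go produce([]string{"a", "b", "c", "d", "e"}, 2, ch)
		for b := range ch {
			fmt.Println(b.rows, b.err)
		}
	}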

@@ -144,25 +138,28 @@ func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context) ([][]types.Datum, bool, error) {
 	var (
 		slowLog parsedSlowLog
 		ok      bool
 	)
-	select {
-	case slowLog, ok = <-e.parsedSlowLogCh:
-	case <-ctx.Done():
-		return nil, false, ctx.Err()
-	}
-	if !ok {
-		// When e.parsedSlowLogCh is closed, the slow log data is retrieved.
-		return nil, true, nil
-	}
-
-	rows, err := slowLog.rows, slowLog.err
-	if err != nil {
-		return nil, false, err
-	}
-	if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) {
-		rows, err := infoschema.AppendHostInfoToRows(rows)
-		return rows, false, err
-	}
+	for {
+		select {
+		case slowLog, ok = <-e.parsedSlowLogCh:
+		case <-ctx.Done():
+			return nil, false, ctx.Err()
+		}
+		if !ok {
+			return nil, true, nil
+		}
+		rows, err := slowLog.rows, slowLog.err
+		if err != nil {
+			return nil, false, err
+		}
+		if len(rows) == 0 {
+			continue
+		}
+		if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) {
+			rows, err := infoschema.AppendHostInfoToRows(rows)
+			return rows, false, err
+		}
+		return rows, false, nil
+	}
-	return rows, false, nil
 }
 
 type slowLogChecker struct {
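dataForSlowLog becomes a loop so it can skip empty batches (a worker may parse a batch in which every line is filtered out or invalid) while still distinguishing "channel closed, all data retrieved" from "nothing yet", and aborting on cancellation. A sketch of that receive loop under the same assumptions (parsedBatch as in the earlier sketch):

	package main

	import (
		"context"
		"fmt"
	)

	type parsedBatch struct {
		rows []string
		err  error
	}

	// next mirrors the shape of dataForSlowLog's loop: skip empty batches,
	// report a closed channel as "retrieved", and abort on cancellation.
	func next(ctx context.Context, ch <-chan parsedBatch) ([]string, bool, error) {
		for {
			select {
			case b, ok := <-ch:
				if !ok {
					return nil, true, nil // channel closed: all batches consumed
				}
				if b.err != nil {
					return nil, false, b.err
				}
				if len(b.rows) == 0 {
					continue // worker produced nothing visible; wait for the next batch
				}
				return b.rows, false, nil
			case <-ctx.Done():
				return nil, false, ctx.Err()
			}
		}
	}

	func main() {
		ch := make(chan parsedBatch, 2)
		ch <- parsedBatch{}                           // empty batch, skipped
		ch <- parsedBatch{rows: []string{"select 1"}} // first real batch
		close(ch)
		fmt.Println(next(context.Background(), ch))
	}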
@@ -186,35 +183,144 @@ func (sc *slowLogChecker) isTimeValid(t types.Time) bool {
 	return true
 }
 
-// TODO: optimize for parse huge log-file.
-func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
-	var rows [][]types.Datum
-	var st *slowQueryTuple
-	startFlag := false
-	tz := ctx.GetSessionVars().Location()
-	for {
-		if len(rows) >= maxRow {
-			return rows, nil
-		}
-		e.fileLine++
-		lineByte, err := getOneLine(reader)
-		if err != nil {
-			if err == io.EOF {
-				e.fileIdx++
-				e.fileLine = 0
-				if e.fileIdx >= len(e.files) {
-					return rows, nil
-				}
-				reader.Reset(e.files[e.fileIdx].file)
-				continue
-			}
-			return rows, err
-		}
-		line := string(hack.String(lineByte))
-		// Check slow log entry start flag.
+func getOneLine(reader *bufio.Reader) ([]byte, error) {
+	var resByte []byte
+	lineByte, isPrefix, err := reader.ReadLine()
+	if isPrefix {
+		// Need to read more data.
+		resByte = make([]byte, len(lineByte), len(lineByte)*2)
+	} else {
+		resByte = make([]byte, len(lineByte))
+	}
+	// Use copy here to avoid shallow copy problem.
+	copy(resByte, lineByte)
+	if err != nil {
+		return resByte, err
+	}
+
+	var tempLine []byte
+	for isPrefix {
+		tempLine, isPrefix, err = reader.ReadLine()
+		resByte = append(resByte, tempLine...)
+		// Use the max value of max_allowed_packet to check the single line length.
+		if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
+			return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
+		}
+		if err != nil {
+			return resByte, err
+		}
+	}
+	return resByte, err
+}
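getOneLine moves up essentially unchanged (its old copy is removed further down). It is built on bufio.Reader.ReadLine, whose isPrefix result and internal buffer reuse are easy to get wrong; a self-contained sketch of that contract:

	package main

	import (
		"bufio"
		"fmt"
		"strings"
	)

	// readFullLine shows the ReadLine contract getOneLine is built around:
	// isPrefix reports a line longer than the buffer, and the returned slice
	// is only valid until the next read, hence the copy.
	func readFullLine(r *bufio.Reader) ([]byte, error) {
		line, isPrefix, err := r.ReadLine()
		full := append([]byte(nil), line...) // copy: ReadLine reuses its buffer
		for isPrefix && err == nil {
			line, isPrefix, err = r.ReadLine()
			full = append(full, line...)
		}
		return full, err
	}

	func main() {
		// A 16-byte buffer (the minimum) forces isPrefix on the first line.
		r := bufio.NewReaderSize(strings.NewReader("a line that is longer than the buffer\nshort\n"), 16)
		for {
			l, err := readFullLine(r)
			if len(l) > 0 {
				fmt.Println(string(l))
			}
			if err != nil {
				break
			}
		}
	}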

+type offset struct {
+	offset int
+	length int
+}

+func (e *slowQueryRetriever) getBatchLog(reader *bufio.Reader, offset *offset, num int) ([]string, error) {
+	var line string
+	log := make([]string, 0, num)
+	var err error
+	for i := 0; i < num; i++ {
+		for {
+			e.fileLine++
+			lineByte, err := getOneLine(reader)
+			if err != nil {
+				if err == io.EOF {
+					e.fileIdx++
+					e.fileLine = 0
+					if e.fileIdx >= len(e.files) {
+						return log, nil
+					}
+					offset.length = len(log)
+					reader.Reset(e.files[e.fileIdx].file)
+					continue
+				}
+				return log, err
+			}
+			line = string(hack.String(lineByte))
+			log = append(log, line)
+			if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
+				if strings.HasPrefix(line, "use") {
+					continue
+				}
+				break
+			}
+		}
+	}
+	return log, err
+}
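getBatchLog cuts batches only at entry boundaries: it keeps reading until a line ends with variable.SlowLogSQLSuffixStr (";"), treating a "use ...;" line as part of the entry that follows, so every batch holds complete entries that can be parsed independently. A simplified stand-in for that boundary rule (not the TiDB implementation):

	package main

	import (
		"fmt"
		"strings"
	)

	// splitEntries mimics the boundary rule: an entry ends with a line whose
	// suffix is ";", except that a "use ...;" line belongs to the entry after it.
	func splitEntries(lines []string) [][]string {
		var entries [][]string
		var cur []string
		for _, l := range lines {
			cur = append(cur, l)
			if strings.HasSuffix(l, ";") && !strings.HasPrefix(l, "use") {
				entries = append(entries, cur)
				cur = nil
			}
		}
		return entries
	}

	func main() {
		log := []string{
			"# Time: 2020-10-21T10:00:00.000000+08:00",
			"# Query_time: 1.5",
			"use test;",
			"select * from t;",
			"# Time: 2020-10-21T10:00:01.000000+08:00",
			"select 1;",
		}
		for _, e := range splitEntries(log) {
			fmt.Println(e)
		}
	}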

+func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) {
+	var wg sync.WaitGroup
+	offset := offset{offset: 0, length: 0}
+	// To limit the num of go routine
+	ch := make(chan int, sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency)
+	defer close(ch)
+	for {
+		log, err := e.getBatchLog(reader, &offset, logNum)
+		if err != nil {
+			e.parsedSlowLogCh <- parsedSlowLog{nil, err}
+			break
+		}
+		start := offset
+		wg.Add(1)
+		ch <- 1
+		go func() {
+			defer wg.Done()
+			result, err := e.parseLog(sctx, log, start)
+			if err != nil {
+				e.parsedSlowLogCh <- parsedSlowLog{nil, err}
+			} else {
+				e.parsedSlowLogCh <- parsedSlowLog{result, err}
+			}
+			<-ch
+		}()
+		// Read the next file, offset = 0
+		if e.fileIdx >= len(e.files) {
+			break
+		}
+		offset.offset = e.fileLine
+		offset.length = 0
+		select {
+		case <-ctx.Done():
+			break
+		default:
+		}
+	}
+	wg.Wait()
+}
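The new parseSlowLog fans each batch out to a goroutine, bounded by a buffered channel used as a counting semaphore (capacity DistSQLScanConcurrency) plus a WaitGroup for stragglers. One caution: in Go, a break inside a select case exits only the select, so the case <-ctx.Done(): break arm above does not by itself terminate the enclosing for loop. The semaphore pattern in isolation:

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		const maxWorkers = 4 // stands in for DistSQLScanConcurrency
		sem := make(chan struct{}, maxWorkers)
		var wg sync.WaitGroup
		for i := 0; i < 16; i++ {
			wg.Add(1)
			sem <- struct{}{} // blocks once maxWorkers goroutines are in flight
			go func(n int) {
				defer wg.Done()
				fmt.Println("parsing batch", n)
				<-sem // release the slot for the next batch
			}(i)
		}
		wg.Wait() // like parseSlowLog, wait for in-flight workers before returning
	}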

+func getLineIndex(offset offset, index int) int {
+	var fileLine int
+	if offset.length <= index {
+		fileLine = index - offset.length + 1
+	} else {
+		fileLine = offset.offset + index + 1
+	}
+	return fileLine
+}
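getLineIndex turns a batch-local index back into a 1-based line number for error messages. offset.offset records how many lines of the current file had already been consumed when the batch started, and offset.length records how many lines of the batch were read before a rollover to the next file (set in getBatchLog at EOF). Indexes below offset.length map into the previous file at offset.offset + index + 1; the rest map into the new file at index - offset.length + 1. Worked example: with offset.offset = 40 and offset.length = 5, index 2 is line 40 + 2 + 1 = 43 of the old file, while index 7 is line 7 - 5 + 1 = 3 of the new one.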

+func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("%s", r)
+		}
+	}()
+	failpoint.Inject("errorMockParseSlowLogPanic", func(val failpoint.Value) {
+		if val.(bool) {
+			panic("panic test")
+		}
+	})
+	var st *slowQueryTuple
+	tz := ctx.GetSessionVars().Location()
+	startFlag := false
+	for index, line := range log {
+		fileLine := getLineIndex(offset, index)
 		if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
 			st = &slowQueryTuple{}
-			valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], e.fileLine, e.checker)
+			valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], fileLine, e.checker)
 			if err != nil {
 				ctx.GetSessionVars().StmtCtx.AppendWarning(err)
 				continue
@@ -224,17 +330,14 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
 			}
 			continue
 		}
-
 		if startFlag {
 			// Parse slow log field.
 			if strings.HasPrefix(line, variable.SlowLogRowPrefixStr) {
 				line = line[len(variable.SlowLogRowPrefixStr):]
 				if strings.HasPrefix(line, variable.SlowLogPrevStmtPrefix) {
 					st.prevStmt = line[len(variable.SlowLogPrevStmtPrefix):]
 				} else if strings.HasPrefix(line, variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr) {
 					// the user and hostname field has a special format, for example, # User@Host: root[root] @ localhost [127.0.0.1]
 					value := line[len(variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr):]
-					valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, e.fileLine, e.checker)
+					valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, fileLine, e.checker)
 					if err != nil {
 						ctx.GetSessionVars().StmtCtx.AppendWarning(err)
 						continue
@@ -249,7 +352,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
 				if strings.HasSuffix(field, ":") {
 					field = field[:len(field)-1]
 				}
-				valid, err := st.setFieldValue(tz, field, fieldValues[i+1], e.fileLine, e.checker)
+				valid, err := st.setFieldValue(tz, field, fieldValues[i+1], fileLine, e.checker)
 				if err != nil {
 					ctx.GetSessionVars().StmtCtx.AppendWarning(err)
 					continue
@@ -266,53 +369,22 @@ func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
 				// please see https://github.com/pingcap/tidb/issues/17846 for more details.
 				continue
 			}
-
 			// Get the sql string, and mark the start flag to false.
-			_, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.fileLine, e.checker)
+			_, err := st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), fileLine, e.checker)
 			if err != nil {
 				ctx.GetSessionVars().StmtCtx.AppendWarning(err)
 				continue
 			}
 			if e.checker.hasPrivilege(st.user) {
-				rows = append(rows, st.convertToDatumRow())
+				data = append(data, st.convertToDatumRow())
 			}
 			startFlag = false
 		} else {
 			startFlag = false
 		}
 	}
-}
-
-func getOneLine(reader *bufio.Reader) ([]byte, error) {
-	var resByte []byte
-	lineByte, isPrefix, err := reader.ReadLine()
-	if isPrefix {
-		// Need to read more data.
-		resByte = make([]byte, len(lineByte), len(lineByte)*2)
-	} else {
-		resByte = make([]byte, len(lineByte))
-	}
-	// Use copy here to avoid shallow copy problem.
-	copy(resByte, lineByte)
-	if err != nil {
-		return resByte, err
-	}
-
-	var tempLine []byte
-	for isPrefix {
-		tempLine, isPrefix, err = reader.ReadLine()
-		resByte = append(resByte, tempLine...)
-
-		// Use the max value of max_allowed_packet to check the single line length.
-		if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
-			return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
-		}
-		if err != nil {
-			return resByte, err
-		}
-	}
-	return resByte, err
+	return data, nil
 }
 
 type slowQueryTuple struct {
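parseLog runs inside worker goroutines, so it converts panics into ordinary errors with recover; the failpoint.Inject hook exists to simulate such a panic and is inert unless failpoints are enabled, as in TiDB's test builds. The recover idiom on its own:

	package main

	import "fmt"

	// safeParse shows how a panic in one worker becomes an ordinary error
	// instead of crashing the whole process.
	func safeParse(line string) (result string, err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("%s", r)
			}
		}()
		if line == "" {
			panic("parse panic: empty line")
		}
		return "parsed: " + line, nil
	}

	func main() {
		fmt.Println(safeParse("select 1;"))
		fmt.Println(safeParse(""))
	}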
@@ -514,12 +586,13 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, lineNum int, checker *slowLogChecker) (bool, error) {
 		st.preprocSubQueryTime, err = strconv.ParseFloat(value, 64)
 	}
 	if err != nil {
-		return valid, errors.Wrap(err, "Parse slow log at line "+strconv.FormatInt(int64(lineNum), 10)+" failed. Field: `"+field+"`, error")
+		return valid, fmt.Errorf("Parse slow log at line " + strconv.FormatInt(int64(lineNum), 10) + " failed. Field: `" + field + "`, error: " + err.Error())
 	}
 	return valid, err
 }
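setFieldValue's error construction switches from errors.Wrap (pingcap/errors, which captures a stack trace at the wrap site) to plain string building, which is cheaper for a parser that may emit one warning per malformed line. The same message written with fmt verbs rather than concatenation (a sketch, not the patch's code):

	package main

	import (
		"errors"
		"fmt"
	)

	func main() {
		parseErr := errors.New(`strconv.ParseFloat: parsing "x": invalid syntax`)
		lineNum, field := 42, "Query_time"
		// Equivalent message built with fmt verbs instead of concatenation.
		err := fmt.Errorf("Parse slow log at line %d failed. Field: `%s`, error: %v", lineNum, field, parseErr)
		fmt.Println(err)
	}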

 func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
+	// Build the slow query result
 	record := make([]types.Datum, 0, 64)
 	record = append(record, types.NewTimeDatum(st.time))
 	record = append(record, types.NewUintDatum(st.txnStartTs))
@@ -775,6 +848,6 @@ func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) {
 }
 
 func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) {
-	e.parsedSlowLogCh = make(chan parsedSlowLog, 1)
+	e.parsedSlowLogCh = make(chan parsedSlowLog, 100)
 	go e.parseDataForSlowLog(ctx, sctx)
 }
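Growing parsedSlowLogCh from 1 to 100 slots decouples the parser goroutines from the reader: with a single-slot buffer, every worker would block until the previous batch was drained, largely serializing the pipeline this commit parallelizes. The exact capacity looks like a tuning choice rather than a hard requirement.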