Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor logging in pgsql module #12151

Merged
merged 1 commit into from
May 10, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Refactor logging in pgsql module
Guard debug logging statements with "isDebug" checks. And switch the module over to using named loggers.

Fixes #12150
andrewkroh committed May 9, 2019
commit 52a05dd2908c16db0fa341b6689779ef43024f74
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Prevent duplicate packet loss error messages in HTTP events. {pull}10709[10709]
- Avoid reporting unknown MongoDB opcodes more than once. {pull}10878[10878]
- Fixed a memory leak when using process monitoring under Windows. {pull}12100[12100]
- Improved debug logging efficiency in PGQSL module. {issue}12150[12150]

*Winlogbeat*

98 changes: 47 additions & 51 deletions packetbeat/protos/pgsql/parse.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ import (
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

var (
@@ -34,7 +33,7 @@ var (
)

func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) {
debugf("pgsqlMessageParser, off=%v", s.parseOffset)
pgsql.debugf("pgsqlMessageParser, off=%v", s.parseOffset)

var ok, complete bool

@@ -46,22 +45,22 @@ func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) {
case pgsqlExtendedQueryState:
ok, complete = pgsql.parseMessageExtendedQuery(s)
default:
logp.Critical("Pgsql invalid parser state")
pgsql.log.Error("Pgsql invalid parser state")
}

detailedf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v",
pgsql.detailf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v",
ok, complete, s.parseOffset)

return ok, complete
}

func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageStart")
pgsql.detailf("parseMessageStart")

m := s.message

for len(s.data[s.parseOffset:]) >= 5 {
isSpecial, length, command := isSpecialPgsqlCommand(s.data[s.parseOffset:])
isSpecial, length, command := pgsql.isSpecialCommand(s.data[s.parseOffset:])
if !isSpecial {
return pgsql.parseCommand(s)
}
@@ -71,7 +70,7 @@ func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) {

// check buffer available
if len(s.data[s.parseOffset:]) <= length {
detailedf("Wait for more data 1")
pgsql.detailf("Wait for more data 1")
return true, false
}

@@ -103,7 +102,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
m := s.message

// one byte reply to SSLRequest
detailedf("Reply for SSLRequest %c", typ)
pgsql.detailf("Reply for SSLRequest %c", typ)
m.start = s.parseOffset
s.parseOffset++
m.end = s.parseOffset
@@ -118,15 +117,15 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

detailedf("Pgsql type %c, length=%d", typ, length)
pgsql.detailf("Pgsql type %c, length=%d", typ, length)

switch typ {
case 'Q':
@@ -147,7 +146,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
return pgsql.parseExtResp(s, length)
default:
if !pgsqlValidType(typ) {
detailedf("invalid frame type: '%c'", typ)
pgsql.detailf("invalid frame type: '%c'", typ)
return false, false
}
return pgsql.parseSkipMessage(s, length)
@@ -172,7 +171,7 @@ func (pgsql *pgsqlPlugin) parseSimpleQuery(s *pgsqlStream, length int) (bool, bo
m.query = query

m.toExport = true
detailedf("Simple Query: %s", m.query)
pgsql.detailf("Simple Query: %s", m.query)
return true, true
}

@@ -184,12 +183,12 @@ func (pgsql *pgsqlPlugin) parseRowDescription(s *pgsqlStream, length int) (bool,
m.isOK = true
m.toExport = true

err := pgsqlFieldsParser(s, s.data[s.parseOffset+5:s.parseOffset+length+1])
err := pgsql.parseFields(s, s.data[s.parseOffset+5:s.parseOffset+length+1])
if err != nil {
detailedf("fields parse failed with: %v", err)
pgsql.detailf("parseFields failed with: %v", err)
return false, false
}
detailedf("Fields: %s", m.fields)
pgsql.detailf("Fields: %s", m.fields)

s.parseOffset++ //type
s.parseOffset += length //length
@@ -218,7 +217,7 @@ func (pgsql *pgsqlPlugin) parseEmptyQueryResponse(s *pgsqlStream) (bool, bool) {

m := s.message

detailedf("EmptyQueryResponse")
pgsql.detailf("EmptyQueryResponse")
m.start = s.parseOffset
m.isOK = true
m.isRequest = false
@@ -245,7 +244,7 @@ func (pgsql *pgsqlPlugin) parseCommandComplete(s *pgsqlStream, length int) (bool
return false, false
}

detailedf("CommandComplete length=%d, tag=%s", length, name)
pgsql.detailf("CommandComplete length=%d, tag=%s", length, name)

s.parseOffset += length
m.end = s.parseOffset
@@ -269,7 +268,7 @@ func (pgsql *pgsqlPlugin) parseReadyForQuery(s *pgsqlStream, length int) (bool,

func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, bool) {
// ErrorResponse
detailedf("ErrorResponse")
pgsql.detailf("ErrorResponse")

m := s.message
m.start = s.parseOffset
@@ -278,7 +277,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool,
m.toExport = true

s.parseOffset++ //type
pgsqlErrorParser(s, s.data[s.parseOffset+4:s.parseOffset+length])
pgsql.parseError(s, s.data[s.parseOffset+4:s.parseOffset+length])

s.parseOffset += length //length
m.end = s.parseOffset
@@ -289,7 +288,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool,

func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {
// Ready for query -> Parse for an extended query request
detailedf("Parse")
pgsql.detailf("Parse")

m := s.message
m.start = s.parseOffset
@@ -303,11 +302,11 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {

query, err := common.ReadString(s.data[m.start+6:])
if err != nil {
detailedf("Invalid extended query request")
pgsql.detailf("Invalid extended query request")
return false, false
}
m.query = query
detailedf("Parse in an extended query request: %s", m.query)
pgsql.detailf("Parse in an extended query request: %s", m.query)

// Ignore SET statement
if strings.HasPrefix(m.query, "SET ") {
@@ -319,7 +318,7 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {

func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) {
// Sync -> Parse completion for an extended query response
detailedf("ParseCompletion")
pgsql.detailf("ParseCompletion")

m := s.message
m.start = s.parseOffset
@@ -329,7 +328,7 @@ func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool)

s.parseOffset++ //type
s.parseOffset += length
detailedf("Parse completion in an extended query response")
pgsql.detailf("Parse completion in an extended query response")
s.parseState = pgsqlGetDataState
return pgsql.parseMessageData(s)
}
@@ -349,7 +348,7 @@ func (pgsql *pgsqlPlugin) parseSkipMessage(s *pgsqlStream, length int) (bool, bo
return true, true
}

func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
func (pgsql *pgsqlPlugin) parseFields(s *pgsqlStream, buf []byte) error {
m := s.message

if len(buf) < 2 {
@@ -359,7 +358,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
// read field count (int16)
off := 2
fieldCount := readCount(buf)
detailedf("Row Description field count=%d", fieldCount)

fields := []string{}
fieldsFormat := []byte{}
@@ -400,8 +398,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
format := common.BytesNtohs(buf[off : off+2])
off += 2
fieldsFormat = append(fieldsFormat, byte(format))

detailedf("Field name=%s, format=%d", fieldName, format)
}

if off < len(buf) {
@@ -411,13 +407,13 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
m.fields = fields
m.fieldsFormat = fieldsFormat
if m.numberOfFields != fieldCount {
logp.Err("Missing fields from RowDescription. Expected %d. Received %d",
pgsql.log.Errorf("Missing fields from RowDescription. Expected %d. Received %d",
fieldCount, m.numberOfFields)
}
return nil
}

func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
func (pgsql *pgsqlPlugin) parseError(s *pgsqlStream, buf []byte) {
m := s.message
off := 0
for off < len(buf) {
@@ -430,7 +426,7 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
// read field value(string)
val, err := common.ReadString(buf[off+1:])
if err != nil {
logp.Err("Failed to read the column field")
pgsql.log.Error("Failed to read the column field")
break
}
off += len(val) + 2
@@ -444,11 +440,11 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
m.errorSeverity = val
}
}
detailedf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo)
pgsql.detailf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo)
}

func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageData")
pgsql.detailf("parseMessageData")

// The response to queries that return row sets contains:
// RowDescription
@@ -466,12 +462,12 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
// wait for more
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

@@ -491,17 +487,17 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {

name, err := pgsqlString(s.data[s.parseOffset+4:], length-4)
if err != nil {
detailedf("pgsql string invalid")
pgsql.detailf("pgsql string invalid")
return false, false
}

detailedf("CommandComplete length=%d, tag=%s", length, name)
pgsql.detailf("CommandComplete length=%d, tag=%s", length, name)
s.parseOffset += length
m.end = s.parseOffset
m.size = uint64(m.end - m.start)
s.parseState = pgsqlStartState

detailedf("Rows: %s", m.rows)
pgsql.detailf("Rows: %s", m.rows)

return true, true
case '2':
@@ -515,7 +511,7 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
return pgsql.parseRowDescription(s, length)
default:
// shouldn't happen -> return error
logp.Warn("Pgsql parser expected data message, but received command of type %v", typ)
pgsql.log.Warnf("Pgsql parser expected data message, but received command of type %v", typ)
s.parseState = pgsqlStartState
return false, false
}
@@ -530,7 +526,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
// read field count (int16)
off := 2
fieldCount := readCount(buf)
detailedf("DataRow field count=%d", fieldCount)
pgsql.detailf("DataRow field count=%d", fieldCount)

rows := []string{}
rowLength := 0
@@ -545,7 +541,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
off += 4

if columnLength > 0 && columnLength > len(buf[off:]) {
logp.Err("Pgsql invalid column_length=%v, buffer_length=%v, i=%v",
pgsql.log.Errorf("Pgsql invalid column_length=%v, buffer_length=%v, i=%v",
columnLength, len(buf[off:]), i)
return errInvalidLength
}
@@ -568,7 +564,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
rowLength += len(columnValue)
}

detailedf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off)
pgsql.detailf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off)
}

if off < len(buf) {
@@ -584,7 +580,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
}

func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageExtendedQuery")
pgsql.detailf("parseMessageExtendedQuery")

// An extended query request contains:
// Parse
@@ -603,12 +599,12 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
// wait for more
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

@@ -647,7 +643,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
return true, true
default:
// shouldn't happen -> return error
logp.Warn("Pgsql parser expected extended query message, but received command of type %v", typ)
pgsql.log.Warnf("Pgsql parser expected extended query message, but received command of type %v", typ)
s.parseState = pgsqlStartState
return false, false
}
@@ -656,7 +652,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
return true, false
}

func isSpecialPgsqlCommand(data []byte) (bool, int, int) {
func (pgsql *pgsqlPlugin) isSpecialCommand(data []byte) (bool, int, int) {
if len(data) < 8 {
// 8 bytes required
return false, 0, 0
@@ -670,15 +666,15 @@ func isSpecialPgsqlCommand(data []byte) (bool, int, int) {

if length == 16 && code == 80877102 {
// Cancel Request
logp.Debug("pgsqldetailed", "Cancel Request, length=%d", length)
pgsql.debugf("Cancel Request, length=%d", length)
return true, length, cancelRequest
} else if length == 8 && code == 80877103 {
// SSL Request
logp.Debug("pgsqldetailed", "SSL Request, length=%d", length)
pgsql.debugf("SSL Request, length=%d", length)
return true, length, sslRequest
} else if code == 196608 {
// Startup Message
logp.Debug("pgsqldetailed", "Startup Message, length=%d", length)
pgsql.debugf("Startup Message, length=%d", length)
return true, length, startupMessage
}
return false, 0, 0
47 changes: 32 additions & 15 deletions packetbeat/protos/pgsql/pgsql.go
Original file line number Diff line number Diff line change
@@ -30,9 +30,13 @@ import (
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"

"go.uber.org/zap"
)

type pgsqlPlugin struct {
log, debug, detail *logp.Logger
isDebug, isDetail bool

// config
ports []int
@@ -125,11 +129,6 @@ var (
errInvalidLength = errors.New("invalid length")
)

var (
debugf = logp.MakeDebug("pgsql")
detailedf = logp.MakeDebug("pgsqldetailed")
)

var (
unmatchedResponses = monitoring.NewInt(nil, "pgsql.unmatched_responses")
)
@@ -160,6 +159,11 @@ func New(
func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) error {
pgsql.setFromConfig(config)

pgsql.log = logp.NewLogger("pgsql")
pgsql.debug = logp.NewLogger("pgsql", zap.AddCallerSkip(1))
pgsql.detail = logp.NewLogger("pgsqldetailed", zap.AddCallerSkip(1))
pgsql.isDebug, pgsql.isDetail = logp.IsDebug("pgsql"), logp.IsDebug("pgsqldetailed")

pgsql.transactions = common.NewCache(
pgsql.transactionTimeout,
protos.DefaultTransactionHashSize)
@@ -187,6 +191,20 @@ func (pgsql *pgsqlPlugin) getTransaction(k common.HashableTCPTuple) []*pgsqlTran
return nil
}

//go:inline
func (pgsql *pgsqlPlugin) debugf(format string, v ...interface{}) {
if pgsql.isDebug {
pgsql.debug.Debugf(format, v...)
}
}

//go:inline
func (pgsql *pgsqlPlugin) detailf(format string, v ...interface{}) {
if pgsql.isDetail {
pgsql.detail.Debugf(format, v...)
}
}

func (pgsql *pgsqlPlugin) GetPorts() []int {
return pgsql.ports
}
@@ -237,13 +255,13 @@ func (pgsql *pgsqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
data: pkt.Payload,
message: &pgsqlMessage{ts: pkt.Ts},
}
logp.Debug("pgsqldetailed", "New stream created")
pgsql.detailf("New stream created")
} else {
// concatenate bytes
priv.data[dir].data = append(priv.data[dir].data, pkt.Payload...)
logp.Debug("pgsqldetailed", "Len data: %d cap data: %d", len(priv.data[dir].data), cap(priv.data[dir].data))
pgsql.detailf("Len data: %d cap data: %d", len(priv.data[dir].data), cap(priv.data[dir].data))
if len(priv.data[dir].data) > tcp.TCPMaxDataInStream {
debugf("Stream data too large, dropping TCP stream")
pgsql.debugf("Stream data too large, dropping TCP stream")
priv.data[dir] = nil
return priv
}
@@ -262,12 +280,11 @@ func (pgsql *pgsqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
}

ok, complete := pgsql.pgsqlMessageParser(priv.data[dir])
//logp.Debug("pgsqldetailed", "MessageParser returned ok=%v complete=%v", ok, complete)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.data[dir] = nil
debugf("Ignore Postgresql message. Drop tcp stream. Try parsing with the next segment")
pgsql.debugf("Ignore Postgresql message. Drop tcp stream. Try parsing with the next segment")
return priv
}

@@ -333,7 +350,7 @@ func (pgsql *pgsqlPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8,
// next layer but mark it as incomplete.
stream := pgsqlData.data[dir]
if messageHasEnoughData(stream.message) {
debugf("Message not complete, but sending to the next layer")
pgsql.debugf("Message not complete, but sending to the next layer")
m := stream.message
m.toExport = true
m.end = stream.parseOffset
@@ -378,7 +395,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlRequest(msg *pgsqlMessage) {
// separated by ';'
queries := pgsqlQueryParser(msg.query)

logp.Debug("pgsqldetailed", "Queries (%d) :%s", len(queries), queries)
pgsql.debugf("Queries (%d) :%s", len(queries), queries)

transList := pgsql.getTransaction(tuple.Hashable())
if transList == nil {
@@ -414,7 +431,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) {
tuple := msg.tcpTuple
transList := pgsql.getTransaction(tuple.Hashable())
if transList == nil || len(transList) == 0 {
debugf("Response from unknown transaction. Ignoring.")
pgsql.debugf("Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}
@@ -424,7 +441,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) {

// check if the request was received
if trans.pgsql == nil {
debugf("Response from unknown transaction. Ignoring.")
pgsql.debugf("Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}
@@ -449,7 +466,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) {

pgsql.publishTransaction(trans)

debugf("Postgres transaction completed: %s\n%s", trans.pgsql, trans.responseRaw)
pgsql.debugf("Postgres transaction completed: %s\n%s", trans.pgsql, trans.responseRaw)
}

func (pgsql *pgsqlPlugin) publishTransaction(t *pgsqlTransaction) {