Skip to content

Commit

Permalink
use plugin-owned logp.Logger
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Oct 11, 2022
1 parent c6ef85e commit 8291590
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 66 deletions.
83 changes: 41 additions & 42 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type dnsPlugin struct {

results protos.Reporter // Channel where results are pushed.
watcher procs.ProcessesWatcher

logger *logp.Logger
}

// Transport protocol.
Expand Down Expand Up @@ -218,7 +220,7 @@ func init() {
}

func New(testMode bool, results protos.Reporter, watcher procs.ProcessesWatcher, cfg *conf.C) (protos.Plugin, error) {
p := &dnsPlugin{}
p := &dnsPlugin{logger: logp.NewLogger("dns")}
config := defaultConfig
if !testMode {
if err := cfg.Unpack(&config); err != nil {
Expand All @@ -240,7 +242,7 @@ func (dns *dnsPlugin) init(results protos.Reporter, watcher procs.ProcessesWatch
func(k common.Key, v common.Value) {
trans, ok := v.(*dnsTransaction)
if !ok {
logp.Err("Expired value is not a *DnsTransaction.")
dns.logger.Error("Expired value is not a *DnsTransaction.")
return
}
dns.expireTransaction(trans)
Expand Down Expand Up @@ -291,14 +293,14 @@ func (dns *dnsPlugin) ConnectionTimeout() time.Duration {
}

func (dns *dnsPlugin) receivedDNSRequest(tuple *dnsTuple, msg *dnsMessage) {
logp.Debug("dns", "Processing query. %s", tuple)
dns.logger.Debugf("Processing query. %s", tuple)

trans := dns.deleteTransaction(tuple.hashable())
if trans != nil {
// This happens if a client puts multiple requests in flight
// with the same ID.
trans.notes = append(trans.notes, duplicateQueryMsg.Error())
logp.Debug("dns", "%v %s", duplicateQueryMsg, tuple)
dns.logger.Debugf("%v %s", duplicateQueryMsg, tuple)
dns.publishTransaction(trans)
dns.deleteTransaction(trans.tuple.hashable())
}
Expand All @@ -307,21 +309,21 @@ func (dns *dnsPlugin) receivedDNSRequest(tuple *dnsTuple, msg *dnsMessage) {

if tuple.transport == transportUDP && (msg.data.IsEdns0() != nil) && msg.length > maxDNSPacketSize {
trans.notes = append(trans.notes, udpPacketTooLarge.Error())
logp.Debug("dns", "%v", udpPacketTooLarge)
dns.logger.Debugf("%v", udpPacketTooLarge)
}

dns.transactions.Put(tuple.hashable(), trans)
trans.request = msg
}

func (dns *dnsPlugin) receivedDNSResponse(tuple *dnsTuple, msg *dnsMessage) {
logp.Debug("dns", "Processing response. %s", tuple)
dns.logger.Debugf("Processing response. %s", tuple)

trans := dns.getTransaction(tuple.revHashable())
if trans == nil {
trans = newTransaction(msg.ts, tuple.reverse(), msg.cmdlineTuple.Reverse())
trans.notes = append(trans.notes, orphanedResponse.Error())
logp.Debug("dns", "%v %s", orphanedResponse, tuple)
dns.logger.Debugf("%v %s", orphanedResponse, tuple)
unmatchedResponses.Add(1)
}

Expand All @@ -331,7 +333,7 @@ func (dns *dnsPlugin) receivedDNSResponse(tuple *dnsTuple, msg *dnsMessage) {
respIsEdns := msg.data.IsEdns0() != nil
if !respIsEdns && msg.length > maxDNSPacketSize {
trans.notes = append(trans.notes, udpPacketTooLarge.responseError())
logp.Debug("dns", "%s", udpPacketTooLarge.responseError())
dns.logger.Debugf("%s", udpPacketTooLarge.responseError())
}

request := trans.request
Expand All @@ -341,10 +343,10 @@ func (dns *dnsPlugin) receivedDNSResponse(tuple *dnsTuple, msg *dnsMessage) {
switch {
case reqIsEdns && !respIsEdns:
trans.notes = append(trans.notes, respEdnsNoSupport.Error())
logp.Debug("dns", "%v %s", respEdnsNoSupport, tuple)
dns.logger.Debugf("%v %s", respEdnsNoSupport, tuple)
case !reqIsEdns && respIsEdns:
trans.notes = append(trans.notes, respEdnsUnexpected.Error())
logp.Debug("dns", "%v %s", respEdnsUnexpected, tuple)
dns.logger.Debugf("%v %s", respEdnsUnexpected, tuple)
}
}
}
Expand All @@ -358,7 +360,7 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) {
return
}

logp.Debug("dns", "Publishing transaction. %s", &t.tuple)
dns.logger.Debugf("Publishing transaction. %s", &t.tuple)

evt, pbf := pb.NewBeatEvent(t.ts)

Expand Down Expand Up @@ -387,18 +389,17 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) {
fields["query"] = dnsQuestionToString(t.request.data.Question[0])
fields["resource"] = t.request.data.Question[0].Name
}
addDNSToMapStr(dnsEvent, pbf, t.response.data, dns.includeAuthorities,
dns.includeAdditionals)
addDNSToMapStr(dnsEvent, pbf, t.response.data, dns.includeAuthorities, dns.includeAdditionals, dns.logger)

if t.response.data.Rcode == 0 {
fields["status"] = common.OK_STATUS
}

if dns.sendRequest {
fields["request"] = dnsToString(t.request.data)
fields["request"] = dnsToString(t.request.data, dns.logger)
}
if dns.sendResponse {
fields["response"] = dnsToString(t.response.data)
fields["response"] = dnsToString(t.response.data, dns.logger)
}
} else if t.request != nil {
pbf.Source.Bytes = int64(t.request.length)
Expand All @@ -410,11 +411,10 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) {
fields["query"] = dnsQuestionToString(t.request.data.Question[0])
fields["resource"] = t.request.data.Question[0].Name
}
addDNSToMapStr(dnsEvent, pbf, t.request.data, dns.includeAuthorities,
dns.includeAdditionals)
addDNSToMapStr(dnsEvent, pbf, t.request.data, dns.includeAuthorities, dns.includeAdditionals, dns.logger)

if dns.sendRequest {
fields["request"] = dnsToString(t.request.data)
fields["request"] = dnsToString(t.request.data, dns.logger)
}
} else if t.response != nil {
pbf.Destination.Bytes = int64(t.response.length)
Expand All @@ -426,10 +426,9 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) {
fields["query"] = dnsQuestionToString(t.response.data.Question[0])
fields["resource"] = t.response.data.Question[0].Name
}
addDNSToMapStr(dnsEvent, pbf, t.response.data, dns.includeAuthorities,
dns.includeAdditionals)
addDNSToMapStr(dnsEvent, pbf, t.response.data, dns.includeAuthorities, dns.includeAdditionals, dns.logger)
if dns.sendResponse {
fields["response"] = dnsToString(t.response.data)
fields["response"] = dnsToString(t.response.data, dns.logger)
}
}

Expand All @@ -438,13 +437,13 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) {

func (dns *dnsPlugin) expireTransaction(t *dnsTransaction) {
t.notes = append(t.notes, noResponse.Error())
logp.Debug("dns", "%v %s", noResponse, &t.tuple)
dns.logger.Debugf("%v %s", noResponse, &t.tuple)
dns.publishTransaction(t)
unmatchedRequests.Add(1)
}

// Adds the DNS message data to the supplied MapStr.
func addDNSToMapStr(m mapstr.M, pbf *pb.Fields, dns *mkdns.Msg, authority bool, additional bool) {
func addDNSToMapStr(m mapstr.M, pbf *pb.Fields, dns *mkdns.Msg, authority bool, additional bool, logger *logp.Logger) {
m["id"] = dns.Id
m["op_code"] = dnsOpCodeToString(dns.Opcode)

Expand Down Expand Up @@ -526,7 +525,7 @@ func addDNSToMapStr(m mapstr.M, pbf *pb.Fields, dns *mkdns.Msg, authority bool,
m["answers_count"] = len(dns.Answer)
if len(dns.Answer) > 0 {
var resolvedIPs []string
m["answers"], resolvedIPs = rrsToMapStrs(dns.Answer, true)
m["answers"], resolvedIPs = rrsToMapStrs(dns.Answer, true, logger)
if len(resolvedIPs) > 0 {
m["resolved_ip"] = resolvedIPs
pbf.AddIP(resolvedIPs...)
Expand All @@ -535,7 +534,7 @@ func addDNSToMapStr(m mapstr.M, pbf *pb.Fields, dns *mkdns.Msg, authority bool,

m["authorities_count"] = len(dns.Ns)
if authority && len(dns.Ns) > 0 {
m["authorities"], _ = rrsToMapStrs(dns.Ns, false)
m["authorities"], _ = rrsToMapStrs(dns.Ns, false, logger)
}

if rrOPT != nil {
Expand All @@ -544,7 +543,7 @@ func addDNSToMapStr(m mapstr.M, pbf *pb.Fields, dns *mkdns.Msg, authority bool,
m["additionals_count"] = len(dns.Extra)
}
if additional && len(dns.Extra) > 0 {
rrsMapStrs, _ := rrsToMapStrs(dns.Extra, false)
rrsMapStrs, _ := rrsToMapStrs(dns.Extra, false, logger)
// We do not want OPT RR to appear in the 'additional' section,
// that's why rrsMapStrs could be empty even though len(dns.Extra) > 0
if len(rrsMapStrs) > 0 {
Expand Down Expand Up @@ -589,13 +588,13 @@ func optToMapStr(rrOPT *mkdns.OPT) mapstr.M {

// rrsToMapStr converts an slice of RR's to an slice of MapStr's and optionally
// returns a list of the IP addresses found in the resource records.
func rrsToMapStrs(records []mkdns.RR, ipList bool) ([]mapstr.M, []string) {
func rrsToMapStrs(records []mkdns.RR, ipList bool, logger *logp.Logger) ([]mapstr.M, []string) {
var allIPs []string
mapStrSlice := make([]mapstr.M, 0, len(records))
for _, rr := range records {
rrHeader := rr.Header()

mapStr, ips := rrToMapStr(rr, ipList)
mapStr, ips := rrToMapStr(rr, ipList, logger)
if len(mapStr) == 0 { // OPT pseudo-RR returns an empty MapStr
continue
}
Expand All @@ -618,11 +617,11 @@ func rrsToMapStrs(records []mkdns.RR, ipList bool) ([]mapstr.M, []string) {
//
// TODO An improvement would be to replace 'data' by the real field name
// It would require some changes in unit tests
func rrToString(rr mkdns.RR) string {
func rrToString(rr mkdns.RR, logger *logp.Logger) string {
var st string
var keys []string

mapStr, _ := rrToMapStr(rr, false)
mapStr, _ := rrToMapStr(rr, false, logger)
data, ok := mapStr["data"]
delete(mapStr, "data")

Expand Down Expand Up @@ -655,7 +654,7 @@ func rrToString(rr mkdns.RR) string {
return b.String()
}

func rrToMapStr(rr mkdns.RR, ipList bool) (mapstr.M, []string) {
func rrToMapStr(rr mkdns.RR, ipList bool, logger *logp.Logger) (mapstr.M, []string) {
mapStr := mapstr.M{}
rrType := rr.Header().Rrtype

Expand All @@ -670,17 +669,17 @@ func rrToMapStr(rr mkdns.RR, ipList bool) (mapstr.M, []string) {
switch x := rr.(type) {
default:
// We don't have special handling for this type
logp.Debug("dns", "No special handling for RR type %s", dnsTypeToString(rrType))
logger.Debugf("No special handling for RR type %s", dnsTypeToString(rrType))
unsupportedRR := new(mkdns.RFC3597)
err := unsupportedRR.ToRFC3597(x)
if err == nil {
rData, err := hexStringToString(unsupportedRR.Rdata)
mapStr["data"] = rData
if err != nil {
logp.Debug("dns", "%v", err)
logger.Debugf("%v", err)
}
} else {
logp.Debug("dns", "Rdata for the unhandled RR type %s could not be fetched", dnsTypeToString(rrType))
logger.Debugf("Rdata for the unhandled RR type %s could not be fetched", dnsTypeToString(rrType))
}

// Don't attempt to render IPs for answers that are incomplete.
Expand Down Expand Up @@ -734,11 +733,11 @@ func rrToMapStr(rr mkdns.RR, ipList bool) (mapstr.M, []string) {
mapStr["data"] = trimRightDot(x.Ptr)
case *mkdns.RFC3597:
// Miekg/dns lib doesn't handle this type
logp.Debug("dns", "Unknown RR type %s", dnsTypeToString(rrType))
logger.Debugf("Unknown RR type %s", dnsTypeToString(rrType))
rData, err := hexStringToString(x.Rdata)
mapStr["data"] = rData
if err != nil {
logp.Debug("dns", "%v", err)
logger.Debugf("%v", err)
}
case *mkdns.RRSIG:
mapStr["type_covered"] = dnsTypeToString(x.TypeCovered)
Expand Down Expand Up @@ -780,16 +779,16 @@ func dnsQuestionToString(q mkdns.Question) string {

// rrsToString converts an array of RR's to a
// string.
func rrsToString(r []mkdns.RR) string {
func rrsToString(r []mkdns.RR, logger *logp.Logger) string {
var rrStrs []string
for _, rr := range r {
rrStrs = append(rrStrs, rrToString(rr))
rrStrs = append(rrStrs, rrToString(rr, logger))
}
return strings.Join(rrStrs, "; ")
}

// dnsToString converts a DNS message to a string.
func dnsToString(dns *mkdns.Msg) string {
func dnsToString(dns *mkdns.Msg, logger *logp.Logger) string {
var msgType string
if dns.Response {
msgType = "response"
Expand Down Expand Up @@ -833,17 +832,17 @@ func dnsToString(dns *mkdns.Msg) string {

if len(dns.Answer) > 0 {
a = append(a, fmt.Sprintf("ANSWER %s",
rrsToString(dns.Answer)))
rrsToString(dns.Answer, logger)))
}

if len(dns.Ns) > 0 {
a = append(a, fmt.Sprintf("AUTHORITY %s",
rrsToString(dns.Ns)))
rrsToString(dns.Ns, logger)))
}

if len(dns.Extra) > 0 {
a = append(a, fmt.Sprintf("ADDITIONAL %s",
rrsToString(dns.Extra)))
rrsToString(dns.Extra, logger)))
}

return strings.Join(a, "; ")
Expand Down
24 changes: 12 additions & 12 deletions packetbeat/protos/dns/dns_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type dnsConnectionData struct {
}

func (dns *dnsPlugin) Parse(pkt *protos.Packet, tcpTuple *common.TCPTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData {
defer logp.Recover("Dns ParseTcp")
defer dns.logger.Recover("Dns ParseTcp")

logp.Debug("dns", "Parsing packet addressed with %s of length %d.", &pkt.Tuple, len(pkt.Payload))
dns.logger.Debugf("dns", "Parsing packet addressed with %s of length %d.", &pkt.Tuple, len(pkt.Payload))

conn := ensureDNSConnection(private)
conn := ensureDNSConnection(private, dns.logger)

conn = dns.doParse(conn, pkt, tcpTuple, dir)
if conn == nil {
Expand All @@ -67,18 +67,18 @@ func (dns *dnsPlugin) Parse(pkt *protos.Packet, tcpTuple *common.TCPTuple, dir u
return conn
}

func ensureDNSConnection(private protos.ProtocolData) *dnsConnectionData {
func ensureDNSConnection(private protos.ProtocolData, logger *logp.Logger) *dnsConnectionData {
if private == nil {
return &dnsConnectionData{}
}

conn, ok := private.(*dnsConnectionData)
if !ok {
logp.Warn("Dns connection data type error, create new one")
logger.Warn("Dns connection data type error, create new one")
return &dnsConnectionData{}
}
if conn == nil {
logp.Warn("Unexpected: dns connection data not set, create new one")
logger.Warn("Unexpected: dns connection data not set, create new one")
return &dnsConnectionData{}
}

Expand All @@ -99,23 +99,23 @@ func (dns *dnsPlugin) doParse(conn *dnsConnectionData, pkt *protos.Packet, tcpTu

stream.rawData = append(stream.rawData, payload...)
if len(stream.rawData) > tcp.TCPMaxDataInStream {
logp.Debug("dns", "Stream data too large, dropping DNS stream")
dns.logger.Debugf("dns", "Stream data too large, dropping DNS stream")
conn.data[dir] = nil
return conn
}
}
decodedData, err := stream.handleTCPRawData()
if err != nil {
if err == incompleteMsg { //nolint:errorlint // incompleteMsg is not wrapped.
logp.Debug("dns", "Waiting for more raw data")
dns.logger.Debugf("dns", "Waiting for more raw data")
return conn
}

if dir == tcp.TCPDirectionReverse {
dns.publishResponseError(conn, err)
}

logp.Debug("dns", "%v addresses %s, length %d", err, tcpTuple, len(stream.rawData))
dns.logger.Debugf("dns", "%v addresses %s, length %d", err, tcpTuple, len(stream.rawData))

// This means that malformed requests or responses are being sent...
// TODO: publish the situation also if Request
Expand Down Expand Up @@ -187,7 +187,7 @@ func (dns *dnsPlugin) ReceivedFin(tcpTuple *common.TCPTuple, dir uint8, private
dns.publishResponseError(conn, err)
}

logp.Debug("dns", "%v addresses %s, length %d", err, tcpTuple, len(stream.rawData))
dns.logger.Debugf("dns", "%v addresses %s, length %d", err, tcpTuple, len(stream.rawData))

return conn
}
Expand Down Expand Up @@ -216,8 +216,8 @@ func (dns *dnsPlugin) GapInStream(tcpTuple *common.TCPTuple, dir uint8, nbytes i
dns.publishResponseError(conn, err)
}

logp.Debug("dns", "%v addresses %s, length %d", err, tcpTuple, len(stream.rawData))
logp.Debug("dns", "Dropping the stream %s", tcpTuple)
dns.logger.Debugf("dns", "%v addresses %s, length %d", err, tcpTuple, len(stream.rawData))
dns.logger.Debugf("dns", "Dropping the stream %s", tcpTuple)

// drop the stream because it is binary Data and it would be unexpected to have a decodable message later
return private, true
Expand Down
Loading

0 comments on commit 8291590

Please sign in to comment.