diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 80fead3fb4c..00383fb51cf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -243,6 +243,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Packetbeat* +- Prevent incorrect use of AMQP protocol parsing from causing silent failure. {pull}29017[29017] +- Fix error handling in MongoDB protocol parsing. {pull}29017[29017] *Winlogbeat* diff --git a/packetbeat/cmd/root.go b/packetbeat/cmd/root.go index 808de836fb3..6032089c9fc 100644 --- a/packetbeat/cmd/root.go +++ b/packetbeat/cmd/root.go @@ -50,7 +50,7 @@ var RootCmd *cmd.BeatsRootCmd // PacketbeatSettings contains the default settings for packetbeat func PacketbeatSettings() instance.Settings { - var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError) + runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError) runFlags.AddGoFlag(flag.CommandLine.Lookup("I")) runFlags.AddGoFlag(flag.CommandLine.Lookup("t")) runFlags.AddGoFlag(flag.CommandLine.Lookup("O")) diff --git a/packetbeat/config/agent.go b/packetbeat/config/agent.go index 36e3c977b44..34df1721597 100644 --- a/packetbeat/config/agent.go +++ b/packetbeat/config/agent.go @@ -66,7 +66,7 @@ func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, e mergeConfig, err := common.NewConfigFrom(common.MapStr{ "index": datastreamConfig.Datastream.Type + "-" + datastreamConfig.Datastream.Dataset + "-" + namespace, "processors": append([]common.MapStr{ - common.MapStr{ + { "add_fields": common.MapStr{ "target": "data_stream", "fields": common.MapStr{ @@ -76,7 +76,7 @@ func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, e }, }, }, - common.MapStr{ + { "add_fields": common.MapStr{ "target": "event", "fields": common.MapStr{ diff --git a/packetbeat/decoder/decoder.go b/packetbeat/decoder/decoder.go index 50e8f595481..ae8eeb9d84b 100644 --- a/packetbeat/decoder/decoder.go +++ b/packetbeat/decoder/decoder.go @@ -87,7 +87,8 @@ func New( d := Decoder{ flows: f, decoders: make(map[gopacket.LayerType]gopacket.DecodingLayer), - icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp} + icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp, + } d.stD1Q.init(&d.d1q[0], &d.d1q[1]) d.stIP4.init(&d.ip4[0], &d.ip4[1]) d.stIP6.init(&d.ip6[0], &d.ip6[1]) diff --git a/packetbeat/flows/flows_test.go b/packetbeat/flows/flows_test.go index 601613e61d1..9cee1c59e2d 100644 --- a/packetbeat/flows/flows_test.go +++ b/packetbeat/flows/flows_test.go @@ -56,12 +56,16 @@ func TestFlowsCounting(t *testing.T) { assert.NoError(t, err) uint1, err := module.NewUint("uint1") + assert.NoError(t, err) uint2, err := module.NewUint("uint2") + assert.NoError(t, err) int1, err := module.NewInt("int1") + assert.NoError(t, err) int2, err := module.NewInt("int2") + assert.NoError(t, err) float1, err := module.NewFloat("float1") + assert.NoError(t, err) float2, err := module.NewFloat("float2") - assert.NoError(t, err) pub := &flowsChan{make(chan []beat.Event, 1)} diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index f0080cf68d2..b339a75fa4d 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -107,7 +107,7 @@ func makeWorker( if align > 0 { // round time to nearest 10 seconds for alignment aligned := time.Unix(((time.Now().Unix()+(align-1))/align)*align, 0) - waitStart := aligned.Sub(time.Now()) + waitStart := time.Until(aligned) debugf("worker wait start(%v): %v", aligned, waitStart) if cont := w.sleep(waitStart); !cont { return diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index 3bec75f2fe3..520d107f9ab 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -33,10 +33,8 @@ import ( "github.com/elastic/beats/v7/packetbeat/procs" ) -var ( - // Use `go test -data` to update sample event files. - dataFlag = flag.Bool("data", false, "Write updated data.json files") -) +// Use `go test -data` to update sample event files. +var dataFlag = flag.Bool("data", false, "Write updated data.json files") func TestCreateEvent(t *testing.T) { logp.TestingSetup() @@ -124,7 +122,7 @@ func TestCreateEvent(t *testing.T) { t.Fatal(err) } - if err := ioutil.WriteFile("../_meta/sample_outputs/flow.json", output, 0644); err != nil { + if err := ioutil.WriteFile("../_meta/sample_outputs/flow.json", output, 0o644); err != nil { t.Fatal(err) } } diff --git a/packetbeat/pb/event.go b/packetbeat/pb/event.go index 683e22253be..4ba7e4acf15 100644 --- a/packetbeat/pb/event.go +++ b/packetbeat/pb/event.go @@ -237,14 +237,14 @@ func (f *Fields) ComputeValues(localIPs []net.IP, internalNetworks []string) err } // network.community_id - switch { - case f.Network.Transport == "udp": + switch f.Network.Transport { + case "udp": flow.Protocol = 17 - case f.Network.Transport == "tcp": + case "tcp": flow.Protocol = 6 - case f.Network.Transport == "icmp": + case "icmp": flow.Protocol = 1 - case f.Network.Transport == "ipv6-icmp": + case "ipv6-icmp": flow.Protocol = 58 } flow.ICMP.Type = f.ICMPType diff --git a/packetbeat/processor/add_kubernetes_metadata/indexers.go b/packetbeat/processor/add_kubernetes_metadata/indexers.go index 474f111e97f..3e547e3a8c1 100644 --- a/packetbeat/processor/add_kubernetes_metadata/indexers.go +++ b/packetbeat/processor/add_kubernetes_metadata/indexers.go @@ -26,14 +26,14 @@ func init() { // Register default indexers cfg := common.NewConfig() - //Add IP Port Indexer as a default indexer + // Add IP Port Indexer as a default indexer kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg) formatCfg, err := common.NewConfigFrom(map[string]interface{}{ "format": "%{[ip]}:%{[port]}", }) if err == nil { - //Add field matcher with field to lookup as metricset.host + // Add field matcher with field to lookup as metricset.host kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldFormatMatcherName, *formatCfg) } } diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index bf3daab9ff2..a3d031ec72f 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -213,7 +213,7 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { if logp.HasSelector("procsdetailed") { start := time.Now() defer func() { - logp.Debug("procsdetailed", "updateMap() took %v", time.Now().Sub(start)) + logp.Debug("procsdetailed", "updateMap() took %v", time.Since(start)) }() } diff --git a/packetbeat/procs/procs_linux_test.go b/packetbeat/procs/procs_linux_test.go index 39c8a937741..e36bd50a208 100644 --- a/packetbeat/procs/procs_linux_test.go +++ b/packetbeat/procs/procs_linux_test.go @@ -39,14 +39,14 @@ func createFakeDirectoryStructure(prefix string, files []testProcFile) error { var err error for _, file := range files { dir := filepath.Dir(file.path) - err = os.MkdirAll(filepath.Join(prefix, dir), 0755) + err = os.MkdirAll(filepath.Join(prefix, dir), 0o755) if err != nil { return err } if !file.isLink { err = ioutil.WriteFile(filepath.Join(prefix, file.path), - []byte(file.contents), 0644) + []byte(file.contents), 0o644) if err != nil { return err } diff --git a/packetbeat/procs/procs_windows.go b/packetbeat/procs/procs_windows.go index 50807691efa..ee3eee1a28d 100644 --- a/packetbeat/procs/procs_windows.go +++ b/packetbeat/procs/procs_windows.go @@ -42,13 +42,17 @@ type extractor interface { Size() int } -type callbackFn func(net.IP, uint16, int) -type extractorFactory func(fn callbackFn) extractor +type ( + callbackFn func(net.IP, uint16, int) + extractorFactory func(fn callbackFn) extractor +) -type tcpRowOwnerPIDExtractor callbackFn -type tcp6RowOwnerPIDExtractor callbackFn -type udpRowOwnerPIDExtractor callbackFn -type udp6RowOwnerPIDExtractor callbackFn +type ( + tcpRowOwnerPIDExtractor callbackFn + tcp6RowOwnerPIDExtractor callbackFn + udpRowOwnerPIDExtractor callbackFn + udp6RowOwnerPIDExtractor callbackFn +) var tablesByTransport = map[applayer.Transport][]struct { family uint32 diff --git a/packetbeat/procs/procs_windows_test.go b/packetbeat/procs/procs_windows_test.go index 51cc8391d68..4abfe359b71 100644 --- a/packetbeat/procs/procs_windows_test.go +++ b/packetbeat/procs/procs_windows_test.go @@ -42,21 +42,33 @@ func TestParseTableRaw(t *testing.T) { expected []portProcMapping mustErr bool }{ - {"Empty table IPv4", IPv4, - "00000000", nil, false}, - {"Empty table IPv6", IPv6, - "00000000", nil, false}, - {"Short table (no length)", IPv4, - "000000", nil, true}, - {"Short table (partial entry)", IPv6, - "01000000AAAAAAAAAAAAAAAAAAAA", nil, true}, - {"One entry (IPv4)", IPv4, + { + "Empty table IPv4", IPv4, + "00000000", nil, false, + }, + { + "Empty table IPv6", IPv6, + "00000000", nil, false, + }, + { + "Short table (no length)", IPv4, + "000000", nil, true, + }, + { + "Short table (partial entry)", IPv6, + "01000000AAAAAAAAAAAAAAAAAAAA", nil, true, + }, + { + "One entry (IPv4)", IPv4, "01000000" + "77777777AAAAAAAA12340000BBBBBBBBFFFF0000CCCCCCCC", []portProcMapping{ {endpoint: endpoint{address: "170.170.170.170", port: 0x1234}, pid: int(pid)}, - }, false}, - {"Two entries (IPv6)", IPv6, + }, + false, + }, + { + "Two entries (IPv6)", IPv6, "02000000" + // First entry "11112222333344445555666677778888F0F0F0F0" + @@ -76,7 +88,9 @@ func TestParseTableRaw(t *testing.T) { []portProcMapping{ {endpoint: endpoint{address: "1111:2222:3333:4444:5555:6666:7777:8888", port: 0xABCD}, pid: 1}, {endpoint: endpoint{address: "aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa", port: 0}, pid: 0xffff}, - }, false}, + }, + false, + }, } { msg := fmt.Sprintf("Test case #%d: %s", idx+1, testCase.name) table, err := hex.DecodeString(testCase.raw) diff --git a/packetbeat/procs/zsyscall_windows.go b/packetbeat/procs/zsyscall_windows.go index b313698211e..f938c08f792 100644 --- a/packetbeat/procs/zsyscall_windows.go +++ b/packetbeat/procs/zsyscall_windows.go @@ -34,9 +34,7 @@ const ( errnoERROR_IO_PENDING = 997 ) -var ( - errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING) -) +var errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING) // errnoErr returns common boxed Errno values, to prevent // allocations at runtime. diff --git a/packetbeat/protos/amqp/amqp.go b/packetbeat/protos/amqp/amqp.go index f0c1e26dd28..2402b9150e3 100644 --- a/packetbeat/protos/amqp/amqp.go +++ b/packetbeat/protos/amqp/amqp.go @@ -50,7 +50,7 @@ type amqpPlugin struct { results protos.Reporter watcher procs.ProcessesWatcher - //map containing functions associated with different method numbers + // map containing functions associated with different method numbers methodMap map[codeClass]map[codeMethod]amqpMethod } @@ -87,7 +87,7 @@ func (amqp *amqpPlugin) init(results protos.Reporter, watcher procs.ProcessesWat amqp.initMethodMap() amqp.setFromConfig(config) - if amqp.hideConnectionInformation == false { + if !amqp.hideConnectionInformation { amqp.addConnectionMethods() } amqp.transactions = common.NewCache( @@ -194,8 +194,8 @@ func (amqp *amqpPlugin) ConnectionTimeout() time.Duration { } func (amqp *amqpPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, - dir uint8, private protos.ProtocolData) protos.ProtocolData { - + dir uint8, private protos.ProtocolData, +) protos.ProtocolData { defer logp.Recover("ParseAmqp exception") detailedf("Parse method triggered") @@ -284,7 +284,7 @@ func (amqp *amqpPlugin) handleAmqpRequest(msg *amqpMessage) { } else { trans.request = msg.method } - //length = message + 4 bytes header + frame end octet + // length = message + 4 bytes header + frame end octet trans.bytesIn = msg.bodySize + 12 if msg.fields != nil { trans.amqp = msg.fields @@ -292,9 +292,9 @@ func (amqp *amqpPlugin) handleAmqpRequest(msg *amqpMessage) { trans.amqp = common.MapStr{} } - //if error or exception, publish it now. sometimes client or server never send - //an ack message and the error is lost. Also, if nowait flag set, don't expect - //any response and publish + // if error or exception, publish it now. sometimes client or server never send + // an ack message and the error is lost. Also, if nowait flag set, don't expect + // any response and publish if isAsynchronous(trans) { amqp.publishTransaction(trans) debugf("Amqp transaction completed") @@ -317,9 +317,9 @@ func (amqp *amqpPlugin) handleAmqpResponse(msg *amqpMessage) { return } - //length = message + 4 bytes class/method + frame end octet + header + // length = message + 4 bytes class/method + frame end octet + header trans.bytesOut = msg.bodySize + 12 - //merge the both fields from request and response + // merge the both fields from request and response trans.amqp.Update(msg.fields) trans.response = common.OK_STATUS @@ -344,8 +344,8 @@ func (amqp *amqpPlugin) handleAmqpResponse(msg *amqpMessage) { func (amqp *amqpPlugin) expireTransaction(trans *amqpTransaction) { debugf("Transaction expired") - //possibility of a connection.close or channel.close method that didn't get an - //ok answer. Let's publish it. + // possibility of a connection.close or channel.close method that didn't get an + // ok answer. Let's publish it. if isCloseError(trans) { trans.notes = append(trans.notes, "Close-ok method not received by sender") amqp.publishTransaction(trans) @@ -354,8 +354,8 @@ func (amqp *amqpPlugin) expireTransaction(trans *amqpTransaction) { amqp.transactions.Delete(trans.tuple.Hashable()) } -//This method handles published messages from clients. Being an async -//process, the method, header and body frames are regrouped in one transaction +// This method handles published messages from clients. Being an async +// process, the method, header and body frames are regrouped in one transaction func (amqp *amqpPlugin) handlePublishing(client *amqpMessage) { tuple := client.tcpTuple trans := amqp.getTransaction(tuple.Hashable()) @@ -369,8 +369,8 @@ func (amqp *amqpPlugin) handlePublishing(client *amqpMessage) { trans.src, trans.dst = common.MakeEndpointPair(client.tcpTuple.BaseTuple, client.cmdlineTuple) trans.method = client.method - //for publishing and delivering, bytes in and out represent the length of the - //message itself + // for publishing and delivering, bytes in and out represent the length of the + // message itself trans.bytesIn = client.bodySize if client.bodySize > uint64(amqp.maxBodyLength) { @@ -384,13 +384,13 @@ func (amqp *amqpPlugin) handlePublishing(client *amqpMessage) { trans.amqp = client.fields amqp.publishTransaction(trans) debugf("Amqp transaction completed") - //delete trans from map + // delete trans from map amqp.transactions.Delete(trans.tuple.Hashable()) } -//This method handles delivered messages via basic.deliver and basic.get-ok AND -//returned messages to clients. Being an async process, the method, header and -//body frames are regrouped in one transaction +// This method handles delivered messages via basic.deliver and basic.get-ok AND +// returned messages to clients. Being an async process, the method, header and +// body frames are regrouped in one transaction func (amqp *amqpPlugin) handleDelivering(server *amqpMessage) { tuple := server.tcpTuple trans := amqp.getTransaction(tuple.Hashable()) @@ -403,8 +403,8 @@ func (amqp *amqpPlugin) handleDelivering(server *amqpMessage) { trans.ts = server.ts trans.src, trans.dst = common.MakeEndpointPair(server.tcpTuple.BaseTuple, server.cmdlineTuple) - //for publishing and delivering, bytes in and out represent the length of the - //message itself + // for publishing and delivering, bytes in and out represent the length of the + // message itself trans.bytesOut = server.bodySize if server.bodySize > uint64(amqp.maxBodyLength) { @@ -422,7 +422,7 @@ func (amqp *amqpPlugin) handleDelivering(server *amqpMessage) { amqp.publishTransaction(trans) debugf("Amqp transaction completed") - //delete trans from map + // delete trans from map amqp.transactions.Delete(trans.tuple.Hashable()) } @@ -459,7 +459,7 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) { fields["user.id"] = userID } - //let's try to convert request/response to a readable format + // let's try to convert request/response to a readable format if amqp.sendRequest { if t.method == "basic.publish" { if t.toString { @@ -503,7 +503,7 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) { amqp.results(evt) } -//function to check if method is async or not +// function to check if method is async or not func isAsynchronous(trans *amqpTransaction) bool { if val, ok := trans.amqp["no-wait"]; ok && val == true { return true @@ -514,7 +514,7 @@ func isAsynchronous(trans *amqpTransaction) bool { trans.method == "basic.nack" } -//function to convert a body slice into a readable format +// function to convert a body slice into a readable format func bodyToString(data []byte) string { ret := make([]string, len(data)) for i, c := range data { @@ -523,7 +523,7 @@ func bodyToString(data []byte) string { return strings.Join(ret, " ") } -//function used to check if a body message can be converted to readable string +// function used to check if a body message can be converted to readable string func isStringable(m *amqpMessage) bool { stringable := false diff --git a/packetbeat/protos/amqp/amqp_fields.go b/packetbeat/protos/amqp/amqp_fields.go index ff4f7c382b5..d7536572978 100644 --- a/packetbeat/protos/amqp/amqp_fields.go +++ b/packetbeat/protos/amqp/amqp_fields.go @@ -28,9 +28,9 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) +// getTable updates fields with the table data at the given offset. +// fields must be non_nil on entry. func getTable(fields common.MapStr, data []byte, offset uint32) (next uint32, err bool, exists bool) { - ret := common.MapStr{} - length := binary.BigEndian.Uint32(data[offset : offset+4]) // size declared too big @@ -39,23 +39,20 @@ func getTable(fields common.MapStr, data []byte, offset uint32) (next uint32, er } if length > 0 { exists = true - err := fieldUnmarshal(ret, data[offset+4:offset+4+length], 0, length, -1) + table := common.MapStr{} + err := fieldUnmarshal(table, data[offset+4:offset+4+length], 0, length, -1) if err { logp.Warn("Error while parsing a field table") return 0, true, false } - if fields == nil { - fields = ret - } else { - fields.Update(ret) - } + fields.Update(table) } return length + 4 + offset, false, exists } +// getTable updates fields with the array data at the given offset. +// fields must be non_nil on entry. func getArray(fields common.MapStr, data []byte, offset uint32) (next uint32, err bool, exists bool) { - ret := common.MapStr{} - length := binary.BigEndian.Uint32(data[offset : offset+4]) // size declared too big @@ -64,30 +61,27 @@ func getArray(fields common.MapStr, data []byte, offset uint32) (next uint32, er } if length > 0 { exists = true - err := fieldUnmarshal(ret, data[offset+4:offset+4+length], 0, length, 0) + array := common.MapStr{} + err := fieldUnmarshal(array, data[offset+4:offset+4+length], 0, length, 0) if err { logp.Warn("Error while parsing a field array") return 0, true, false } - if fields == nil { - fields = ret - } else { - fields.Update(ret) - } + fields.Update(array) } return length + 4 + offset, false, exists } -//The index parameter, when set at -1, indicates that the entry is a field table. -//If it's set at 0, it is an array. +// The index parameter, when set at -1, indicates that the entry is a field table. +// If it's set at 0, it is an array. func fieldUnmarshal(table common.MapStr, data []byte, offset uint32, length uint32, index int) (err bool) { var name string if offset >= length { return false } - //get name of the field. If it's an array, it will be the index parameter as a - //string. If it's a table, it will be the name of the field. + // get name of the field. If it's an array, it will be the index parameter as a + // string. If it's a table, it will be the name of the field. if index < 0 { fieldName, offsetTemp, err := getShortString(data, offset+1, uint32(data[offset])) if err { @@ -199,10 +193,10 @@ func fieldUnmarshal(table common.MapStr, data []byte, offset uint32, length uint table[name] = bodyToByteArray(data[offset+1+size : offset+5+size]) offset += 5 + size default: - //unknown field + // unknown field return true } - //advance to next field recursively + // advance to next field recursively return fieldUnmarshal(table, data, offset, length, index) } diff --git a/packetbeat/protos/amqp/amqp_methods.go b/packetbeat/protos/amqp/amqp_methods.go index 93cec5f2747..d74148026c5 100644 --- a/packetbeat/protos/amqp/amqp_methods.go +++ b/packetbeat/protos/amqp/amqp_methods.go @@ -32,7 +32,7 @@ func connectionStartMethod(m *amqpMessage, args []byte) (bool, bool) { properties := make(common.MapStr) next, err, exists := getTable(properties, args, 2) if err { - //failed to get de peer-properties, size may be wrong, let's quit + // failed to get de peer-properties, size may be wrong, let's quit logp.Warn("Failed to parse server properties in connection.start method") return false, false } @@ -54,7 +54,7 @@ func connectionStartMethod(m *amqpMessage, args []byte) (bool, bool) { "mechanisms": mechanisms, "locales": locales, } - //if there is a server properties table, add it + // if there is a server properties table, add it if exists { m.fields["server-properties"] = properties } @@ -65,7 +65,7 @@ func connectionStartOkMethod(m *amqpMessage, args []byte) (bool, bool) { properties := make(common.MapStr) next, err, exists := getTable(properties, args, 0) if err { - //failed to get de peer-properties, size may be wrong, let's quit + // failed to get de peer-properties, size may be wrong, let's quit logp.Warn("Failed to parse server properties in connection.start method") return false, false } @@ -89,7 +89,7 @@ func connectionStartOkMethod(m *amqpMessage, args []byte) (bool, bool) { "mechanism": mechanism, "locale": locale, } - //if there is a client properties table, add it + // if there is a client properties table, add it if exists { m.fields["client-properties"] = properties } @@ -99,8 +99,8 @@ func connectionStartOkMethod(m *amqpMessage, args []byte) (bool, bool) { func connectionTuneMethod(m *amqpMessage, args []byte) (bool, bool) { m.isRequest = true m.method = "connection.tune" - //parameters are not parsed here, they are further negotiated by the server - //in the connection.tune-ok method + // parameters are not parsed here, they are further negotiated by the server + // in the connection.tune-ok method return true, true } @@ -163,7 +163,7 @@ func channelCloseMethod(m *amqpMessage, args []byte) (bool, bool) { return true, true } -//function to fetch fields from channel close and connection close +// function to fetch fields from channel close and connection close func getCloseInfo(args []byte, m *amqpMessage) bool { code := binary.BigEndian.Uint16(args[0:2]) m.isRequest = true @@ -411,7 +411,7 @@ func exchangeDeleteMethod(m *amqpMessage, args []byte) (bool, bool) { return true, true } -//this is a method exclusive to RabbitMQ +// this is a method exclusive to RabbitMQ func exchangeBindMethod(m *amqpMessage, args []byte) (bool, bool) { m.method = "exchange.bind" err := exchangeBindUnbindInfo(m, args) @@ -421,7 +421,7 @@ func exchangeBindMethod(m *amqpMessage, args []byte) (bool, bool) { return true, true } -//this is a method exclusive to RabbitMQ +// this is a method exclusive to RabbitMQ func exchangeUnbindMethod(m *amqpMessage, args []byte) (bool, bool) { m.method = "exchange.unbind" err := exchangeBindUnbindInfo(m, args) @@ -588,7 +588,7 @@ func basicPublishMethod(m *amqpMessage, args []byte) (bool, bool) { func basicReturnMethod(m *amqpMessage, args []byte) (bool, bool) { code := binary.BigEndian.Uint16(args[0:2]) if code < 300 { - //not an error or exception ? not interesting + // not an error or exception ? not interesting return true, false } replyText, nextOffset, err := getShortString(args, 3, uint32(args[2])) @@ -707,7 +707,7 @@ func basicAckMethod(m *amqpMessage, args []byte) (bool, bool) { return true, true } -//this is a rabbitMQ specific method +// this is a rabbitMQ specific method func basicNackMethod(m *amqpMessage, args []byte) (bool, bool) { params := getBitParams(args[8]) m.method = "basic.nack" @@ -761,14 +761,14 @@ func txRollbackMethod(m *amqpMessage, args []byte) (bool, bool) { return true, true } -//simple function used when server/client responds to a sync method with no new info +// simple function used when server/client responds to a sync method with no new info func okMethod(m *amqpMessage, args []byte) (bool, bool) { return true, true } // function to get a short string. It sends back an error if slice is too short -//for declared length. if length == 0, the function sends back an empty string and -//advances the offset. Otherwise, it returns the string and the new offset +// for declared length. if length == 0, the function sends back an empty string and +// advances the offset. Otherwise, it returns the string and the new offset func getShortString(data []byte, start uint32, length uint32) (short string, nextOffset uint32, err bool) { if length == 0 { return "", start, false @@ -779,7 +779,7 @@ func getShortString(data []byte, start uint32, length uint32) (short string, nex return string(data[start : start+length]), start + length, false } -//function to extract bit information in various AMQP methods +// function to extract bit information in various AMQP methods func getBitParams(bits byte) (ret [5]bool) { if bits&16 == 16 { ret[4] = true diff --git a/packetbeat/protos/amqp/amqp_parser.go b/packetbeat/protos/amqp/amqp_parser.go index 6ab15ec8159..a623219ca0b 100644 --- a/packetbeat/protos/amqp/amqp_parser.go +++ b/packetbeat/protos/amqp/amqp_parser.go @@ -42,10 +42,10 @@ func (amqp *amqpPlugin) amqpMessageParser(s *amqpStream) (ok bool, complete bool f, err := readFrameHeader(s.data[s.parseOffset:]) if err { - //incorrect header + // incorrect header return false, false } else if f == nil { - //header not complete + // header not complete return true, false } @@ -67,17 +67,14 @@ func (amqp *amqpPlugin) amqpMessageParser(s *amqpStream) (ok bool, complete bool s.parseOffset += 8 + int(f.size) if !ok { return false, false - } else if complete { + } + if complete { return true, true } } return ok, complete } -func (s *amqpStream) prepareForNewMessage() { - s.message = nil -} - func isProtocolHeader(data []byte) (isHeader bool, version string) { if (string(data[:4]) == "AMQP") && data[4] == 0 { return true, string(data[5:8]) @@ -85,7 +82,7 @@ func isProtocolHeader(data []byte) (isHeader bool, version string) { return false, "" } -//func to read a frame header and check if it is valid and complete +// func to read a frame header and check if it is valid and complete func readFrameHeader(data []byte) (ret *amqpFrame, err bool) { var frame amqpFrame if len(data) < 8 { @@ -103,7 +100,7 @@ func readFrameHeader(data []byte) (ret *amqpFrame, err bool) { } frame.Type = frameType(data[0]) if frame.size == 0 { - //frame content is nil with heartbeat frames + // frame content is nil with heartbeat frames frame.content = nil } else { frame.content = data[7 : frame.size+7] @@ -159,7 +156,7 @@ func (amqp *amqpPlugin) decodeHeaderFrame(s *amqpStream, buf []byte) bool { s.message.bodySize = binary.BigEndian.Uint64(buf[4:12]) debugf("Received Header frame. A message of %d bytes is expected", s.message.bodySize) - if amqp.parseHeaders == true { + if amqp.parseHeaders { err := getMessageProperties(s, buf[12:]) if err { return false @@ -180,7 +177,7 @@ func (s *amqpStream) decodeBodyFrame(buf []byte) (ok bool, complete bool) { debugf("A body frame of %d bytes long has been transmitted", len(buf)) - //is the message complete ? If yes, let's publish it + // is the message complete ? If yes, let's publish it complete = uint64(len(s.message.body)) >= s.message.bodySize return true, complete @@ -190,16 +187,16 @@ func hasProperty(prop, flag byte) bool { return (prop & flag) == flag } -//function to get message content-type and content-encoding +// function to get message content-type and content-encoding func getMessageProperties(s *amqpStream, data []byte) bool { m := s.message - //properties are coded in the two first bytes + // properties are coded in the two first bytes prop1 := data[0] prop2 := data[1] var offset uint32 = 2 - //while last bit set, we have another property flag field + // while last bit set, we have another property flag field for lastbit := 1; data[lastbit]&1 == 1; { lastbit += 2 offset += 2 @@ -238,9 +235,10 @@ func getMessageProperties(s *amqpStream, data []byte) bool { } if hasProperty(prop1, deliveryModeProp) { - if data[offset] == 1 { + switch data[offset] { + case 1: m.fields["delivery-mode"] = "non-persistent" - } else if data[offset] == 2 { + case 2: m.fields["delivery-mode"] = "persistent" } offset++ @@ -337,20 +335,20 @@ func (amqp *amqpPlugin) handleAmqp(m *amqpMessage, tcptuple *common.TCPTuple, di m.direction = dir m.cmdlineTuple = amqp.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) - if m.method == "basic.publish" { + switch { + case m.method == "basic.publish": amqp.handlePublishing(m) - } else if m.method == "basic.deliver" || m.method == "basic.return" || - m.method == "basic.get-ok" { + case m.method == "basic.deliver" || m.method == "basic.return" || m.method == "basic.get-ok": amqp.handleDelivering(m) - } else if m.isRequest == true { + case m.isRequest: amqp.handleAmqpRequest(m) - } else if m.isRequest == false { + default: // !m.isRequest amqp.handleAmqpResponse(m) } } func (amqp *amqpPlugin) mustHideCloseMethod(m *amqpMessage) bool { - return amqp.hideConnectionInformation == true && + return amqp.hideConnectionInformation && (m.method == "connection.close" || m.method == "channel.close") && - getReplyCode(m.fields) < uint16(300) + getReplyCode(m.fields) < 300 } diff --git a/packetbeat/protos/amqp/amqp_structs.go b/packetbeat/protos/amqp/amqp_structs.go index 387e8df6fcb..ada8441c45a 100644 --- a/packetbeat/protos/amqp/amqp_structs.go +++ b/packetbeat/protos/amqp/amqp_structs.go @@ -30,12 +30,12 @@ const ( transactionTimeout = 10 * 1e9 ) -//layout used when a timestamp must be parsed +// layout used when a timestamp must be parsed const ( amqpTimeLayout = "January _2 15:04:05 2006" ) -//Frame types and codes +// Frame types and codes type frameType byte @@ -50,7 +50,7 @@ const ( frameEndOctet byte = 206 ) -//Codes for MethodMap +// Codes for MethodMap type codeClass uint16 const ( @@ -137,7 +137,7 @@ const ( txRollbackOk codeMethod = 31 ) -//Message properties codes for byte prop1 in getMessageProperties +// Message properties codes for byte prop1 in getMessageProperties const ( expirationProp byte = 1 replyToProp byte = 2 @@ -149,7 +149,7 @@ const ( contentTypeProp byte = 128 ) -//Message properties codes for byte prop2 in getMessageProperties +// Message properties codes for byte prop2 in getMessageProperties const ( appIDProp byte = 8 @@ -159,7 +159,7 @@ const ( messageIDProp byte = 128 ) -//table types +// table types const ( boolean = 't' shortShortInt = 'b' @@ -179,7 +179,7 @@ const ( timestamp = 'T' fieldTable = 'F' noField = 'V' - byteArray = 'x' //rabbitMQ specific field + byteArray = 'x' // rabbitMQ specific field ) type amqpPrivateData struct { @@ -203,7 +203,7 @@ type amqpMessage struct { direction uint8 parseArguments bool - //mapstr containing all the options for the methods and header fields + // mapstr containing all the options for the methods and header fields fields common.MapStr body []byte diff --git a/packetbeat/protos/amqp/amqp_test.go b/packetbeat/protos/amqp/amqp_test.go index 725de0f3d11..9b7f7dca35c 100644 --- a/packetbeat/protos/amqp/amqp_test.go +++ b/packetbeat/protos/amqp/amqp_test.go @@ -96,7 +96,7 @@ func TestAmqp_FrameSize(t *testing.T) { _, amqp := amqpModForTests() - //incomplete frame + // incomplete frame data, err := hex.DecodeString("0100000000000c000a001fffff000200") assert.NoError(t, err) @@ -118,7 +118,7 @@ func TestAmqp_PartialFrameSize(t *testing.T) { _, amqp := amqpModForTests() - //incomplete frame + // incomplete frame data, err := hex.DecodeString("414d515000060606010000000000") assert.NoError(t, err) @@ -275,7 +275,7 @@ func TestAmqp_ExchangeDeletion(t *testing.T) { assert.Equal(t, false, m.fields["no-wait"]) } -//this method is exclusive to RabbitMQ +// this method is exclusive to RabbitMQ func TestAmqp_ExchangeBind(t *testing.T) { logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed")) @@ -308,7 +308,7 @@ func TestAmqp_ExchangeBind(t *testing.T) { } } -//this method is exclusive to RabbitMQ +// this method is exclusive to RabbitMQ func TestAmqp_ExchangeUnbindTransaction(t *testing.T) { logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed")) @@ -365,13 +365,13 @@ func TestAmqp_PublishMessage(t *testing.T) { req := protos.Packet{Payload: data} private := protos.ProtocolData(new(amqpPrivateData)) - //method frame + // method frame private = amqp.Parse(&req, tcptuple, 0, private) req = protos.Packet{Payload: data2} - //header frame + // header frame private = amqp.Parse(&req, tcptuple, 0, private) req = protos.Packet{Payload: data3} - //body frame + // body frame amqp.Parse(&req, tcptuple, 0, private) trans := expectTransaction(t, results) @@ -414,13 +414,13 @@ func TestAmqp_DeliverMessage(t *testing.T) { req := protos.Packet{Payload: data} private := protos.ProtocolData(new(amqpPrivateData)) - //method frame + // method frame private = amqp.Parse(&req, tcptuple, 0, private) req = protos.Packet{Payload: data2} - //header frame + // header frame private = amqp.Parse(&req, tcptuple, 0, private) req = protos.Packet{Payload: data3} - //body frame + // body frame amqp.Parse(&req, tcptuple, 0, private) trans := expectTransaction(t, results) @@ -467,7 +467,7 @@ func TestAmqp_MessagePropertiesFields(t *testing.T) { assert.Equal(t, "el mensaje", m.fields["message-id"]) assert.Equal(t, "love message", m.fields["type"]) assert.Equal(t, "text/plain", m.fields["content-type"]) - //assert.Equal(t, "September 15 15:31:44 2015", m.Fields["timestamp"]) + // assert.Equal(t, "September 15 15:31:44 2015", m.Fields["timestamp"]) priority, ok := m.fields["priority"].(uint8) if !ok { t.Errorf("Field should be present") @@ -576,7 +576,7 @@ func TestAmqp_RejectMessage(t *testing.T) { req := protos.Packet{Payload: data} private := protos.ProtocolData(new(amqpPrivateData)) - //method frame + // method frame amqp.Parse(&req, tcptuple, 0, private) trans := expectTransaction(t, results) @@ -668,8 +668,8 @@ func TestAmqp_MaxBodyLength(t *testing.T) { req := protos.Packet{Payload: data} private := protos.ProtocolData(new(amqpPrivateData)) - //method frame - private = amqp.Parse(&req, tcptuple, 0, private) + // method frame + amqp.Parse(&req, tcptuple, 0, private) trans := expectTransaction(t, results) @@ -699,7 +699,7 @@ func TestAmqp_MaxBodyLength(t *testing.T) { req = protos.Packet{Payload: data} private = protos.ProtocolData(new(amqpPrivateData)) - //method frame + // method frame amqp.Parse(&req, tcptuple, 0, private) trans = expectTransaction(t, results) @@ -728,7 +728,7 @@ func TestAmqp_HideArguments(t *testing.T) { amqp.parseHeaders = false amqp.parseArguments = false - //parse args + // parse args data, err := hex.DecodeString("0100010000004d0032000a00000a5465737448656164" + "6572180000003704626f6f6c74010362697462050568656c6c6f530000001f4869206461" + "726c696e6720c3aac3aac3aac3aac3aac3aac3aae697a5e69cacce") @@ -736,7 +736,7 @@ func TestAmqp_HideArguments(t *testing.T) { tcptuple := testTCPTuple() req := protos.Packet{Payload: data} private := protos.ProtocolData(new(amqpPrivateData)) - private = amqp.Parse(&req, tcptuple, 0, private) + amqp.Parse(&req, tcptuple, 0, private) trans := expectTransaction(t, results) assert.Equal(t, "queue.declare", trans["method"]) @@ -753,7 +753,7 @@ func TestAmqp_HideArguments(t *testing.T) { t.Errorf("Arguments field should not be present") } - //parse headers + // parse headers data, err = hex.DecodeString("01000100000013003c00280000000a546573744865616" + "4657200ce02000100000026003c0000000000000000001a98800a746578742f706c61696" + "e02060a656c206d656e73616a65ce0300010000001a54657374206865616465722066696" + @@ -808,7 +808,7 @@ func TestAmqp_RecoverMethod(t *testing.T) { assert.Equal(t, common.MapStr{"requeue": true}, trans["amqp"]) } -//this is a specific rabbitMQ method +// this is a specific rabbitMQ method func TestAmqp_BasicNack(t *testing.T) { logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed")) @@ -887,7 +887,7 @@ func TestAmqp_GetTable(t *testing.T) { assert.Equal(t, true, m.fields["no-wait"]) assert.Equal(t, true, m.fields["auto-delete"]) assert.Equal(t, false, m.fields["exclusive"]) - //assert.Equal(t, "September 15 11:25:29 2015", args["timestamp"]) + // assert.Equal(t, "September 15 11:25:29 2015", args["timestamp"]) assert.Equal(t, "TestHeader", m.request) } @@ -948,7 +948,7 @@ func TestAmqp_ArrayFields(t *testing.T) { _, amqp := amqpModForTests() - //byte array, rabbitMQ specific field + // byte array, rabbitMQ specific field data, err := hex.DecodeString("010001000000260028000a0000057465737431057" + "46f706963020000000f05617272617978000000040a007dd2ce") assert.NoError(t, err) @@ -1025,7 +1025,7 @@ func TestAmqp_WrongTable(t *testing.T) { _, amqp := amqpModForTests() - //declared table size too big + // declared table size too big data, err := hex.DecodeString("010001000000890032000a00000a54657374486561646" + "57218000000da0974696d657374616d70540000000055f7e409036269746205076465636" + "96d616c440500ec49050568656c6c6f530000001f4869206461726c696e6720c3aac3aac" + @@ -1048,7 +1048,7 @@ func TestAmqp_WrongTable(t *testing.T) { } assert.Equal(t, []string{"Failed to parse additional arguments"}, m.notes) - //table size ok, but total non-sense inside + // table size ok, but total non-sense inside data, err = hex.DecodeString("010001000000890032000a00000a54657374486561646" + "57218000000730974696d657374616d7054004400005521e409036269743705076400036" + "96d616c447600ec49180568036c6c0b536400001f480a2064076e6c696e0520c3aac3aac" + diff --git a/packetbeat/protos/amqp/config.go b/packetbeat/protos/amqp/config.go index 2939530046e..4cbd5a5200a 100644 --- a/packetbeat/protos/amqp/config.go +++ b/packetbeat/protos/amqp/config.go @@ -30,14 +30,12 @@ type amqpConfig struct { HideConnectionInformation bool `config:"hide_connection_information"` } -var ( - defaultConfig = amqpConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - ParseHeaders: true, - ParseArguments: true, - MaxBodyLength: 1000, - HideConnectionInformation: true, - } -) +var defaultConfig = amqpConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + ParseHeaders: true, + ParseArguments: true, + MaxBodyLength: 1000, + HideConnectionInformation: true, +} diff --git a/packetbeat/protos/cassandra/cassandra.go b/packetbeat/protos/cassandra/cassandra.go index 7d3001a5159..5c7cbd003d1 100644 --- a/packetbeat/protos/cassandra/cassandra.go +++ b/packetbeat/protos/cassandra/cassandra.go @@ -50,9 +50,7 @@ type stream struct { parser parser } -var ( - debugf = logp.MakeDebug("cassandra") -) +var debugf = logp.MakeDebug("cassandra") func init() { protos.Register("cassandra", New) @@ -202,11 +200,6 @@ func (cassandra *cassandra) ensureConnection(private protos.ProtocolData) *conne return conn } -func (conn *connection) dropStreams() { - conn.streams[0] = nil - conn.streams[1] = nil -} - func getConnection(private protos.ProtocolData) *connection { if private == nil { return nil diff --git a/packetbeat/protos/cassandra/config.go b/packetbeat/protos/cassandra/config.go index 0eafddcd1ee..8e93239bc6b 100644 --- a/packetbeat/protos/cassandra/config.go +++ b/packetbeat/protos/cassandra/config.go @@ -34,20 +34,18 @@ type cassandraConfig struct { OPsIgnored []gocql.FrameOp `config:"ignored_ops"` } -var ( - defaultConfig = cassandraConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - SendRequest: true, - SendResponse: true, - }, - SendRequestHeader: true, - SendResponseHeader: true, - } -) +var defaultConfig = cassandraConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + SendRequest: true, + SendResponse: true, + }, + SendRequestHeader: true, + SendResponseHeader: true, +} func (c *cassandraConfig) Validate() error { - if !(c.Compressor == "" || c.Compressor == "snappy") { + if c.Compressor != "" && c.Compressor != "snappy" { return fmt.Errorf("invalid compressor config: %s, only snappy supported", c.Compressor) } return nil diff --git a/packetbeat/protos/cassandra/internal/gocql/array_decoder.go b/packetbeat/protos/cassandra/internal/gocql/array_decoder.go index a224dc7d1b5..b64ecf8e146 100644 --- a/packetbeat/protos/cassandra/internal/gocql/array_decoder.go +++ b/packetbeat/protos/cassandra/internal/gocql/array_decoder.go @@ -26,10 +26,6 @@ type ByteArrayDecoder struct { Data *[]byte } -func readInt(p []byte) int32 { - return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3]) -} - func (f ByteArrayDecoder) ReadByte() (byte, error) { data := *f.Data if len(data) < 1 { @@ -167,7 +163,7 @@ func (f ByteArrayDecoder) ReadInet() (net.IP, int) { size := data[0] *f.Data = data[1:] - if !(size == 4 || size == 16) { + if size != 4 && size != 16 { panic(fmt.Errorf("invalid IP size: %d", size)) } diff --git a/packetbeat/protos/cassandra/internal/gocql/compressor.go b/packetbeat/protos/cassandra/internal/gocql/compressor.go index cfbff208ade..fe6583c3a77 100644 --- a/packetbeat/protos/cassandra/internal/gocql/compressor.go +++ b/packetbeat/protos/cassandra/internal/gocql/compressor.go @@ -49,11 +49,11 @@ func (s SnappyCompressor) Decode(data []byte) ([]byte, error) { const LZ4 string = "lz4" type LZ4Compressor struct { - //TODO + // TODO } const Deflate string = "deflate" type DeflateCompressor struct { - //TODO + // TODO } diff --git a/packetbeat/protos/cassandra/internal/gocql/decoder.go b/packetbeat/protos/cassandra/internal/gocql/decoder.go index 734a940d8bd..f628aa65120 100644 --- a/packetbeat/protos/cassandra/internal/gocql/decoder.go +++ b/packetbeat/protos/cassandra/internal/gocql/decoder.go @@ -23,34 +23,19 @@ import ( type Decoder interface { ReadByte() (byte, error) - ReadInt() (n int) - ReadShort() (n uint16) - ReadLong() (n int64) - ReadString() (s string) - ReadLongString() (s string) - ReadUUID() *UUID - ReadStringList() []string - ReadBytesInternal() []byte - ReadBytes() []byte - ReadShortBytes() []byte - ReadInet() (net.IP, int) - ReadConsistency() Consistency - ReadStringMap() map[string]string - ReadBytesMap() map[string][]byte - ReadStringMultiMap() map[string][]string } diff --git a/packetbeat/protos/cassandra/internal/gocql/frame.go b/packetbeat/protos/cassandra/internal/gocql/frame.go index 25e7df5bac8..b8ae7eeac4e 100644 --- a/packetbeat/protos/cassandra/internal/gocql/frame.go +++ b/packetbeat/protos/cassandra/internal/gocql/frame.go @@ -182,18 +182,17 @@ func (f *Framer) ReadFrame() (data map[string]interface{}, err error) { data = make(map[string]interface{}) - //Only QUERY, PREPARE and EXECUTE queries support tracing - //If a response frame has the tracing flag set, its body contains - //a tracing ID. The tracing ID is a [uuid] and is the first thing in - //the frame body. The rest of the body will then be the usual body - //corresponding to the response opcode. + // Only QUERY, PREPARE and EXECUTE queries support tracing + // If a response frame has the tracing flag set, its body contains + // a tracing ID. The tracing ID is a [uuid] and is the first thing in + // the frame body. The rest of the body will then be the usual body + // corresponding to the response opcode. if f.Header.Flags&flagTracing == flagTracing && (f.Header.Op&opQuery == opQuery || f.Header.Op&opExecute == opExecute || f.Header.Op&opPrepare == opPrepare) { - debugf("tracing enabled") - //seems no UUID to read, protocol incorrect? - //uid := decoder.ReadUUID() - //data["trace_id"] = uid.String() + // seems no UUID to read, protocol incorrect? + // uid := decoder.ReadUUID() + // data["trace_id"] = uid.String() } if f.Header.Flags&flagWarning == flagWarning { @@ -211,7 +210,7 @@ func (f *Framer) ReadFrame() (data map[string]interface{}, err error) { } if f.Header.Flags&flagCompress == flagCompress { - //decompress data and switch to use bytearray decoder + // decompress data and switch to use bytearray decoder if f.compres == nil { logp.Err("hit compress flag, but compressor was not set") panic(errors.New("hit compress flag, but compressor was not set")) @@ -234,13 +233,13 @@ func (f *Framer) ReadFrame() (data map[string]interface{}, err error) { // assumes that the frame body has been read into rbuf switch f.Header.Op { - //below ops are requests + // below ops are requests case opStartup, opAuthResponse, opOptions, opPrepare, opExecute, opBatch, opRegister: - //ignored + // ignored case opQuery: data = f.parseQueryFrame() - //below ops are responses + // below ops are responses case opError: data["error"] = f.parseErrorFrame() case opResult: @@ -259,7 +258,7 @@ func (f *Framer) ReadFrame() (data map[string]interface{}, err error) { // the body should be empty default: - //ignore + // ignore debugf("unknow ops, not processed, %v", f.Header) } @@ -348,7 +347,7 @@ func (f *Framer) parseErrorFrame() (data map[string]interface{}) { case errInvalid, errBootstrapping, errConfig, errCredentials, errOverloaded, errProtocol, errServer, errSyntax, errTruncate, errUnauthorized: - //ignored + // ignored default: logp.Err("unknown error code: 0x%x", code) } @@ -375,8 +374,7 @@ func (f *Framer) parseResultMetadata(getPKinfo bool) map[string]interface{} { meta["col_count"] = colCount if getPKinfo { - - //only for prepared result + // only for prepared result if f.proto >= protoVersion4 { pkeyCount := decoder.ReadInt() pkeys := make([]int, pkeyCount) @@ -450,7 +448,6 @@ func (f *Framer) parseResultPrepared() map[string]interface{} { result := make(map[string]interface{}) uuid, err := UUIDFromBytes((f.decoder).ReadShortBytes()) - if err != nil { logp.Err("Error in parsing UUID") } diff --git a/packetbeat/protos/cassandra/internal/gocql/marshal.go b/packetbeat/protos/cassandra/internal/gocql/marshal.go index 2061bb2f11e..3c55781cc81 100644 --- a/packetbeat/protos/cassandra/internal/gocql/marshal.go +++ b/packetbeat/protos/cassandra/internal/gocql/marshal.go @@ -349,8 +349,8 @@ const ( errUnprepared ErrType = 0x2500 ) -func (this ErrType) String() string { - switch this { +func (e ErrType) String() string { + switch e { case errUnavailable: return "errUnavailable" case errWriteTimeout: @@ -680,7 +680,7 @@ func (u UUID) Bytes() []byte { // String returns the UUID in it's canonical form, a 32 digit hexadecimal // number in the form of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx. func (u UUID) String() string { - var offsets = [...]int{0, 2, 4, 6, 9, 11, 14, 16, 19, 21, 24, 26, 28, 30, 32, 34} + offsets := [...]int{0, 2, 4, 6, 9, 11, 14, 16, 19, 21, 24, 26, 28, 30, 32, 34} const hexString = "0123456789abcdef" r := make([]byte, 36) for i, b := range u { diff --git a/packetbeat/protos/cassandra/internal/gocql/stream_decoder.go b/packetbeat/protos/cassandra/internal/gocql/stream_decoder.go index ae107a95e9e..f39cde2d11e 100644 --- a/packetbeat/protos/cassandra/internal/gocql/stream_decoder.go +++ b/packetbeat/protos/cassandra/internal/gocql/stream_decoder.go @@ -149,7 +149,7 @@ func (f StreamDecoder) ReadInet() (net.IP, int) { panic(err) } - if !(size == 4 || size == 16) { + if size != 4 && size != 16 { panic(fmt.Errorf("invalid IP size: %d", size)) } diff --git a/packetbeat/protos/cassandra/parser.go b/packetbeat/protos/cassandra/parser.go index f699187b03f..65ff347a1ea 100644 --- a/packetbeat/protos/cassandra/parser.go +++ b/packetbeat/protos/cassandra/parser.go @@ -44,14 +44,7 @@ type parserConfig struct { // check whether this ops is enabled or not func (p *parser) CheckFrameOpsIgnored() bool { - if p.config.ignoredOps != nil && len(p.config.ignoredOps) > 0 { - //default map value is false - v := p.config.ignoredOps[p.framer.Header.Op] - if v { - return true - } - } - return false + return p.config.ignoredOps[p.framer.Header.Op] } type message struct { @@ -69,8 +62,6 @@ type message struct { // list element use by 'transactions' for correlation next *message - transactionTimeout time.Duration - results transactions } @@ -91,7 +82,6 @@ func (p *parser) init( } isDebug = logp.IsDebug("cassandra") - } func (p *parser) append(data []byte) error { @@ -153,7 +143,7 @@ func (p *parser) parserBody() (bool, error) { return true, nil } - //let's wait for enough buf + // let's wait for enough buf debugf("bodyLength: %d", bdyLen) if !p.buf.Avail(bdyLen) { if isDebug { @@ -162,7 +152,7 @@ func (p *parser) parserBody() (bool, error) { return false, nil } - //check if the ops already ignored + // check if the ops already ignored if p.message.ignored { if isDebug { debugf("message marked to be ignored, let's do this") @@ -232,7 +222,7 @@ func (p *parser) parse() (*message, error) { } } - //check if the ops need to be ignored + // check if the ops need to be ignored if p.CheckFrameOpsIgnored() { // as we already ignore the content, we now mark the result is ignored p.message.ignored = true @@ -248,7 +238,7 @@ func (p *parser) parse() (*message, error) { return nil, err } - //ignore and wait for more data + // ignore and wait for more data if !finished { return nil, nil } diff --git a/packetbeat/protos/cassandra/pub.go b/packetbeat/protos/cassandra/pub.go index 3b90c53feae..82effab92d3 100644 --- a/packetbeat/protos/cassandra/pub.go +++ b/packetbeat/protos/cassandra/pub.go @@ -33,7 +33,6 @@ type transPub struct { sendResponse bool sendRequestHeader bool sendResponseHeader bool - ignoredOps string results protos.Reporter } @@ -82,7 +81,7 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event { cassandra := common.MapStr{} status := common.OK_STATUS - //requ can be null, if the message is a PUSHed message + // requ can be null, if the message is a PUSHed message if requ != nil { pbf.Source.Bytes = int64(requ.Size) pbf.Event.Start = requ.Ts @@ -101,7 +100,7 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event { } } } else { - //dealing with PUSH message + // dealing with PUSH message cassandra["no_request"] = true } diff --git a/packetbeat/protos/cassandra/trans.go b/packetbeat/protos/cassandra/trans.go index 9b055d22c88..d7a9ae60d5f 100644 --- a/packetbeat/protos/cassandra/trans.go +++ b/packetbeat/protos/cassandra/trans.go @@ -132,7 +132,6 @@ func (trans *transactions) onResponse( func (trans *transactions) tryMergeRequests( prev, msg *message, ) (merged bool, err error) { - msg.isComplete = true return false, nil } @@ -150,7 +149,7 @@ func (trans *transactions) correlate() error { if requests.empty() { for !responses.empty() { - //if the response is EVENT, which pushed from server, we can accept that + // if the response is EVENT, which pushed from server, we can accept that resp := responses.first() if !resp.isComplete { break diff --git a/packetbeat/protos/dhcpv4/config.go b/packetbeat/protos/dhcpv4/config.go index 4121ee66ba7..7245e3c641b 100644 --- a/packetbeat/protos/dhcpv4/config.go +++ b/packetbeat/protos/dhcpv4/config.go @@ -25,10 +25,8 @@ type dhcpv4Config struct { config.ProtocolCommon `config:",inline"` } -var ( - defaultConfig = dhcpv4Config{ - ProtocolCommon: config.ProtocolCommon{ - Ports: []int{67, 68}, - }, - } -) +var defaultConfig = dhcpv4Config{ + ProtocolCommon: config.ProtocolCommon{ + Ports: []int{67, 68}, + }, +} diff --git a/packetbeat/protos/dns/config.go b/packetbeat/protos/dns/config.go index c98d52f8257..c1044cf2f7c 100644 --- a/packetbeat/protos/dns/config.go +++ b/packetbeat/protos/dns/config.go @@ -28,10 +28,8 @@ type dnsConfig struct { IncludeAdditionals bool `config:"include_additionals"` } -var ( - defaultConfig = dnsConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - } -) +var defaultConfig = dnsConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, +} diff --git a/packetbeat/protos/dns/dns.go b/packetbeat/protos/dns/dns.go index 125d6112bae..04be88a36b1 100644 --- a/packetbeat/protos/dns/dns.go +++ b/packetbeat/protos/dns/dns.go @@ -59,18 +59,10 @@ type dnsPlugin struct { watcher procs.ProcessesWatcher } -var ( - debugf = logp.MakeDebug("dns") -) +var debugf = logp.MakeDebug("dns") const maxDNSTupleRawSize = 16 + 16 + 2 + 2 + 4 + 1 -// Constants used to associate the DNS QR flag with a meaningful value. -const ( - query = false - response = true -) - // Transport protocol. type transport uint8 @@ -203,13 +195,12 @@ func (dns *dnsPlugin) getTransaction(k hashableDNSTuple) *dnsTransaction { } type dnsTransaction struct { - ts time.Time // Time when the request was received. - tuple dnsTuple // Key used to track this transaction in the transactionsMap. - responseTime int32 // Elapsed time in milliseconds between the request and response. - src common.Endpoint - dst common.Endpoint - transport transport - notes []string + ts time.Time // Time when the request was received. + tuple dnsTuple // Key used to track this transaction in the transactionsMap. + src common.Endpoint + dst common.Endpoint + transport transport + notes []string request *dnsMessage response *dnsMessage diff --git a/packetbeat/protos/dns/dns_tcp.go b/packetbeat/protos/dns/dns_tcp.go index bbf7e736926..8addcdd6896 100644 --- a/packetbeat/protos/dns/dns_tcp.go +++ b/packetbeat/protos/dns/dns_tcp.go @@ -107,7 +107,6 @@ func (dns *dnsPlugin) doParse(conn *dnsConnectionData, pkt *protos.Packet, tcpTu } } decodedData, err := stream.handleTCPRawData() - if err != nil { if err == incompleteMsg { @@ -259,8 +258,8 @@ func (dns *dnsPlugin) publishResponseError(conn *dnsConnectionData, err error) { trans.notes = append(trans.notes, errDNS.responseError()) // Should we publish the length (bytes_out) of the failed Response? - //streamReverse.message.Length = len(streamReverse.rawData) - //trans.Response = streamReverse.message + // streamReverse.message.Length = len(streamReverse.rawData) + // trans.Response = streamReverse.message dns.publishTransaction(trans) dns.deleteTransaction(hashDNSTupleOrigin) @@ -296,7 +295,6 @@ func (stream *dnsStream) handleTCPRawData() (*mkdns.Msg, error) { } decodedData, err := decodeDNSData(transportTCP, rawData[:stream.parseOffset]) - if err != nil { return nil, err } diff --git a/packetbeat/protos/dns/dns_test.go b/packetbeat/protos/dns/dns_test.go index 9427ebeaecf..1ba03b2bd61 100644 --- a/packetbeat/protos/dns/dns_test.go +++ b/packetbeat/protos/dns/dns_test.go @@ -316,8 +316,10 @@ func TestRRsToMapStrsWithOPTRecord(t *testing.T) { o.Hdr.Rrtype = mkdns.TypeOPT r := new(mkdns.MX) - r.Hdr = mkdns.RR_Header{Name: "miek.nl", Rrtype: mkdns.TypeMX, - Class: mkdns.ClassINET, Ttl: 3600} + r.Hdr = mkdns.RR_Header{ + Name: "miek.nl", Rrtype: mkdns.TypeMX, + Class: mkdns.ClassINET, Ttl: 3600, + } r.Preference = 10 r.Mx = "mx.miek.nl" diff --git a/packetbeat/protos/dns/dns_udp_test.go b/packetbeat/protos/dns/dns_udp_test.go index 3711fb50e00..62713502e0a 100644 --- a/packetbeat/protos/dns/dns_udp_test.go +++ b/packetbeat/protos/dns/dns_udp_test.go @@ -155,10 +155,12 @@ var ( qSubdomain: "131.252.30", qTLD: "in-addr.arpa", answers: []string{"github.com"}, - authorities: []string{"a.root-servers.net", "b.root-servers.net", "c.root-servers.net", + authorities: []string{ + "a.root-servers.net", "b.root-servers.net", "c.root-servers.net", "d.root-servers.net", "e.root-servers.net", "f.root-servers.net", "g.root-servers.net", "h.root-servers.net", "i.root-servers.net", "j.root-servers.net", "k.root-servers.net", - "l.root-servers.net", "m.root-servers.net"}, + "l.root-servers.net", "m.root-servers.net", + }, request: []byte{ 0x01, 0x58, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x31, 0x33, 0x31, 0x03, 0x32, 0x35, 0x32, 0x02, 0x33, 0x30, 0x03, 0x31, 0x39, 0x32, 0x07, 0x69, 0x6e, 0x2d, 0x61, diff --git a/packetbeat/protos/http/config.go b/packetbeat/protos/http/config.go index 86f59fb36b1..45878934b0c 100644 --- a/packetbeat/protos/http/config.go +++ b/packetbeat/protos/http/config.go @@ -39,12 +39,10 @@ type httpConfig struct { RedactHeaders []string `config:"redact_headers"` } -var ( - defaultConfig = httpConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - MaxMessageSize: tcp.TCPMaxDataInStream, - DecodeBody: true, - } -) +var defaultConfig = httpConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + MaxMessageSize: tcp.TCPMaxDataInStream, + DecodeBody: true, +} diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index e86cb665587..9cc436f84f0 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -39,8 +39,10 @@ import ( "github.com/elastic/beats/v7/packetbeat/protos" ) -var debugf = logp.MakeDebug("http") -var detailedf = logp.MakeDebug("httpdetailed") +var ( + debugf = logp.MakeDebug("http") + detailedf = logp.MakeDebug("httpdetailed") +) type parserState uint8 @@ -300,7 +302,6 @@ func (http *httpPlugin) doParse( tcptuple *common.TCPTuple, dir uint8, ) *httpConnectionData { - if isDetailed { detailedf("Payload received: [%s]", pkt.Payload) } @@ -368,8 +369,8 @@ func newStream(pkt *protos.Packet, tcptuple *common.TCPTuple) *stream { // ReceivedFin will be called when TCP transaction is terminating. func (http *httpPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - + private protos.ProtocolData, +) protos.ProtocolData { debugf("Received FIN") conn := getHTTPConnection(private) if conn == nil { @@ -396,8 +397,8 @@ func (http *httpPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, // GapInStream is called when a gap of nbytes bytes is found in the stream (due // to packet loss). func (http *httpPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, - nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) { - + nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, +) { defer logp.Recover("GapInStream(http) exception") conn := getHTTPConnection(private) @@ -436,7 +437,6 @@ func (http *httpPlugin) handleHTTP( tcptuple *common.TCPTuple, dir uint8, ) { - m.tcpTuple = *tcptuple m.direction = dir m.cmdlineTuple = http.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) @@ -488,7 +488,6 @@ func (http *httpPlugin) flushRequests(conn *httpConnectionData) { } func (http *httpPlugin) correlate(conn *httpConnectionData) { - // drop responses with missing requests if conn.requests.empty() { http.flushResponses(conn) @@ -724,8 +723,7 @@ func splitCookiesHeader(headerVal string) map[string]string { for _, cval := range cstring { cookie := strings.SplitN(cval, "=", 2) if len(cookie) == 2 { - cookies[strings.ToLower(strings.TrimSpace(cookie[0]))] = - parseCookieValue(strings.TrimSpace(cookie[1])) + cookies[strings.ToLower(strings.TrimSpace(cookie[0]))] = parseCookieValue(strings.TrimSpace(cookie[1])) } } @@ -928,10 +926,6 @@ func (ml *messageList) pop() *message { return msg } -func (ml *messageList) last() *message { - return ml.tail -} - func extractBasicAuthUser(headers map[string]common.NetString) string { const prefix = "Basic " diff --git a/packetbeat/protos/http/http_parser.go b/packetbeat/protos/http/http_parser.go index be4343ea120..a903460f772 100644 --- a/packetbeat/protos/http/http_parser.go +++ b/packetbeat/protos/http/http_parser.go @@ -45,7 +45,7 @@ type message struct { cmdlineTuple *common.ProcessTuple direction uint8 - //Request Info + // Request Info requestURI common.NetString method common.NetString statusCode uint16 @@ -187,7 +187,7 @@ func (*parser) parseHTTPLine(s *stream, m *message) (cont, ok, complete bool) { return false, false, false } if bytes.Equal(fline[0:5], constHTTPVersion) { - //RESPONSE + // RESPONSE m.isRequest = false version = fline[5:8] m.statusCode, m.statusPhrase, err = parseResponseStatus(fline[9:]) @@ -289,7 +289,7 @@ func (parser *parser) parseHeaders(s *stream, m *message) (cont, ok, complete bo s.parseOffset = 0 if !m.isRequest && ((100 <= m.statusCode && m.statusCode < 200) || m.statusCode == 204 || m.statusCode == 304) { - //response with a 1xx, 204 , or 304 status code is always terminated + // response with a 1xx, 204 , or 304 status code is always terminated // by the first empty line after the header fields if isDebug { debugf("Terminate response, status code %d", m.statusCode) diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 0fa9077a5ed..4179a047d56 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -56,10 +56,6 @@ func (e *eventStore) publish(event beat.Event) { e.events = append(e.events, event) } -func (e *eventStore) empty() bool { - return len(e.events) == 0 -} - func newTestParser(http *httpPlugin, payloads ...string) *testParser { if http == nil { http = httpModForTests(nil) @@ -538,7 +534,7 @@ func TestHttpParser_RequestResponseBody(t *testing.T) { tp.stream.PrepareForNewMessage() tp.stream.message = &message{ts: time.Now()} - msg, ok, complete = tp.parse() + _, ok, complete = tp.parse() assert.True(t, ok) assert.True(t, complete) } @@ -646,17 +642,23 @@ func TestEatBodyChunked(t *testing.T) { st.data = append(st.data, msgs[1]...) cont, ok, complete = parser.parseBodyChunkedStart(st, msg) assert.True(t, cont) + assert.True(t, ok) + assert.False(t, complete) assert.Equal(t, 3, msg.chunkedLength) assert.Equal(t, 0, len(msg.body)) assert.Equal(t, stateBodyChunked, st.parseState) cont, ok, complete = parser.parseBodyChunked(st, msg) assert.True(t, cont) + assert.True(t, ok) + assert.False(t, complete) assert.Equal(t, stateBodyChunkedStart, st.parseState) assert.Equal(t, 3, msg.contentLength) cont, ok, complete = parser.parseBodyChunkedStart(st, msg) assert.True(t, cont) + assert.True(t, ok) + assert.False(t, complete) assert.Equal(t, 3, msg.chunkedLength) assert.Equal(t, 3, msg.contentLength) assert.Equal(t, stateBodyChunked, st.parseState) @@ -672,6 +674,8 @@ func TestEatBodyChunked(t *testing.T) { st.data = append(st.data, msgs[2]...) cont, ok, complete = parser.parseBodyChunked(st, msg) assert.True(t, cont) + assert.True(t, ok) + assert.False(t, complete) assert.Equal(t, 6, msg.contentLength) assert.Equal(t, stateBodyChunkedStart, st.parseState) @@ -729,7 +733,6 @@ func TestEatBodyChunkedWaitCRLF(t *testing.T) { ok, complete = parser.parseBodyChunkedWaitFinalCRLF(st, msg) if ok != true || complete != false { t.Error("Wrong return values", ok, complete) - } st.data = append(st.data, msgs[1]...) @@ -817,13 +820,12 @@ func TestHttpParser_censorPasswordPOST(t *testing.T) { http.parserConfig.sendHeaders = true http.parserConfig.sendAllHeaders = true - data1 := - "POST /users/login HTTP/1.1\r\n" + - "HOST: www.example.com\r\n" + - "Content-Type: application/x-www-form-urlencoded\r\n" + - "Content-Length: 28\r\n" + - "\r\n" + - "username=ME&password=secret\r\n" + data1 := "POST /users/login HTTP/1.1\r\n" + + "HOST: www.example.com\r\n" + + "Content-Type: application/x-www-form-urlencoded\r\n" + + "Content-Length: 28\r\n" + + "\r\n" + + "username=ME&password=secret\r\n" tp := newTestParser(http, data1) msg, ok, complete := tp.parse() @@ -1511,7 +1513,8 @@ func TestHTTP_Encodings(t *testing.T) { gzipDeflateBody := string([]byte{ 0x1f, 0x8b, 0x08, 0x00, 0x65, 0xdb, 0x6a, 0x5b, 0x00, 0x03, 0x3b, 0x7d, 0xe2, 0xbc, 0xe7, 0x13, 0x26, 0x06, 0x00, 0x95, 0xfa, 0x49, 0xbf, 0x07, - 0x00, 0x00, 0x00}) + 0x00, 0x00, 0x00, + }) var store eventStore http := httpModForTests(&store) diff --git a/packetbeat/protos/icmp/config.go b/packetbeat/protos/icmp/config.go index cb3ee28d7f9..4fcdb6667e5 100644 --- a/packetbeat/protos/icmp/config.go +++ b/packetbeat/protos/icmp/config.go @@ -29,8 +29,6 @@ type icmpConfig struct { TransactionTimeout time.Duration `config:"transaction_timeout"` } -var ( - defaultConfig = icmpConfig{ - TransactionTimeout: protos.DefaultTransactionExpiration, - } -) +var defaultConfig = icmpConfig{ + TransactionTimeout: protos.DefaultTransactionExpiration, +} diff --git a/packetbeat/protos/icmp/icmp.go b/packetbeat/protos/icmp/icmp.go index 521bb019ce6..c204b819adb 100644 --- a/packetbeat/protos/icmp/icmp.go +++ b/packetbeat/protos/icmp/icmp.go @@ -102,7 +102,7 @@ func (icmp *icmpPlugin) init(results protos.Reporter, watcher procs.ProcessesWat } logp.Debug("icmp", "Local IP addresses: %s", icmp.localIps) - var removalListener = func(k common.Key, v common.Value) { + removalListener := func(k common.Key, v common.Value) { icmp.expireTransaction(k.(hashableIcmpTuple), v.(*icmpTransaction)) } @@ -145,7 +145,7 @@ func (icmp *icmpPlugin) ProcessICMPv4( ts: pkt.Ts, Type: typ, code: code, - length: len(icmp4.BaseLayer.Payload), + length: len(icmp4.Payload), } if isRequest(tuple, msg) { @@ -180,7 +180,7 @@ func (icmp *icmpPlugin) ProcessICMPv6( ts: pkt.Ts, Type: typ, code: code, - length: len(icmp6.BaseLayer.Payload), + length: len(icmp6.Payload), } if isRequest(tuple, msg) { @@ -257,14 +257,6 @@ func (icmp *icmpPlugin) isLocalIP(ip net.IP) bool { return false } -func (icmp *icmpPlugin) getTransaction(k hashableIcmpTuple) *icmpTransaction { - v := icmp.transactions.Get(k) - if v != nil { - return v.(*icmpTransaction) - } - return nil -} - func (icmp *icmpPlugin) deleteTransaction(k hashableIcmpTuple) *icmpTransaction { v := icmp.transactions.Delete(k) if v != nil { diff --git a/packetbeat/protos/icmp/tuple_test.go b/packetbeat/protos/icmp/tuple_test.go index 7c9112decb1..b8d09e6753f 100644 --- a/packetbeat/protos/icmp/tuple_test.go +++ b/packetbeat/protos/icmp/tuple_test.go @@ -79,7 +79,8 @@ func TestIcmpTupleHashable(t *testing.T) { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 0, 2, 1, 0, 0, 1, - 4} + 4, + } assert.Equal(t, expectedHashable, actualHashable) } diff --git a/packetbeat/protos/memcache/binary.go b/packetbeat/protos/memcache/binary.go index f9db915ce8a..ee831055317 100644 --- a/packetbeat/protos/memcache/binary.go +++ b/packetbeat/protos/memcache/binary.go @@ -28,8 +28,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/streambuf" ) -type memcacheMagic uint8 - const ( memcacheMagicRequest = 0x80 memcacheMagicResponse = 0x81 @@ -56,10 +54,11 @@ var binStatsValue = argDef{ serialize: serializeStats, } -var extraValue = makeValueExtra("value") -var extraDelta = makeValueExtra("delta") -var extraInitial = makeValue2Extra("initial") -var extraVerbosity = make32ValueExtra("verbosity") +var ( + extraDelta = makeValueExtra("delta") + extraInitial = makeValue2Extra("initial") + extraVerbosity = make32ValueExtra("verbosity") +) func init() { // define all memcache opcode commands: diff --git a/packetbeat/protos/memcache/config.go b/packetbeat/protos/memcache/config.go index e558c84153d..dfe01e86e3b 100644 --- a/packetbeat/protos/memcache/config.go +++ b/packetbeat/protos/memcache/config.go @@ -32,11 +32,9 @@ type memcacheConfig struct { ParseUnknown bool } -var ( - defaultConfig = memcacheConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - UDPTransactionTimeout: protos.DefaultTransactionExpiration, - } -) +var defaultConfig = memcacheConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + UDPTransactionTimeout: protos.DefaultTransactionExpiration, +} diff --git a/packetbeat/protos/memcache/errors.go b/packetbeat/protos/memcache/errors.go index 3b6b5b9ae45..282812a3492 100644 --- a/packetbeat/protos/memcache/errors.go +++ b/packetbeat/protos/memcache/errors.go @@ -23,9 +23,7 @@ import ( "errors" ) -var ( - errNotImplemented = errors.New("not implemented") -) +var errNotImplemented = errors.New("not implemented") // memcache text parser errors var ( diff --git a/packetbeat/protos/memcache/parse.go b/packetbeat/protos/memcache/parse.go index 2fe774c19e5..01ce9a05acb 100644 --- a/packetbeat/protos/memcache/parse.go +++ b/packetbeat/protos/memcache/parse.go @@ -25,11 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/streambuf" ) -const ( - codeSpace byte = ' ' - codeTab = '\t' -) - type parserConfig struct { maxValues int maxBytesPerValue int diff --git a/packetbeat/protos/memcache/parse_test.go b/packetbeat/protos/memcache/parse_test.go index 95fdac5bad3..e1ac77160f5 100644 --- a/packetbeat/protos/memcache/parse_test.go +++ b/packetbeat/protos/memcache/parse_test.go @@ -56,8 +56,10 @@ type binValueWriter interface { WriteNetUint64At(uint64, int) error } -type extraFn func(binValueWriter) int -type valueFn func(*streambuf.Buffer, int) int +type ( + extraFn func(binValueWriter) int + valueFn func(*streambuf.Buffer, int) int +) type offsetBinWriter struct { w binValueWriter diff --git a/packetbeat/protos/memcache/plugin_tcp.go b/packetbeat/protos/memcache/plugin_tcp.go index 830a0cd64a5..c16abb3c712 100644 --- a/packetbeat/protos/memcache/plugin_tcp.go +++ b/packetbeat/protos/memcache/plugin_tcp.go @@ -60,8 +60,6 @@ type messageList struct { tail *message } -const defaultTCPTransDuration uint = 200 - func ensureMemcacheConnection(private protos.ProtocolData) *tcpConnectionData { if private == nil { return &tcpConnectionData{} diff --git a/packetbeat/protos/memcache/text.go b/packetbeat/protos/memcache/text.go index 72b36442c9a..86baa8d0627 100644 --- a/packetbeat/protos/memcache/text.go +++ b/packetbeat/protos/memcache/text.go @@ -109,12 +109,14 @@ var argStat = argDef{ serialize: serializeStats, } -var argDelta = makeValueArg("delta") -var argSleepUs = makeValueArg("sleep_us") -var argValue = makeValueArg("value") -var argVerbosity = makeValueArg("verbosity") -var argSourceClass = makeIValueArg("source_class") -var argDestClass = makeIValue2Arg("dest_class") +var ( + argDelta = makeValueArg("delta") + argSleepUs = makeValueArg("sleep_us") + argValue = makeValueArg("value") + argVerbosity = makeValueArg("verbosity") + argSourceClass = makeIValueArg("source_class") + argDestClass = makeIValue2Arg("dest_class") +) var argNoReply = argDef{ parse: func(parser *parser, hdr, buf *streambuf.Buffer) error { @@ -297,8 +299,10 @@ func makeDefTextDataMessage( } } -var defTextDataRequest = makeDefTextDataMessage(true) -var defTextDataResponse = makeDefTextDataMessage(false) +var ( + defTextDataRequest = makeDefTextDataMessage(true) + defTextDataResponse = makeDefTextDataMessage(false) +) func loadCommand(name string, code commandCode) textCommandType { return defTextMessage(name, memcacheLoadMsg, code, argMultiKeys) @@ -403,13 +407,6 @@ func makeValueArg(name string) argDef { } } -func makeValue2Arg(name string) argDef { - return argDef{ - parse: textUint64Arg(setValue2), - serialize: serializeValue2(name), - } -} - func makeIValueArg(name string) argDef { return argDef{ parse: func(parser *parser, hdr, buf *streambuf.Buffer) error { @@ -592,7 +589,7 @@ func parseNoReplyArg(buf *streambuf.Buffer) (bool, error) { return false, textArgError(err) } - var noreplyArg = []byte("noreply") + noreplyArg := []byte("noreply") noreply := bytes.HasPrefix(buf.Bytes(), noreplyArg) if !noreply { return false, errExpectedNoReply diff --git a/packetbeat/protos/mongodb/config.go b/packetbeat/protos/mongodb/config.go index e7e2143b647..c90aeb67290 100644 --- a/packetbeat/protos/mongodb/config.go +++ b/packetbeat/protos/mongodb/config.go @@ -28,12 +28,10 @@ type mongodbConfig struct { MaxDocs int `config:"max_docs"` } -var ( - defaultConfig = mongodbConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - MaxDocLength: 5000, - MaxDocs: 10, - } -) +var defaultConfig = mongodbConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + MaxDocLength: 5000, + MaxDocs: 10, +} diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index b05ebce9150..0fcc5464b45 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -55,9 +55,7 @@ type transactionKey struct { id int } -var ( - unmatchedRequests = monitoring.NewInt(nil, "mongodb.unmatched_requests") -) +var unmatchedRequests = monitoring.NewInt(nil, "mongodb.unmatched_requests") func init() { protos.Register("mongodb", New) @@ -218,7 +216,6 @@ func (mongodb *mongodbPlugin) handleMongodb( tcptuple *common.TCPTuple, dir uint8, ) { - m.tcpTuple = *tcptuple m.direction = dir m.cmdlineTuple = mongodb.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) @@ -345,11 +342,12 @@ func reconstructQuery(t *transaction, full bool) (query string) { if !full { // remove the actual data. // TODO: review if we need to add other commands here - if t.method == "insert" { + switch t.method { + case "insert": params, err = doc2str(copyMapWithoutKey(t.params, "documents")) - } else if t.method == "update" { + case "update": params, err = doc2str(copyMapWithoutKey(t.params, "updates")) - } else if t.method == "findandmodify" { + case "findandmodify": params, err = doc2str(copyMapWithoutKey(t.params, "update")) } } else { diff --git a/packetbeat/protos/mongodb/mongodb_parser.go b/packetbeat/protos/mongodb/mongodb_parser.go index a5ff5fa0625..1abc6f890f2 100644 --- a/packetbeat/protos/mongodb/mongodb_parser.go +++ b/packetbeat/protos/mongodb/mongodb_parser.go @@ -116,10 +116,26 @@ func mongodbMessageParser(s *stream) (bool, bool) { // see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#op-reply func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // ignore flags for now + if err != nil { + logp.Err("An error occurred while parsing OP_REPLY message: %s", err) + return false, false + } m.event["cursorId"], err = d.readInt64() + if err != nil { + logp.Err("An error occurred while parsing OP_REPLY message: %s", err) + return false, false + } m.event["startingFrom"], err = d.readInt32() + if err != nil { + logp.Err("An error occurred while parsing OP_REPLY message: %s", err) + return false, false + } numberReturned, err := d.readInt32() + if err != nil { + logp.Err("An error occurred while parsing OP_REPLY message: %s", err) + return false, false + } m.event["numberReturned"] = numberReturned debugf("Prepare to read %d document from reply", m.event["numberReturned"]) @@ -128,15 +144,27 @@ func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) { for i := 0; i < numberReturned; i++ { var document bson.M document, err = d.readDocument() + if err != nil { + logp.Err("An error occurred while parsing OP_REPLY message: %s", err) + return false, false + } // Check if the result is actually an error if i == 0 { if mongoError, present := document["$err"]; present { m.error, err = doc2str(mongoError) + if err != nil { + logp.Err("An error occurred while parsing OP_REPLY message: %s", err) + return false, false + } } if writeErrors, present := document["writeErrors"]; present { m.error, err = doc2str(writeErrors) + if err != nil { + logp.Err("An error occurred while parsing OP_REPLY message: %s", err) + return false, false + } } } @@ -144,10 +172,6 @@ func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) { } m.documents = documents - if err != nil { - logp.Err("An error occurred while parsing OP_REPLY message: %s", err) - return false, false - } return true, true } @@ -163,12 +187,26 @@ func opMsgLegacyParse(d *decoder, m *mongodbMessage) (bool, bool) { func opUpdateParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // always ZERO, a slot reserved in the protocol for future use + if err != nil { + logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) + return false, false + } m.event["fullCollectionName"], err = d.readCStr() + if err != nil { + logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) + return false, false + } _, err = d.readInt32() // ignore flags for now - + if err != nil { + logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) + return false, false + } m.event["selector"], err = d.readDocumentStr() + if err != nil { + logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) + return false, false + } m.event["update"], err = d.readDocumentStr() - if err != nil { logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) return false, false @@ -179,6 +217,10 @@ func opUpdateParse(d *decoder, m *mongodbMessage) (bool, bool) { func opInsertParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // ignore flags for now + if err != nil { + logp.Err("An error occurred while parsing OP_INSERT message: %s", err) + return false, false + } m.event["fullCollectionName"], err = d.readCStr() // TODO parse bson documents @@ -230,11 +272,27 @@ func isDatabaseCommand(key string, val interface{}) bool { func opQueryParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // ignore flags for now + if err != nil { + logp.Err("An error occurred while parsing OP_QUERY message: %s", err) + return false, false + } fullCollectionName, err := d.readCStr() + if err != nil { + logp.Err("An error occurred while parsing OP_QUERY message: %s", err) + return false, false + } m.event["fullCollectionName"] = fullCollectionName m.event["numberToSkip"], err = d.readInt32() + if err != nil { + logp.Err("An error occurred while parsing OP_QUERY message: %s", err) + return false, false + } m.event["numberToReturn"], err = d.readInt32() + if err != nil { + logp.Err("An error occurred while parsing OP_QUERY message: %s", err) + return false, false + } query, err := d.readDocument() if d.i < len(d.in) { @@ -275,10 +333,21 @@ func opQueryParse(d *decoder, m *mongodbMessage) (bool, bool) { func opGetMoreParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // always ZERO, a slot reserved in the protocol for future use + if err != nil { + logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) + return false, false + } m.event["fullCollectionName"], err = d.readCStr() + if err != nil { + logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) + return false, false + } m.event["numberToReturn"], err = d.readInt32() + if err != nil { + logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) + return false, false + } m.event["cursorId"], err = d.readInt64() - if err != nil { logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) return false, false @@ -288,11 +357,21 @@ func opGetMoreParse(d *decoder, m *mongodbMessage) (bool, bool) { func opDeleteParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // always ZERO, a slot reserved in the protocol for future use + if err != nil { + logp.Err("An error occurred while parsing OP_DELETE message: %s", err) + return false, false + } m.event["fullCollectionName"], err = d.readCStr() + if err != nil { + logp.Err("An error occurred while parsing OP_DELETE message: %s", err) + return false, false + } _, err = d.readInt32() // ignore flags for now - + if err != nil { + logp.Err("An error occurred while parsing OP_DELETE message: %s", err) + return false, false + } m.event["selector"], err = d.readDocumentStr() - if err != nil { logp.Err("An error occurred while parsing OP_DELETE message: %s", err) return false, false @@ -405,7 +484,6 @@ func (d *decoder) readByte() (byte, error) { func (d *decoder) readInt32() (int, error) { b, err := d.readBytes(4) - if err != nil { return 0, err } @@ -418,7 +496,6 @@ func (d *decoder) readInt32() (int, error) { func (d *decoder) readInt64() (int, error) { b, err := d.readBytes(8) - if err != nil { return 0, err } @@ -436,6 +513,9 @@ func (d *decoder) readInt64() (int, error) { func (d *decoder) readDocument() (bson.M, error) { start := d.i documentLength, err := d.readInt32() + if err != nil { + return nil, err + } d.i = start + documentLength if len(d.in) < d.i { return nil, errors.New("document out of bounds") @@ -461,6 +541,9 @@ func doc2str(documentMap interface{}) (string, error) { func (d *decoder) readDocumentStr() (string, error) { documentMap, err := d.readDocument() + if err != nil { + return "", err + } document, err := doc2str(documentMap) return document, err } diff --git a/packetbeat/protos/mongodb/mongodb_test.go b/packetbeat/protos/mongodb/mongodb_test.go index ece0e1cf4aa..a1415dfa0a2 100644 --- a/packetbeat/protos/mongodb/mongodb_test.go +++ b/packetbeat/protos/mongodb/mongodb_test.go @@ -170,6 +170,7 @@ func TestSimpleFindLimit1_split(t *testing.T) { "1b000000013000e6762ff7c97652c001" + "3100d5b14ae9996c4440000273747265" + "657400100000004d6f72726973205061") + assert.NoError(t, err) respData2, err := hex.DecodeString( "726b2041766500027a6970636f646500" + @@ -183,6 +184,7 @@ func TestSimpleFindLimit1_split(t *testing.T) { "0000000964617465000044510a410100" + "00026772616465000200000041001073" + "636f72650006000000000332002b0000") + assert.NoError(t, err) respData3, err := hex.DecodeString( "00096461746500009cda693c01000002" + diff --git a/packetbeat/protos/mysql/config.go b/packetbeat/protos/mysql/config.go index d53429c3a2c..0c634b98291 100644 --- a/packetbeat/protos/mysql/config.go +++ b/packetbeat/protos/mysql/config.go @@ -31,13 +31,11 @@ type mysqlConfig struct { StatementTimeout time.Duration `config:"statement_timeout"` } -var ( - defaultConfig = mysqlConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - MaxRowLength: 1024, - MaxRows: 10, - StatementTimeout: 3600 * time.Second, - } -) +var defaultConfig = mysqlConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + MaxRowLength: 1024, + MaxRows: 10, + StatementTimeout: 3600 * time.Second, +} diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 1553bdf9901..49ca17ece85 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -62,8 +62,6 @@ type mysqlMessage struct { numberOfRows int numberOfFields int size uint64 - fields []string - rows [][]string tables string isOK bool affectedRows uint64 @@ -83,7 +81,6 @@ type mysqlMessage struct { statementID int numberOfParams int - params []string } type mysqlTransaction struct { @@ -337,7 +334,7 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { return true, false } - s.parseOffset += 4 //header + s.parseOffset += 4 // header s.parseOffset += int(m.packetLength) m.end = s.parseOffset if m.isRequest { @@ -347,7 +344,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { } else { m.query = string(s.data[m.start+5 : m.end]) } - } else if m.isOK { // affected rows affectedRows, off, complete, err := readLinteger(s.data, m.start+5) @@ -475,7 +471,7 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) { return true, false } - s.parseOffset += 4 //header + s.parseOffset += 4 // header if s.data[s.parseOffset] == 0xfe { logp.Debug("mysqldetailed", "Received EOF packet") @@ -553,8 +549,8 @@ func (mysql *mysqlPlugin) ConnectionTimeout() time.Duration { } func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, - dir uint8, private protos.ProtocolData) protos.ProtocolData { - + dir uint8, private protos.ProtocolData, +) protos.ProtocolData { defer logp.Recover("ParseMysql exception") priv := mysqlPrivateData{} @@ -613,8 +609,8 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, } func (mysql *mysqlPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, - nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) { - + nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, +) { defer logp.Recover("GapInStream(mysql) exception") if private == nil { @@ -642,16 +638,16 @@ func (mysql *mysqlPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, } func (mysql *mysqlPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - + private protos.ProtocolData, +) protos.ProtocolData { // TODO: check if we have data pending and either drop it to free // memory or send it up the stack. return private } func handleMysql(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, - dir uint8, rawMsg []byte) { - + dir uint8, rawMsg []byte, +) { m.tcpTuple = *tcptuple m.direction = dir m.cmdlineTuple = mysql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) @@ -689,9 +685,10 @@ func (mysql *mysqlPlugin) receivedMysqlRequest(msg *mysqlMessage) { trans.statementID = msg.statementID stmts := mysql.getStmtsMap(msg.tcpTuple.Hashable()) if stmts == nil { - if msg.typ == mysqlCmdStmtExecute { + switch msg.typ { + case mysqlCmdStmtExecute: trans.query = "Request Execute Statement" - } else if msg.typ == mysqlCmdStmtClose { + case mysqlCmdStmtClose: trans.query = "Request Close Statement" } trans.notes = append(trans.notes, "The actual query being used is unknown") @@ -699,7 +696,8 @@ func (mysql *mysqlPlugin) receivedMysqlRequest(msg *mysqlMessage) { trans.bytesIn = msg.size return } - if msg.typ == mysqlCmdStmtExecute { + switch msg.typ { + case mysqlCmdStmtExecute: if value, ok := stmts[trans.statementID]; ok { trans.query = value.query // parse parameters @@ -711,7 +709,7 @@ func (mysql *mysqlPlugin) receivedMysqlRequest(msg *mysqlMessage) { trans.bytesIn = msg.size return } - } else if msg.typ == mysqlCmdStmtClose { + case mysqlCmdStmtClose: delete(stmts, trans.statementID) trans.query = "CmdStmtClose" mysql.transactions.Delete(tuple.Hashable()) @@ -899,7 +897,7 @@ func (mysql *mysqlPlugin) parseMysqlExecuteStatement(data []byte, stmtdata *mysq valueString := strconv.Itoa(int(binary.LittleEndian.Uint32(data[paramOffset:]))) paramString = append(paramString, valueString) paramOffset += 4 - //FIELD_TYPE_FLOAT + // FIELD_TYPE_FLOAT case 0x04: paramString = append(paramString, "TYPE_FLOAT") paramOffset += 4 @@ -1012,19 +1010,19 @@ func (mysql *mysqlPlugin) parseMysqlResponse(data []byte) ([]string, [][]string) return []string{}, [][]string{} } - fields := []string{} - rows := [][]string{} - if len(data) < 5 { - logp.Warn("Invalid response: data less than 4 bytes") + logp.Warn("Invalid response: data less than 5 bytes") return []string{}, [][]string{} } - if data[4] == 0x00 { + fields := []string{} + rows := [][]string{} + switch data[4] { + case 0x00: // OK response - } else if data[4] == 0xff { + case 0xff: // Error response - } else { + default: offset := 5 logp.Debug("mysql", "Data len: %d", len(data)) @@ -1100,7 +1098,7 @@ func (mysql *mysqlPlugin) parseMysqlResponse(data []byte) ([]string, [][]string) if data[offset+4] == 0xfe { // EOF - offset += length + 4 + offset += length + 4 // ineffassign break } diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index 309fd45cb0f..dd942399073 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -49,10 +49,6 @@ func (e *eventStore) publish(event beat.Event) { e.events = append(e.events, event) } -func (e *eventStore) empty() bool { - return len(e.events) == 0 -} - func mysqlModForTests(store *eventStore) *mysqlPlugin { callback := func(beat.Event) {} if store != nil { @@ -110,6 +106,7 @@ func TestMySQLParser_simpleRequest(t *testing.T) { t.Errorf("Wrong message size %d", stream.message.size) } } + func TestMySQLParser_OKResponse(t *testing.T) { data := []byte( "0700000100010401000000") @@ -363,11 +360,11 @@ func TestParseMySQL_simpleUpdateResponse(t *testing.T) { var tuple common.TCPTuple var private mysqlPrivateData - var countHandleMysql = 0 + countHandleMysql := 0 mysql.handleMysql = func(mysql *mysqlPlugin, m *mysqlMessage, tcp *common.TCPTuple, - dir uint8, raw_msg []byte) { - + dir uint8, raw_msg []byte, + ) { countHandleMysql++ } @@ -404,11 +401,11 @@ func TestParseMySQL_threeResponses(t *testing.T) { var tuple common.TCPTuple var private mysqlPrivateData - var countHandleMysql = 0 + countHandleMysql := 0 mysql.handleMysql = func(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, - dir uint8, raw_msg []byte) { - + dir uint8, raw_msg []byte, + ) { countHandleMysql++ } @@ -431,7 +428,6 @@ func TestParseMySQL_splitResponse(t *testing.T) { "3b00000303646566086d696e697477697404706f737404706f73740d706f73745f757365726e616d6508757365726e616d650c2100f0000000fd0000000000" + "3500000403646566086d696e697477697404706f737404706f73740a706f73745f7469746c65057469746c650c2100f0000000fd0000000000" + "3300000503646566086d696e697477697404706f737404706f737409706f73745f626f647904626f64790c2100fdff0200fc1000000000") - if err != nil { t.Errorf("Failed to decode string") } @@ -446,11 +442,11 @@ func TestParseMySQL_splitResponse(t *testing.T) { var tuple common.TCPTuple var private mysqlPrivateData - var countHandleMysql = 0 + countHandleMysql := 0 mysql.handleMysql = func(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, - dir uint8, raw_msg []byte) { - + dir uint8, raw_msg []byte, + ) { countHandleMysql++ } @@ -623,12 +619,18 @@ func Test_parseMysqlResponse_invalid(t *testing.T) { {0x05, 0x00, 0x00, 0x01, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00}, {0x05, 0x00, 0x00, 0x01, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00}, {0x05, 0x00, 0x00, 0x01, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00}, - {0x05, 0x00, 0x00, 0x01, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, - 0x01, 0x00}, - {0x15, 0x00, 0x00, 0x01, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, - 0x01, 0x00, 0x01}, - {0x15, 0x00, 0x00, 0x01, 0x01, 0x05, 0x15, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, - 0x01, 0x00, 0x01, 0x00}, + { + 0x05, 0x00, 0x00, 0x01, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, + 0x01, 0x00, + }, + { + 0x15, 0x00, 0x00, 0x01, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, + 0x01, 0x00, 0x01, + }, + { + 0x15, 0x00, 0x00, 0x01, 0x01, 0x05, 0x15, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, + 0x01, 0x00, 0x01, 0x00, + }, } for _, input := range tests { @@ -638,12 +640,14 @@ func Test_parseMysqlResponse_invalid(t *testing.T) { } tests = [][]byte{ - {0x15, 0x00, 0x00, 0x01, 0x01, - 0x0b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0xfe, 0x00, 0x01, //field + { + 0x15, 0x00, 0x00, 0x01, 0x01, + 0x0b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0xfe, 0x00, 0x01, // field 0x01, 0x00, 0x00, 0x00, 0xfe, // EOF }, - {0x15, 0x00, 0x00, 0x01, 0x01, - 0x0b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0xfe, 0x00, 0x01, //field + { + 0x15, 0x00, 0x00, 0x01, 0x01, + 0x0b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0xfe, 0x00, 0x01, // field 0x01, 0x00, 0x00, 0x00, 0xfe, // EOF 0x00, 0x00, }, @@ -668,7 +672,7 @@ func Test_PreparedStatement(t *testing.T) { packet := protos.Packet{Payload: rawData} var private protos.ProtocolData - private = mysql.Parse(&packet, tcpTuple, dir, private) + mysql.Parse(&packet, tcpTuple, dir, private) } send(tcp.TCPDirectionOriginal, "c00000001673656c6563742064697374696e637420636f756e742864697374696e63742070757263686173656465305f2e69642920617320636f6c5f305f305f2066726f6d2070757263686173655f64656d616e642070757263686173656465305f2077686572652070757263686173656465305f2e636861696e5f6d61737465723d3f20616e642070757263686173656465305f2e6372656174655f74696d653e3d3f20616e642070757263686173656465305f2e6372656174655f74696d653c3d3f") diff --git a/packetbeat/protos/nfs/config.go b/packetbeat/protos/nfs/config.go index 639bdad525d..e23672024c8 100644 --- a/packetbeat/protos/nfs/config.go +++ b/packetbeat/protos/nfs/config.go @@ -27,10 +27,8 @@ type rpcConfig struct { config.ProtocolCommon `config:",inline"` } -var ( - defaultConfig = rpcConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: 1 * time.Minute, - }, - } -) +var defaultConfig = rpcConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: 1 * time.Minute, + }, +} diff --git a/packetbeat/protos/nfs/request_handler.go b/packetbeat/protos/nfs/request_handler.go index fea417f4dc1..1225f9f6012 100644 --- a/packetbeat/protos/nfs/request_handler.go +++ b/packetbeat/protos/nfs/request_handler.go @@ -41,9 +41,7 @@ var acceptStatus = [...]string{ "system_err", } -var ( - unmatchedRequests = monitoring.NewInt(nil, "nfs.unmatched_requests") -) +var unmatchedRequests = monitoring.NewInt(nil, "nfs.unmatched_requests") // called by Cache, when re reply seen within expected time window func (r *rpc) handleExpiredPacket(nfs *nfs) { diff --git a/packetbeat/protos/nfs/rpc.go b/packetbeat/protos/nfs/rpc.go index 9cde7ab5aac..64fffe47606 100644 --- a/packetbeat/protos/nfs/rpc.go +++ b/packetbeat/protos/nfs/rpc.go @@ -126,7 +126,6 @@ func (r *rpc) Parse( dir uint8, private protos.ProtocolData, ) protos.ProtocolData { - defer logp.Recover("ParseRPC exception") conn := ensureRPCConnection(private) @@ -140,8 +139,8 @@ func (r *rpc) Parse( // Called when the FIN flag is seen in the TCP stream. func (r *rpc) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - + private protos.ProtocolData, +) protos.ProtocolData { defer logp.Recover("ReceivedFinRpc exception") // forced by TCP interface @@ -151,8 +150,8 @@ func (r *rpc) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, // Called when a packets are missing from the tcp // stream. func (r *rpc) GapInStream(tcptuple *common.TCPTuple, dir uint8, - nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) { - + nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, +) { defer logp.Recover("GapInRpcStream exception") // forced by TCP interface @@ -199,7 +198,6 @@ func (r *rpc) handleRPCFragment( tcptuple *common.TCPTuple, dir uint8, ) *rpcConnectionData { - st := conn.streams[dir] if st == nil { st = newStream(pkt, tcptuple) diff --git a/packetbeat/protos/nfs/xdr.go b/packetbeat/protos/nfs/xdr.go index de63fc755a2..168fb274cfd 100644 --- a/packetbeat/protos/nfs/xdr.go +++ b/packetbeat/protos/nfs/xdr.go @@ -40,24 +40,12 @@ func (r *xdr) size() int { return len(r.data) } -func (r *xdr) getInt() int32 { - i := int32(binary.BigEndian.Uint32(r.data[r.offset : r.offset+4])) - r.offset += 4 - return int32(i) -} - func (r *xdr) getUInt() uint32 { i := uint32(binary.BigEndian.Uint32(r.data[r.offset : r.offset+4])) r.offset += 4 return i } -func (r *xdr) getHyper() int64 { - i := int64(binary.BigEndian.Uint64(r.data[r.offset : r.offset+8])) - r.offset += 8 - return i -} - func (r *xdr) getUHyper() uint64 { i := uint64(binary.BigEndian.Uint64(r.data[r.offset : r.offset+8])) r.offset += 8 diff --git a/packetbeat/protos/pgsql/config.go b/packetbeat/protos/pgsql/config.go index 54a9a1b9e2e..13c11531409 100644 --- a/packetbeat/protos/pgsql/config.go +++ b/packetbeat/protos/pgsql/config.go @@ -28,12 +28,10 @@ type pgsqlConfig struct { MaxRows int `config:"max_rows"` } -var ( - defaultConfig = pgsqlConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - MaxRowLength: 1024, - MaxRows: 10, - } -) +var defaultConfig = pgsqlConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + MaxRowLength: 1024, + MaxRows: 10, +} diff --git a/packetbeat/protos/pgsql/parse.go b/packetbeat/protos/pgsql/parse.go index a7bcdb44770..9a302f99a18 100644 --- a/packetbeat/protos/pgsql/parse.go +++ b/packetbeat/protos/pgsql/parse.go @@ -158,7 +158,7 @@ func (pgsql *pgsqlPlugin) parseSimpleQuery(s *pgsqlStream, length int) (bool, bo m.start = s.parseOffset m.isRequest = true - s.parseOffset++ //type + s.parseOffset++ // type s.parseOffset += length m.end = s.parseOffset m.size = uint64(m.end - m.start) @@ -190,8 +190,8 @@ func (pgsql *pgsqlPlugin) parseRowDescription(s *pgsqlStream, length int) (bool, } pgsql.detailf("Fields: %s", m.fields) - s.parseOffset++ //type - s.parseOffset += length //length + s.parseOffset++ // type + s.parseOffset += length // length s.parseState = pgsqlGetDataState return pgsql.parseMessageData(s) } @@ -238,7 +238,7 @@ func (pgsql *pgsqlPlugin) parseCommandComplete(s *pgsqlStream, length int) (bool m.isOK = true m.toExport = true - s.parseOffset++ //type + s.parseOffset++ // type name, err := pgsqlString(s.data[s.parseOffset+4:], length-4) if err != nil { return false, false @@ -276,10 +276,10 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, m.isError = true m.toExport = true - s.parseOffset++ //type + s.parseOffset++ // type pgsql.parseError(s, s.data[s.parseOffset+4:s.parseOffset+length]) - s.parseOffset += length //length + s.parseOffset += length // length m.end = s.parseOffset m.size = uint64(m.end - m.start) @@ -294,7 +294,7 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) { m.start = s.parseOffset m.isRequest = true - s.parseOffset++ //type + s.parseOffset++ // type s.parseOffset += length m.end = s.parseOffset m.size = uint64(m.end - m.start) @@ -326,7 +326,7 @@ func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) m.isOK = true m.toExport = true - s.parseOffset++ //type + s.parseOffset++ // type s.parseOffset += length pgsql.detailf("Parse completion in an extended query response") s.parseState = pgsqlGetDataState @@ -336,7 +336,7 @@ func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) func (pgsql *pgsqlPlugin) parseSkipMessage(s *pgsqlStream, length int) (bool, bool) { // TODO: add info from NoticeResponse in case there are warning messages for a query // ignore command - s.parseOffset++ //type + s.parseOffset++ // type s.parseOffset += length m := s.message @@ -615,21 +615,21 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) // skip type s.parseOffset++ s.parseOffset += length - //TODO: pgsql.parseBind(s) + // TODO: pgsql.parseBind(s) case 'D': // Bind -> Describe // skip type s.parseOffset++ s.parseOffset += length - //TODO: pgsql.parseDescribe(s) + // TODO: pgsql.parseDescribe(s) case 'E': // Bind(or Describe) -> Execute // skip type s.parseOffset++ s.parseOffset += length - //TODO: pgsql.parseExecute(s) + // TODO: pgsql.parseExecute(s) case 'S': // Execute -> Sync diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index 5ad6f6e305a..20ad2fc5461 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -126,13 +126,9 @@ const ( cancelRequest ) -var ( - errInvalidLength = errors.New("invalid length") -) +var errInvalidLength = errors.New("invalid length") -var ( - unmatchedResponses = monitoring.NewInt(nil, "pgsql.unmatched_responses") -) +var unmatchedResponses = monitoring.NewInt(nil, "pgsql.unmatched_responses") func init() { protos.Register("pgsql", New) @@ -240,8 +236,8 @@ func (pgsql *pgsqlPlugin) ConnectionTimeout() time.Duration { } func (pgsql *pgsqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, - dir uint8, private protos.ProtocolData) protos.ProtocolData { - + dir uint8, private protos.ProtocolData, +) protos.ProtocolData { defer logp.Recover("ParsePgsql exception") priv := pgsqlPrivateData{} @@ -334,8 +330,8 @@ func messageHasEnoughData(msg *pgsqlMessage) bool { // Called when there's a drop packet func (pgsql *pgsqlPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, - nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) { - + nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, +) { defer logp.Recover("GapInPgsqlStream exception") if private == nil { @@ -378,8 +374,8 @@ func (pgsql *pgsqlPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, } var handlePgsql = func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcptuple *common.TCPTuple, - dir uint8, raw_msg []byte) { - + dir uint8, raw_msg []byte, +) { m.tcpTuple = *tcptuple m.direction = dir m.cmdlineTuple = pgsql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) @@ -433,7 +429,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlRequest(msg *pgsqlMessage) { func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) { tuple := msg.tcpTuple transList := pgsql.getTransaction(tuple.Hashable()) - if transList == nil || len(transList) == 0 { + if len(transList) == 0 { pgsql.debugf("Response from unknown transaction. Ignoring.") unmatchedResponses.Add(1) return @@ -511,8 +507,8 @@ func (pgsql *pgsqlPlugin) publishTransaction(t *pgsqlTransaction) { } func (pgsql *pgsqlPlugin) removeTransaction(transList []*pgsqlTransaction, - tuple common.TCPTuple, index int) *pgsqlTransaction { - + tuple common.TCPTuple, index int, +) *pgsqlTransaction { trans := transList[index] transList = append(transList[:index], transList[index+1:]...) if len(transList) == 0 { diff --git a/packetbeat/protos/pgsql/pgsql_test.go b/packetbeat/protos/pgsql/pgsql_test.go index 328b31e28c2..82075eaacae 100644 --- a/packetbeat/protos/pgsql/pgsql_test.go +++ b/packetbeat/protos/pgsql/pgsql_test.go @@ -46,10 +46,6 @@ func (e *eventStore) publish(event beat.Event) { e.events = append(e.events, event) } -func (e *eventStore) empty() bool { - return len(e.events) == 0 -} - func pgsqlModForTests(store *eventStore) *pgsqlPlugin { callback := func(beat.Event) {} if store != nil { @@ -234,11 +230,11 @@ func TestPgsqlParser_threeResponses(t *testing.T) { } var tuple common.TCPTuple var private pgsqlPrivateData - var countHandlePgsql = 0 + countHandlePgsql := 0 pgsql.handlePgsql = func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcptuple *common.TCPTuple, - dir uint8, raw_msg []byte) { - + dir uint8, raw_msg []byte, + ) { countHandlePgsql++ } diff --git a/packetbeat/protos/protos.go b/packetbeat/protos/protos.go index e0343a0ee87..39dc09d8b7d 100644 --- a/packetbeat/protos/protos.go +++ b/packetbeat/protos/protos.go @@ -79,7 +79,6 @@ type Protocols interface { BpfFilter(withVlans bool, withICMP bool) string GetTCP(proto Protocol) TCPPlugin GetUDP(proto Protocol) UDPPlugin - GetAllTCP() map[Protocol]TCPPlugin GetAllUDP() map[Protocol]UDPPlugin diff --git a/packetbeat/protos/protos_test.go b/packetbeat/protos/protos_test.go index 22269a192d2..b3c1a912069 100644 --- a/packetbeat/protos/protos_test.go +++ b/packetbeat/protos/protos_test.go @@ -70,9 +70,7 @@ func (proto *UDPProtocol) GetPorts() []int { return proto.Ports } -func (proto *UDPProtocol) ParseUDP(pkt *Packet) { - return -} +func (proto *UDPProtocol) ParseUDP(pkt *Packet) {} type TCPUDPProtocol TestProtocol @@ -99,9 +97,7 @@ func (proto *TCPUDPProtocol) GapInStream(tcptuple *common.TCPTuple, dir uint8, return private, true } -func (proto *TCPUDPProtocol) ParseUDP(pkt *Packet) { - return -} +func (proto *TCPUDPProtocol) ParseUDP(pkt *Packet) {} func (proto *TCPUDPProtocol) ConnectionTimeout() time.Duration { return 0 } diff --git a/packetbeat/protos/redis/config.go b/packetbeat/protos/redis/config.go index beb1caed2e2..a1fd48f4149 100644 --- a/packetbeat/protos/redis/config.go +++ b/packetbeat/protos/redis/config.go @@ -27,14 +27,12 @@ type redisConfig struct { QueueLimits MessageQueueConfig `config:",inline"` } -var ( - defaultConfig = redisConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - QueueLimits: MessageQueueConfig{ - MaxBytes: 1024 * 1024, - MaxMessages: 20000, - }, - } -) +var defaultConfig = redisConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + QueueLimits: MessageQueueConfig{ + MaxBytes: 1024 * 1024, + MaxMessages: 20000, + }, +} diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index 23dd1ad8696..a219e8a14f9 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -140,6 +140,7 @@ func (redis *redisPlugin) Parse( } return conn } + func (redis *redisPlugin) newConnectionData() *redisConnectionData { return &redisConnectionData{ requests: NewMessageQueue(redis.queueConfig), @@ -171,7 +172,6 @@ func (redis *redisPlugin) doParse( tcptuple *common.TCPTuple, dir uint8, ) *redisConnectionData { - st := conn.streams[dir] if st == nil { st = newStream(pkt.Ts, tcptuple) @@ -339,8 +339,8 @@ func (redis *redisPlugin) newTransaction(requ, resp *redisMessage) beat.Event { } func (redis *redisPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, - nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) { - + nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, +) { // tsg: being packet loss tolerant is probably not very useful for Redis, // because most requests/response tend to fit in a single packet. @@ -348,8 +348,8 @@ func (redis *redisPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, } func (redis *redisPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - + private protos.ProtocolData, +) protos.ProtocolData { // TODO: check if we have pending data that we can send up the stack return private diff --git a/packetbeat/protos/sip/config.go b/packetbeat/protos/sip/config.go index 58a92606e80..309558a225f 100644 --- a/packetbeat/protos/sip/config.go +++ b/packetbeat/protos/sip/config.go @@ -29,13 +29,11 @@ type config struct { KeepOriginal bool `config:"keep_original"` } -var ( - defaultConfig = config{ - ProtocolCommon: cfg.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - ParseAuthorization: true, - ParseBody: true, - KeepOriginal: true, - } -) +var defaultConfig = config{ + ProtocolCommon: cfg.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + ParseAuthorization: true, + ParseBody: true, + KeepOriginal: true, +} diff --git a/packetbeat/protos/sip/parser.go b/packetbeat/protos/sip/parser.go index 8d49ad61742..db63f05b429 100644 --- a/packetbeat/protos/sip/parser.go +++ b/packetbeat/protos/sip/parser.go @@ -269,8 +269,7 @@ func parseVersion(s []byte) (uint8, uint8, error) { func (parser *parser) parseHeaders(pi *parsingInfo, m *message) error { // check if it isn't headers end yet with /r/n/r/n - if !(len(pi.data)-pi.parseOffset >= 2 && - bytes.Equal(pi.data[pi.parseOffset:pi.parseOffset+2], constCRLF)) { + if len(pi.data)-pi.parseOffset < 2 || !bytes.Equal(pi.data[pi.parseOffset:pi.parseOffset+2], constCRLF) { offset, err := parser.parseHeader(m, pi.data[pi.parseOffset:]) if err != nil { return err diff --git a/packetbeat/protos/tcp/tcp.go b/packetbeat/protos/tcp/tcp.go index e9db7b94897..51ac46346a0 100644 --- a/packetbeat/protos/tcp/tcp.go +++ b/packetbeat/protos/tcp/tcp.go @@ -61,9 +61,7 @@ type Processor interface { Process(flow *flows.FlowID, hdr *layers.TCP, pkt *protos.Packet) } -var ( - droppedBecauseOfGaps = monitoring.NewInt(nil, "tcp.dropped_because_of_gaps") -) +var droppedBecauseOfGaps = monitoring.NewInt(nil, "tcp.dropped_because_of_gaps") type seqCompare int @@ -271,7 +269,8 @@ func (tcp *TCP) getStream(pkt *protos.Packet) (stream TCPStream, created bool) { id: tcp.getID(), tuple: &pkt.Tuple, protocol: protocol, - tcp: tcp} + tcp: tcp, + } conn.tcptuple = common.TCPTupleFromIPPort(conn.tuple, conn.id) tcp.streams.PutWithTimeout(pkt.Tuple.Hashable(), conn, timeout) return TCPStream{conn: conn, dir: TCPDirectionOriginal}, true @@ -289,16 +288,12 @@ func tcpSeqCompare(seq1, seq2 uint32) seqCompare { } } -func tcpSeqBefore(seq1 uint32, seq2 uint32) bool { - return int32(seq1-seq2) < 0 -} - func tcpSeqBeforeEq(seq1 uint32, seq2 uint32) bool { return int32(seq1-seq2) <= 0 } func buildPortsMap(plugins map[protos.Protocol]protos.TCPPlugin) (map[uint16]protos.Protocol, error) { - var res = map[uint16]protos.Protocol{} + res := map[uint16]protos.Protocol{} for proto, protoPlugin := range plugins { for _, port := range protoPlugin.GetPorts() { diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 9dd910799b6..5678924a7d6 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -41,9 +41,7 @@ const ( ClientIP = "10.0.0.1" ) -var ( - httpProtocol, mysqlProtocol, redisProtocol protos.Protocol -) +var httpProtocol, mysqlProtocol, redisProtocol protos.Protocol func init() { new := func(_ bool, _ protos.Reporter, _ procs.ProcessesWatcher, _ *common.Config) (protos.Plugin, error) { @@ -199,7 +197,7 @@ func (p protocols) GetUDP(proto protos.Protocol) protos.UDPPlugin { retur func (p protocols) GetAll() map[protos.Protocol]protos.Plugin { return nil } func (p protocols) GetAllTCP() map[protos.Protocol]protos.TCPPlugin { return p.tcp } func (p protocols) GetAllUDP() map[protos.Protocol]protos.UDPPlugin { return nil } -func (p protocols) Register(proto protos.Protocol, plugin protos.Plugin) { return } +func (p protocols) Register(proto protos.Protocol, plugin protos.Plugin) {} func TestTCSeqPayload(t *testing.T) { type segment struct { @@ -213,7 +211,8 @@ func TestTCSeqPayload(t *testing.T) { expectedGaps int expectedState []byte }{ - {"No overlap", + { + "No overlap", []segment{ {1, []byte{1, 2, 3, 4, 5}}, {6, []byte{6, 7, 8, 9, 10}}, @@ -221,7 +220,8 @@ func TestTCSeqPayload(t *testing.T) { 0, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, }, - {"Gap drop state", + { + "Gap drop state", []segment{ {1, []byte{1, 2, 3, 4}}, {15, []byte{5, 6, 7, 8}}, @@ -229,7 +229,8 @@ func TestTCSeqPayload(t *testing.T) { 10, []byte{5, 6, 7, 8}, }, - {"ACK same sequence number", + { + "ACK same sequence number", []segment{ {1, []byte{1, 2}}, {3, nil}, @@ -239,7 +240,8 @@ func TestTCSeqPayload(t *testing.T) { 0, []byte{1, 2, 3, 4, 5, 6}, }, - {"ACK same sequence number 2", + { + "ACK same sequence number 2", []segment{ {1, nil}, {2, nil}, @@ -253,7 +255,8 @@ func TestTCSeqPayload(t *testing.T) { 0, []byte{1, 2, 3, 4, 5, 6, 7, 8}, }, - {"Overlap, first segment bigger", + { + "Overlap, first segment bigger", []segment{ {1, []byte{1, 2}}, {3, []byte{3, 4}}, @@ -263,7 +266,8 @@ func TestTCSeqPayload(t *testing.T) { 0, []byte{1, 2, 3, 4, 5, 6}, }, - {"Overlap, second segment bigger", + { + "Overlap, second segment bigger", []segment{ {1, []byte{1, 2}}, {3, []byte{3}}, @@ -273,7 +277,8 @@ func TestTCSeqPayload(t *testing.T) { 0, []byte{1, 2, 3, 4, 5, 6}, }, - {"Overlap, covered", + { + "Overlap, covered", []segment{ {1, []byte{1, 2, 3, 4}}, {2, []byte{2, 3}}, diff --git a/packetbeat/protos/thrift/config.go b/packetbeat/protos/thrift/config.go index f687da9ac0c..0d946a02438 100644 --- a/packetbeat/protos/thrift/config.go +++ b/packetbeat/protos/thrift/config.go @@ -34,16 +34,14 @@ type thriftConfig struct { IdlFiles []string `config:"idl_files"` } -var ( - defaultConfig = thriftConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - StringMaxSize: 200, - CollectionMaxSize: 15, - DropAfterNStructFields: 500, - TransportType: "socket", - ProtocolType: "binary", - CaptureReply: true, - } -) +var defaultConfig = thriftConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + StringMaxSize: 200, + CollectionMaxSize: 15, + DropAfterNStructFields: 500, + TransportType: "socket", + ProtocolType: "binary", + CaptureReply: true, +} diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index d9778031d76..04c73747f7f 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -656,8 +656,8 @@ func (thrift *thriftPlugin) readStruct(data []byte) (value string, ok bool, comp } func (thrift *thriftPlugin) formatStruct(fields []thriftField, resolveNames bool, - fieldnames []*string) string { - + fieldnames []*string, +) string { toJoin := []string{} for i, field := range fields { if i == thrift.collectionMaxSize { @@ -746,7 +746,7 @@ func (thrift *thriftPlugin) readField(s *thriftStream) (ok bool, complete bool, func (thrift *thriftPlugin) messageParser(s *thriftStream) (bool, bool) { var ok, complete bool - var m = s.message + m := s.message logp.Debug("thriftdetailed", "messageParser called parseState=%v offset=%v", s.parseState, s.parseOffset) @@ -860,7 +860,7 @@ func (stream *thriftStream) prepareForNewMessage(flush bool) { } else { stream.data = stream.data[stream.parseOffset:] } - //logp.Debug("thrift", "remaining data: [%s]", stream.data) + // logp.Debug("thrift", "remaining data: [%s]", stream.data) stream.parseOffset = 0 stream.message = nil stream.parseState = thriftStartState @@ -871,8 +871,8 @@ type thriftPrivateData struct { } func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8, - stream *thriftStream, priv *thriftPrivateData) { - + stream *thriftStream, priv *thriftPrivateData, +) { flush := false if stream.message.isRequest { @@ -906,7 +906,6 @@ func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8 // and reset message stream.prepareForNewMessage(flush) - } func (thrift *thriftPlugin) ConnectionTimeout() time.Duration { @@ -914,8 +913,8 @@ func (thrift *thriftPlugin) ConnectionTimeout() time.Duration { } func (thrift *thriftPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - + private protos.ProtocolData, +) protos.ProtocolData { defer logp.Recover("ParseThrift exception") priv := thriftPrivateData{} @@ -1040,8 +1039,8 @@ func (thrift *thriftPlugin) receivedReply(msg *thriftMessage) { } func (thrift *thriftPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - + private protos.ProtocolData, +) protos.ProtocolData { trans := thrift.getTransaction(tcptuple.Hashable()) if trans != nil { if trans.request != nil && trans.reply == nil { @@ -1055,8 +1054,8 @@ func (thrift *thriftPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, } func (thrift *thriftPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, - nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) { - + nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, +) { defer logp.Recover("GapInStream(thrift) exception") logp.Debug("thriftdetailed", "GapInStream called") diff --git a/packetbeat/protos/thrift/thrift_idl.go b/packetbeat/protos/thrift/thrift_idl.go index 82d8dedbe93..8bc8bec5b3b 100644 --- a/packetbeat/protos/thrift/thrift_idl.go +++ b/packetbeat/protos/thrift/thrift_idl.go @@ -49,7 +49,7 @@ func fieldsToArrayByID(fields []*parser.Field) []*string { } } - output := make([]*string, max+1, max+1) + output := make([]*string, max+1) for _, field := range fields { if len(field.Name) > 0 { diff --git a/packetbeat/protos/thrift/thrift_test.go b/packetbeat/protos/thrift/thrift_test.go index b5377eb4665..0938c60a3be 100644 --- a/packetbeat/protos/thrift/thrift_test.go +++ b/packetbeat/protos/thrift/thrift_test.go @@ -122,7 +122,6 @@ func TestThrift_readMessageBegin(t *testing.T) { data, _ = hex.DecodeString("800100010000000570696e6700000001") stream = thriftStream{data: data, message: new(thriftMessage)} - m = stream.message ok, complete = thrift.readMessageBegin(&stream) if !ok || complete { t.Errorf("Bad result: %v %v", ok, complete) @@ -130,7 +129,6 @@ func TestThrift_readMessageBegin(t *testing.T) { data, _ = hex.DecodeString("800100010000000570696e6700000001") stream = thriftStream{data: data, message: new(thriftMessage)} - m = stream.message ok, complete = thrift.readMessageBegin(&stream) if !ok || complete { t.Errorf("Bad result: %v %v", ok, complete) @@ -150,7 +148,6 @@ func TestThrift_readMessageBegin(t *testing.T) { data, _ = hex.DecodeString("0000000570696e670100000000") stream = thriftStream{data: data, message: new(thriftMessage)} - m = stream.message ok, complete = thrift.readMessageBegin(&stream) if !ok || complete { t.Error("Bad result:", ok, complete) diff --git a/packetbeat/protos/tls/alerts.go b/packetbeat/protos/tls/alerts.go index 4713e7e81f6..4da5c7d5b75 100644 --- a/packetbeat/protos/tls/alerts.go +++ b/packetbeat/protos/tls/alerts.go @@ -25,8 +25,10 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) -type alertSeverity uint8 -type alertCode uint8 +type ( + alertSeverity uint8 + alertCode uint8 +) type alert struct { severity alertSeverity @@ -67,9 +69,7 @@ var alertNames = map[alertCode]string{ 115: "unknown_psk_identity", } -var ( - errRead = errors.New("Buffer read error") -) +var errRead = errors.New("Buffer read error") func (severity alertSeverity) String() string { switch severity { diff --git a/packetbeat/protos/tls/algos.go b/packetbeat/protos/tls/algos.go index b3ba6ea1867..8abd7021b67 100644 --- a/packetbeat/protos/tls/algos.go +++ b/packetbeat/protos/tls/algos.go @@ -22,11 +22,13 @@ import ( "fmt" ) -type cipherSuite uint16 -type signatureScheme uint16 -type pointsGroup uint16 -type compressionMethod uint8 -type ecPointsFormat uint8 +type ( + cipherSuite uint16 + signatureScheme uint16 + pointsGroup uint16 + compressionMethod uint8 + ecPointsFormat uint8 +) // from https://www.iana.org/assignments/tls-parameters/tls-parameters.xhtml#tls-parameters-4 var cipherSuites = map[cipherSuite]string{ @@ -440,26 +442,26 @@ var supportedGroups = map[pointsGroup]string{ } var signatureSchemes = map[signatureScheme]string{ - /* RSASSA-PKCS1-v1_5 algorithms */ + // RSASSA-PKCS1-v1_5 algorithms 0x0401: "rsa_pkcs1_sha256", 0x0501: "rsa_pkcs1_sha384", 0x0601: "rsa_pkcs1_sha512", - /* ECDSA algorithms */ + // ECDSA algorithms 0x0403: "ecdsa_secp256r1_sha256", 0x0503: "ecdsa_secp384r1_sha384", 0x0603: "ecdsa_secp521r1_sha512", - /* RSASSA-PSS algorithms */ + // RSASSA-PSS algorithms 0x0804: "rsa_pss_sha256", 0x0805: "rsa_pss_sha384", 0x0806: "rsa_pss_sha512", - /* EdDSA algorithms */ + // EdDSA algorithms 0x0807: "ed25519", 0x0808: "ed448", - /* Legacy algorithms */ + // Legacy algorithms 0x0201: "rsa_pkcs1_sha1", 0x0203: "ecdsa_sha1", } diff --git a/packetbeat/protos/tls/config.go b/packetbeat/protos/tls/config.go index 775a8f69cc1..2190ccc18dd 100644 --- a/packetbeat/protos/tls/config.go +++ b/packetbeat/protos/tls/config.go @@ -30,13 +30,11 @@ type tlsConfig struct { Fingerprints []string `config:"fingerprints"` } -var ( - defaultConfig = tlsConfig{ - ProtocolCommon: config.ProtocolCommon{ - TransactionTimeout: protos.DefaultTransactionExpiration, - }, - SendCertificates: true, - IncludeDetailedFields: true, - Fingerprints: []string{"sha1"}, - } -) +var defaultConfig = tlsConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + SendCertificates: true, + IncludeDetailedFields: true, + Fingerprints: []string{"sha1"}, +} diff --git a/packetbeat/protos/tls/extensions.go b/packetbeat/protos/tls/extensions.go index 0021705c7f7..1ffceb3c235 100644 --- a/packetbeat/protos/tls/extensions.go +++ b/packetbeat/protos/tls/extensions.go @@ -35,12 +35,14 @@ type Extensions struct { InOrder []ExtensionID } -type extensionParser func(reader bufferView) interface{} -type extension struct { - label string - parser extensionParser - saveRaw bool -} +type ( + extensionParser func(reader bufferView) interface{} + extension struct { + label string + parser extensionParser + saveRaw bool + } +) const ( // ExtensionSupportedGroups identifies the supported group extension @@ -72,7 +74,6 @@ var extensionMap = map[uint16]extension{ // ParseExtensions returns an Extensions object parsed from the supplied buffer func ParseExtensions(buffer bufferView) Extensions { - var extensionsLength uint16 if !buffer.read16Net(0, &extensionsLength) || extensionsLength == 0 { // No extensions diff --git a/packetbeat/protos/tls/extensions_test.go b/packetbeat/protos/tls/extensions_test.go index 23caec09019..c61937411a9 100644 --- a/packetbeat/protos/tls/extensions_test.go +++ b/packetbeat/protos/tls/extensions_test.go @@ -27,7 +27,6 @@ import ( ) func TestSni(t *testing.T) { - // Single element buf := mkBuf(t, "000d"+ // 13 bytes @@ -126,7 +125,6 @@ func TestSni(t *testing.T) { } func TestParseMaxFragmentLength(t *testing.T) { - r := parseMaxFragmentLen(*mkBuf(t, "01", 1)) assert.Equal(t, "2^9", r.(string)) r = parseMaxFragmentLen(*mkBuf(t, "04", 1)) diff --git a/packetbeat/protos/tls/fingerprint.go b/packetbeat/protos/tls/fingerprint.go index 8768ff448e7..70489b687f9 100644 --- a/packetbeat/protos/tls/fingerprint.go +++ b/packetbeat/protos/tls/fingerprint.go @@ -36,8 +36,10 @@ type FingerprintAlgorithm struct { algo AlgorithmFactory } -var hashMap = make(map[string]*FingerprintAlgorithm) -var hashNames []string +var ( + hashMap = make(map[string]*FingerprintAlgorithm) + hashNames []string +) func init() { registerAlgo(func() hash.Hash { return md5.New() }, "md5", "") diff --git a/packetbeat/protos/tls/ja3.go b/packetbeat/protos/tls/ja3.go index 6e6faf5e26c..09a0aec5ea4 100644 --- a/packetbeat/protos/tls/ja3.go +++ b/packetbeat/protos/tls/ja3.go @@ -25,7 +25,6 @@ import ( ) func getJa3Fingerprint(hello *helloMessage) (hash string, ja3str string) { - // build the array of arrays of numbers data := make([][]uint16, 5) diff --git a/packetbeat/protos/tls/parse.go b/packetbeat/protos/tls/parse.go index d0ec3be75ef..fa4515e2248 100644 --- a/packetbeat/protos/tls/parse.go +++ b/packetbeat/protos/tls/parse.go @@ -54,9 +54,9 @@ type recordType uint8 const ( recordTypeChangeCipherSpec recordType = 20 - recordTypeAlert = 21 - recordTypeHandshake = 22 - recordTypeApplicationData = 23 + recordTypeAlert recordType = 21 + recordTypeHandshake recordType = 22 + recordTypeApplicationData recordType = 23 ) type handshakeType uint8 @@ -169,8 +169,10 @@ func readHandshakeHeader(buf *streambuf.Buffer) (*handshakeHeader, error) { if len16, err = buf.ReadNetUint16At(2); err != nil { return nil, err } - return &handshakeHeader{handshakeType(typ), - int(len16) | (int(len8) << 16)}, nil + return &handshakeHeader{ + handshakeType(typ), + int(len16) | (int(len8) << 16), + }, nil } func (header *recordHeader) String() string { @@ -215,7 +217,6 @@ func (hello *helloMessage) supportedCiphers() []string { } func (parser *parser) parse(buf *streambuf.Buffer) parserResult { - for buf.Avail(recordHeaderSize) { header, err := readRecordHeader(buf) diff --git a/packetbeat/protos/tls/parse_test.go b/packetbeat/protos/tls/parse_test.go index 37f4a1b1956..f599b30c44c 100644 --- a/packetbeat/protos/tls/parse_test.go +++ b/packetbeat/protos/tls/parse_test.go @@ -125,12 +125,6 @@ func mapGet(t *testing.T, m common.MapStr, key string) interface{} { return value } -func mapInt(t *testing.T, m common.MapStr, key string) uint32 { - value, err := m.GetValue(key) - assert.NoError(t, err) - return value.(uint32) -} - func TestParseRecordHeader(t *testing.T) { if testing.Verbose() { isDebug = true @@ -173,6 +167,7 @@ func TestParseHandshakeHeader(t *testing.T) { _, err = readHandshakeHeader(sBuf(t, "112233")) assert.Error(t, err) header, err := readHandshakeHeader(sBuf(t, "11223344")) + assert.NoError(t, err) assert.Equal(t, handshakeType(0x11), header.handshakeType) assert.Equal(t, 0x223344, header.length) } @@ -202,7 +197,6 @@ func TestParserParse(t *testing.T) { // Certificate request assert.Equal(t, resultOK, parser.parse(sBuf(t, "16030300040d000000"))) assert.True(t, parser.certRequested) - } func TestParserHello(t *testing.T) { diff --git a/packetbeat/protos/tls/tls.go b/packetbeat/protos/tls/tls.go index 9db2a28f694..a785d101021 100644 --- a/packetbeat/protos/tls/tls.go +++ b/packetbeat/protos/tls/tls.go @@ -170,10 +170,9 @@ func (plugin *tlsPlugin) doParse( tcptuple *common.TCPTuple, dir uint8, ) *tlsConnectionData { - // Ignore further traffic after the handshake is completed (encrypted connection) // TODO: request/response analysis - if 0 != conn.handshakeCompleted&(1< 0 { desc = dev.Description } - r += fmt.Sprintf(" (%s)", desc) + fmt.Fprintf(&buf, " (%s)", desc) } if withIP { - ips := "Not assigned ip address" - if len(dev.Addresses) > 0 { - ips = "" - + buf.WriteString(" (") + if len(dev.Addresses) == 0 { + buf.WriteString("Not assigned ip address") + } else { for i, address := range []pcap.InterfaceAddress(dev.Addresses) { - // Add a space between the IP address. - if i > 0 { - ips += " " + if i != 0 { + buf.WriteByte(' ') } - - ips += fmt.Sprintf("%s", address.IP.String()) + fmt.Fprint(&buf, address.IP) } } - r += fmt.Sprintf(" (%s)", ips) - + buf.WriteByte(')') } - ret = append(ret, r) + names = append(names, buf.String()) } - return ret, nil + return names } func resolveDeviceName(name string) (string, error) { diff --git a/packetbeat/sniffer/device_test.go b/packetbeat/sniffer/device_test.go new file mode 100644 index 00000000000..189bc92a9cd --- /dev/null +++ b/packetbeat/sniffer/device_test.go @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sniffer + +import ( + "net" + "reflect" + "testing" + + "github.com/tsg/gopacket/pcap" +) + +var formatDeviceNamesTests = []struct { + name string + interfaces []pcap.Interface + withDesc bool + withIP bool + want []string +}{ + {name: "empty"}, + { + name: "loopback no withs", + interfaces: []pcap.Interface{ + { + Name: "lo", Description: "loopback", + Addresses: []pcap.InterfaceAddress{ + {IP: net.IP{127, 0, 0, 1}, Netmask: net.IPMask{255, 0, 0, 0}}, + }, + }, + }, + want: []string{ + "lo", + }, + }, + { + name: "loopback with desc", + interfaces: []pcap.Interface{ + { + Name: "lo", Description: "loopback", + Addresses: []pcap.InterfaceAddress{ + {IP: net.IP{127, 0, 0, 1}, Netmask: net.IPMask{255, 0, 0, 0}}, + }, + }, + }, + withDesc: true, + want: []string{ + "lo (loopback)", + }, + }, + { + name: "loopback with IPs", + interfaces: []pcap.Interface{ + { + Name: "lo", Description: "loopback", + Addresses: []pcap.InterfaceAddress{ + {IP: net.IP{127, 0, 0, 1}, Netmask: net.IPMask{255, 0, 0, 0}}, + }, + }, + }, + withIP: true, + want: []string{ + "lo (127.0.0.1)", + }, + }, + { + name: "loopback with the lot", + interfaces: []pcap.Interface{ + { + Name: "lo", Description: "loopback", + Addresses: []pcap.InterfaceAddress{ + {IP: net.IP{127, 0, 0, 1}, Netmask: net.IPMask{255, 0, 0, 0}}, + }, + }, + }, + withDesc: true, + withIP: true, + want: []string{ + "lo (loopback) (127.0.0.1)", + }, + }, + { + name: "two addr loopback with the lot", + interfaces: []pcap.Interface{ + { + Name: "lo", Description: "loopback", + Addresses: []pcap.InterfaceAddress{ + {IP: net.IP{127, 0, 0, 1}, Netmask: net.IPMask{255, 0, 0, 0}}, + {IP: net.IP{127, 0, 1, 1}, Netmask: net.IPMask{255, 0, 0, 0}}, + }, + }, + }, + withDesc: true, + withIP: true, + want: []string{ + "lo (loopback) (127.0.0.1 127.0.1.1)", + }, + }, + { + name: "no IP loopback with the lot", + interfaces: []pcap.Interface{ + { + Name: "lo", Description: "loopback", + }, + }, + withDesc: true, + withIP: true, + want: []string{ + "lo (loopback) (Not assigned ip address)", + }, + }, +} + +func TestFormatDevices(t *testing.T) { + for _, test := range formatDeviceNamesTests { + got := formatDeviceNames(test.interfaces, test.withDesc, test.withIP) + if !reflect.DeepEqual(got, test.want) { + t.Errorf("unexpected result for test %s:\ngot: %v\nwant:%v", + test.name, got, test.want) + } + } +} diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index bfe720c7b98..07a7e3096a9 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -39,7 +39,6 @@ import ( // to a Worker. type Sniffer struct { config config.InterfacesConfig - dumper *pcap.Dumper state atomic.Int32 // store snifferState @@ -60,7 +59,6 @@ type Worker interface { type snifferHandle interface { gopacket.PacketDataSource - LinkType() layers.LinkType Close() } diff --git a/packetbeat/sniffer/sniffer_test.go b/packetbeat/sniffer/sniffer_test.go index eea7821d889..52b0b57fb33 100644 --- a/packetbeat/sniffer/sniffer_test.go +++ b/packetbeat/sniffer/sniffer_test.go @@ -52,13 +52,16 @@ func TestSniffer_afpacketComputeSize(t *testing.T) { t.Error("Value too big", blockSize, numBlocks) } - frameSize, blockSize, numBlocks, err = afpacketComputeSize(0, 1514, 4096) + _, _, _, err = afpacketComputeSize(0, 1514, 4096) if err == nil { t.Error("Expected an error") } // 16436 is the default MTU size of the loopback interface frameSize, blockSize, numBlocks, err = afpacketComputeSize(30, 16436, 4096) + if err != nil { + t.Error(err) + } if frameSize != 4096*5 || blockSize != 4096*5*128 || numBlocks != 12 { t.Error("Bad result", frameSize, blockSize, numBlocks) }