Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

packetbeat: preparation for npcap addition #29017

Merged
merged 11 commits into from
Nov 30, 2021
2 changes: 1 addition & 1 deletion packetbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var RootCmd *cmd.BeatsRootCmd

// PacketbeatSettings contains the default settings for packetbeat
func PacketbeatSettings() instance.Settings {
var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("I"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("t"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("O"))
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/config/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, e
mergeConfig, err := common.NewConfigFrom(common.MapStr{
"index": datastreamConfig.Datastream.Type + "-" + datastreamConfig.Datastream.Dataset + "-" + namespace,
"processors": append([]common.MapStr{
common.MapStr{
{
"add_fields": common.MapStr{
"target": "data_stream",
"fields": common.MapStr{
Expand All @@ -76,7 +76,7 @@ func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, e
},
},
},
common.MapStr{
{
"add_fields": common.MapStr{
"target": "event",
"fields": common.MapStr{
Expand Down
3 changes: 2 additions & 1 deletion packetbeat/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func New(
d := Decoder{
flows: f,
decoders: make(map[gopacket.LayerType]gopacket.DecodingLayer),
icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp}
icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp,
}
d.stD1Q.init(&d.d1q[0], &d.d1q[1])
d.stIP4.init(&d.ip4[0], &d.ip4[1])
d.stIP6.init(&d.ip6[0], &d.ip6[1])
Expand Down
6 changes: 5 additions & 1 deletion packetbeat/flows/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ func TestFlowsCounting(t *testing.T) {
assert.NoError(t, err)

uint1, err := module.NewUint("uint1")
assert.NoError(t, err)
uint2, err := module.NewUint("uint2")
assert.NoError(t, err)
int1, err := module.NewInt("int1")
assert.NoError(t, err)
int2, err := module.NewInt("int2")
assert.NoError(t, err)
float1, err := module.NewFloat("float1")
assert.NoError(t, err)
float2, err := module.NewFloat("float2")

assert.NoError(t, err)

pub := &flowsChan{make(chan []beat.Event, 1)}
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func makeWorker(
if align > 0 {
// round time to nearest 10 seconds for alignment
aligned := time.Unix(((time.Now().Unix()+(align-1))/align)*align, 0)
waitStart := aligned.Sub(time.Now())
waitStart := time.Until(aligned)
debugf("worker wait start(%v): %v", aligned, waitStart)
if cont := w.sleep(waitStart); !cont {
return
Expand Down
8 changes: 3 additions & 5 deletions packetbeat/flows/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ import (
"github.com/elastic/beats/v7/packetbeat/procs"
)

var (
// Use `go test -data` to update sample event files.
dataFlag = flag.Bool("data", false, "Write updated data.json files")
)
// Use `go test -data` to update sample event files.
var dataFlag = flag.Bool("data", false, "Write updated data.json files")

func TestCreateEvent(t *testing.T) {
logp.TestingSetup()
Expand Down Expand Up @@ -124,7 +122,7 @@ func TestCreateEvent(t *testing.T) {
t.Fatal(err)
}

if err := ioutil.WriteFile("../_meta/sample_outputs/flow.json", output, 0644); err != nil {
if err := ioutil.WriteFile("../_meta/sample_outputs/flow.json", output, 0o644); err != nil {
t.Fatal(err)
}
}
Expand Down
10 changes: 5 additions & 5 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ func (f *Fields) ComputeValues(localIPs []net.IP, internalNetworks []string) err
}

// network.community_id
switch {
case f.Network.Transport == "udp":
switch f.Network.Transport {
case "udp":
flow.Protocol = 17
case f.Network.Transport == "tcp":
case "tcp":
flow.Protocol = 6
case f.Network.Transport == "icmp":
case "icmp":
flow.Protocol = 1
case f.Network.Transport == "ipv6-icmp":
case "ipv6-icmp":
flow.Protocol = 58
}
flow.ICMP.Type = f.ICMPType
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/processor/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func init() {
// Register default indexers
cfg := common.NewConfig()

//Add IP Port Indexer as a default indexer
// Add IP Port Indexer as a default indexer
kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg)

formatCfg, err := common.NewConfigFrom(map[string]interface{}{
"format": "%{[ip]}:%{[port]}",
})
if err == nil {
//Add field matcher with field to lookup as metricset.host
// Add field matcher with field to lookup as metricset.host
kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldFormatMatcherName, *formatCfg)
}
}
2 changes: 1 addition & 1 deletion packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
defer func() {
logp.Debug("procsdetailed", "updateMap() took %v", time.Now().Sub(start))
logp.Debug("procsdetailed", "updateMap() took %v", time.Since(start))
}()
}

Expand Down
4 changes: 2 additions & 2 deletions packetbeat/procs/procs_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func createFakeDirectoryStructure(prefix string, files []testProcFile) error {
var err error
for _, file := range files {
dir := filepath.Dir(file.path)
err = os.MkdirAll(filepath.Join(prefix, dir), 0755)
err = os.MkdirAll(filepath.Join(prefix, dir), 0o755)
if err != nil {
return err
}

if !file.isLink {
err = ioutil.WriteFile(filepath.Join(prefix, file.path),
[]byte(file.contents), 0644)
[]byte(file.contents), 0o644)
if err != nil {
return err
}
Expand Down
16 changes: 10 additions & 6 deletions packetbeat/procs/procs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,17 @@ type extractor interface {
Size() int
}

type callbackFn func(net.IP, uint16, int)
type extractorFactory func(fn callbackFn) extractor
type (
callbackFn func(net.IP, uint16, int)
extractorFactory func(fn callbackFn) extractor
)

type tcpRowOwnerPIDExtractor callbackFn
type tcp6RowOwnerPIDExtractor callbackFn
type udpRowOwnerPIDExtractor callbackFn
type udp6RowOwnerPIDExtractor callbackFn
type (
tcpRowOwnerPIDExtractor callbackFn
tcp6RowOwnerPIDExtractor callbackFn
udpRowOwnerPIDExtractor callbackFn
udp6RowOwnerPIDExtractor callbackFn
)

var tablesByTransport = map[applayer.Transport][]struct {
family uint32
Expand Down
38 changes: 26 additions & 12 deletions packetbeat/procs/procs_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,33 @@ func TestParseTableRaw(t *testing.T) {
expected []portProcMapping
mustErr bool
}{
{"Empty table IPv4", IPv4,
"00000000", nil, false},
{"Empty table IPv6", IPv6,
"00000000", nil, false},
{"Short table (no length)", IPv4,
"000000", nil, true},
{"Short table (partial entry)", IPv6,
"01000000AAAAAAAAAAAAAAAAAAAA", nil, true},
{"One entry (IPv4)", IPv4,
{
"Empty table IPv4", IPv4,
"00000000", nil, false,
},
{
"Empty table IPv6", IPv6,
"00000000", nil, false,
},
{
"Short table (no length)", IPv4,
"000000", nil, true,
},
{
"Short table (partial entry)", IPv6,
"01000000AAAAAAAAAAAAAAAAAAAA", nil, true,
},
{
"One entry (IPv4)", IPv4,
"01000000" +
"77777777AAAAAAAA12340000BBBBBBBBFFFF0000CCCCCCCC",
[]portProcMapping{
{endpoint: endpoint{address: "170.170.170.170", port: 0x1234}, pid: int(pid)},
}, false},
{"Two entries (IPv6)", IPv6,
},
false,
},
{
"Two entries (IPv6)", IPv6,
"02000000" +
// First entry
"11112222333344445555666677778888F0F0F0F0" +
Expand All @@ -76,7 +88,9 @@ func TestParseTableRaw(t *testing.T) {
[]portProcMapping{
{endpoint: endpoint{address: "1111:2222:3333:4444:5555:6666:7777:8888", port: 0xABCD}, pid: 1},
{endpoint: endpoint{address: "aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa", port: 0}, pid: 0xffff},
}, false},
},
false,
},
} {
msg := fmt.Sprintf("Test case #%d: %s", idx+1, testCase.name)
table, err := hex.DecodeString(testCase.raw)
Expand Down
4 changes: 1 addition & 3 deletions packetbeat/procs/zsyscall_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ const (
errnoERROR_IO_PENDING = 997
)

var (
errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING)
)
var errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING)

// errnoErr returns common boxed Errno values, to prevent
// allocations at runtime.
Expand Down
Loading