Skip to content

Commit

Permalink
packetbeat/config: allow multiple interfaces to be specified in the c…
Browse files Browse the repository at this point in the history
…onfig (#32732)

Currently only the first interface is used, retaining the existing behaviour. If
command line arguments are provided the existing behaviour is kept with the first
interface having options from the command line replacing being placed in that
config as defaults, replaced by the options in the config file. Duplicated
interface device declarations are disallow in the configuration.
  • Loading branch information
efd6 authored and chrisberkhout committed Jun 1, 2023
1 parent a8546a4 commit bcf7eca
Show file tree
Hide file tree
Showing 7 changed files with 691 additions and 112 deletions.
8 changes: 5 additions & 3 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ var cmdLineArgs = flags{
}

func initialConfig() config.Config {
return config.Config{
Interfaces: config.InterfacesConfig{
c := config.Config{
Interfaces: []config.InterfacesConfig{{
File: *cmdLineArgs.file,
Loop: *cmdLineArgs.loop,
TopSpeed: *cmdLineArgs.topSpeed,
OneAtATime: *cmdLineArgs.oneAtAtime,
Dumpfile: *cmdLineArgs.dumpfile,
},
}},
}
c.Interface = &c.Interfaces[0]
return c
}

// Beater object. Contains all objects needed to run the beat
Expand Down
16 changes: 9 additions & 7 deletions packetbeat/beater/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,16 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
p.beat.Info.Name,
p.beat.Publisher,
config.IgnoreOutgoing,
config.Interfaces.File == "",
config.Interfaces.InternalNetworks,
config.Interfaces[0].File == "",
config.Interfaces[0].InternalNetworks,
)
if err != nil {
return nil, err
}

watcher := procs.ProcessesWatcher{}
// Enable the process watcher only if capturing live traffic
if config.Interfaces.File == "" {
if config.Interfaces[0].File == "" {
err = watcher.Init(config.Procs)
if err != nil {
logp.Critical(err.Error())
Expand Down Expand Up @@ -195,12 +195,14 @@ func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, decoders
return nil, err
}

filter := cfg.Interfaces.BpfFilter
if filter == "" && !cfg.Flows.IsEnabled() {
filter = protocols.BpfFilter(cfg.Interfaces.WithVlans, icmp.Enabled())
for i, iface := range cfg.Interfaces {
if iface.BpfFilter != "" || cfg.Flows.IsEnabled() {
continue
}
cfg.Interfaces[i].BpfFilter = protocols.BpfFilter(iface.WithVlans, icmp.Enabled())
}

return sniffer.New(false, filter, decoders, cfg.Interfaces)
return sniffer.New(false, "", decoders, cfg.Interfaces)
}

// CheckConfig performs a dry-run creation of a Packetbeat pipeline based
Expand Down
21 changes: 13 additions & 8 deletions packetbeat/config/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,10 @@ func mergeProcsConfig(one, two procs.ProcsConfig) procs.ProcsConfig {
// agent semantics
func NewAgentConfig(cfg *conf.C) (Config, error) {
logp.Debug("agent", "Normalizing agent configuration")
var input agentInput
config := Config{
Interfaces: InterfacesConfig{
// TODO: make this configurable rather than just using the default device
Device: defaultDevice(),
},
}
var (
input agentInput
config Config
)
if err := cfg.Unpack(&input); err != nil {
return config, err
}
Expand All @@ -132,9 +129,11 @@ func NewAgentConfig(cfg *conf.C) (Config, error) {
if err != nil {
return config, err
}
if err := cfg.Unpack(&config.Interfaces); err != nil {
var iface InterfacesConfig
if err := cfg.Unpack(&iface); err != nil {
return config, err
}
config.Interfaces = append(config.Interfaces, iface)
}

if procsOverride, ok := stream["procs"]; ok {
Expand Down Expand Up @@ -173,5 +172,11 @@ func NewAgentConfig(cfg *conf.C) (Config, error) {
}
}
}
if len(config.Interfaces) == 0 {
config.Interfaces = []InterfacesConfig{
// TODO: Make this configurable rather than just using the default device.
{Device: defaultDevice()},
}
}
return config, nil
}
24 changes: 20 additions & 4 deletions packetbeat/config/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ streams:
period: 10s
keep_null: false
interface:
device: thisisignoredfornow
device: default_route
snaplen: 1514
type: af_packet
buffer_size_mb: 100
Expand All @@ -69,18 +69,34 @@ streams:
data_stream:
dataset: packet.icmp
type: logs
- type: http
interface:
device: en2
snaplen: 1514
type: af_packet
buffer_size_mb: 100
procs:
enabled: true
monitored:
- process: curl
cmdline_grep: curl
data_stream:
dataset: packet.http
type: logs
`)
require.NoError(t, err)
config, err := NewAgentConfig(cfg)
require.NoError(t, err)

require.Equal(t, config.Flows.Timeout, "10s")
require.Equal(t, config.Flows.Index, "logs-packet.flow-default")
require.Len(t, config.ProtocolsList, 1)
require.Len(t, config.ProtocolsList, 2)

var protocol map[string]interface{}
require.NoError(t, config.ProtocolsList[0].Unpack(&protocol))
require.Len(t, protocol["processors"].([]interface{}), 3)
require.Equal(t, config.Interfaces.Device, "en1")
require.Len(t, config.Procs.Monitored, 2)
require.Equal(t, "default_route", config.Interfaces[0].Device)
require.Equal(t, "en1", config.Interfaces[1].Device)
require.Equal(t, "en2", config.Interfaces[2].Device)
require.Len(t, config.Procs.Monitored, 3)
}
41 changes: 38 additions & 3 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package config

import (
"errors"
"fmt"
"runtime"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/processors"
Expand All @@ -28,7 +31,8 @@ import (
)

type Config struct {
Interfaces InterfacesConfig `config:"interfaces"`
Interface *InterfacesConfig `config:"interfaces"`
Interfaces []InterfacesConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*conf.C `config:"protocols"`
ProtocolsList []*conf.C `config:"protocols"`
Expand All @@ -43,8 +47,39 @@ func (c Config) FromStatic(cfg *conf.C) (Config, error) {
if err != nil {
return c, err
}
if 0 < c.Interfaces.PollDefaultRoute && c.Interfaces.PollDefaultRoute < time.Second {
c.Interfaces.PollDefaultRoute = time.Second
iface, err := cfg.Child("interfaces", -1)
if err == nil {
if !iface.IsArray() {
c.Interfaces = []InterfacesConfig{*c.Interface}
}
}
c.Interface = nil
counts := make(map[string]int)
for i, iface := range c.Interfaces {
name := iface.Device
if name == "" {
if runtime.GOOS == "linux" {
name = "any"
} else {
name = "default_route"
}
}
counts[name]++
if 0 < c.Interfaces[i].PollDefaultRoute && c.Interfaces[i].PollDefaultRoute < time.Second {
c.Interfaces[i].PollDefaultRoute = time.Second
}
}
for n, c := range counts {
if c == 1 {
delete(counts, n)
}
}
if len(counts) != 0 {
dups := make([]string, 0, len(counts))
for n := range counts {
dups = append(dups, n)
}
return c, fmt.Errorf("duplicated device configurations: %s", strings.Join(dups, ", "))
}
return c, nil
}
Expand Down
Loading

0 comments on commit bcf7eca

Please sign in to comment.