diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5f191aa5d1b6..0db0bb5b2faf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Packetbeat* - Add option to allow sniffer to change device when default route changes. {issue}31905[31905] {pull}32681[32681] +- Add option to allow sniffing multiple interface devices. {issue}31905[31905] {pull}32933[32933] *Functionbeat* diff --git a/packetbeat/_meta/config/beat.yml.tmpl b/packetbeat/_meta/config/beat.yml.tmpl index bf990dd1e0d0..12a31129944b 100644 --- a/packetbeat/_meta/config/beat.yml.tmpl +++ b/packetbeat/_meta/config/beat.yml.tmpl @@ -12,7 +12,19 @@ # Select the network interface to sniff the data. On Linux, you can use the # "any" keyword to sniff on all connected interfaces. On all platforms, you # can use "default_route", "default_route_ipv4" or "default_route_ipv6" -# to sniff on the device carrying the default route. +# to sniff on the device carrying the default route. If you wish to sniff +# on multiple network interfaces you may specify an array of distinct interfaces +# as a YAML array with each device's configuration specified individually. +# Each device may only appear once in the array of interfaces. +# +# packetbeat.interfaces: +# - device: en0 +# internal_networks: +# - private +# - device: en1 +# internal_networks: +# - private +# packetbeat.interfaces.device: {{ call .device .GOOS }} # Specify the amount of time between polling for changes in the default diff --git a/packetbeat/packetbeat.yml b/packetbeat/packetbeat.yml index ace50f29d3ec..16f19f670fef 100644 --- a/packetbeat/packetbeat.yml +++ b/packetbeat/packetbeat.yml @@ -12,7 +12,19 @@ # Select the network interface to sniff the data. On Linux, you can use the # "any" keyword to sniff on all connected interfaces. On all platforms, you # can use "default_route", "default_route_ipv4" or "default_route_ipv6" -# to sniff on the device carrying the default route. +# to sniff on the device carrying the default route. If you wish to sniff +# on multiple network interfaces you may specify an array of distinct interfaces +# as a YAML array with each device's configuration specified individually. +# Each device may only appear once in the array of interfaces. +# +# packetbeat.interfaces: +# - device: en0 +# internal_networks: +# - private +# - device: en1 +# internal_networks: +# - private +# packetbeat.interfaces.device: any # Specify the amount of time between polling for changes in the default diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index b95da16a1e78..09903284ba64 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -18,6 +18,7 @@ package sniffer import ( + "context" "encoding/json" "fmt" "io" @@ -30,6 +31,7 @@ import ( "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "github.com/google/gopacket/pcapgo" + "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/logp" @@ -41,7 +43,11 @@ import ( // Sniffer provides packet sniffing capabilities, forwarding packets read // to a Worker. type Sniffer struct { - config []config.InterfacesConfig + sniffers []sniffer +} + +type sniffer struct { + config config.InterfacesConfig state atomic.Int32 // store snifferState done chan struct{} // done is required to wire state into a select. @@ -77,14 +83,15 @@ const ( // only, but no device is opened yet. Accessing and configuring the actual device // is done by the Run method. func New(testMode bool, _ string, decoders Decoders, interfaces []config.InterfacesConfig) (*Sniffer, error) { - s := &Sniffer{ - config: interfaces, - decoders: decoders, - state: atomic.MakeInt32(snifferInactive), - followDefault: interfaces[0].PollDefaultRoute > 0 && strings.HasPrefix(interfaces[0].Device, "default_route"), - } + s := &Sniffer{sniffers: make([]sniffer, len(interfaces))} + + for i, iface := range interfaces { + child := sniffer{ + state: atomic.MakeInt32(snifferInactive), + followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"), + decoders: decoders, + } - for i, iface := range s.config { logp.Debug("sniffer", "interface: %d, BPF filter: '%s'", i, iface.BpfFilter) // pre-check and normalize configuration: @@ -99,8 +106,8 @@ func New(testMode bool, _ string, decoders Decoders, interfaces []config.Interfa } // we read file with the pcap provider - s.config[i].Type = "pcap" - s.config[i].Device = "" + iface.Type = "pcap" + iface.Device = "" } else { // try to resolve device name (ignore error if testMode is enabled) if name, err := resolveDeviceName(iface.Device); err != nil { @@ -108,31 +115,33 @@ func New(testMode bool, _ string, decoders Decoders, interfaces []config.Interfa return nil, err } } else { - s.device = name + child.device = name if name == "any" && !deviceAnySupported { return nil, fmt.Errorf("any interface is not supported on %s", runtime.GOOS) } if iface.Snaplen == 0 { - s.config[i].Snaplen = 65535 + iface.Snaplen = 65535 } if iface.BufferSizeMb <= 0 { - s.config[i].BufferSizeMb = 24 + iface.BufferSizeMb = 24 } if t := iface.Type; t == "autodetect" || t == "" { - s.config[i].Type = "pcap" + iface.Type = "pcap" } - logp.Debug("sniffer", "Sniffer type: %s device: %s", iface.Type, s.device) + logp.Debug("sniffer", "Sniffer type: %s device: %s", iface.Type, child.device) } } - iface = s.config[i] err := validateConfig(iface.BpfFilter, &iface) //nolint:gosec // Bad linter! validateConfig completes before the next iteration. if err != nil { cfg, _ := json.Marshal(iface) return nil, fmt.Errorf("validate: %w: %s", err, cfg) } + + child.config = iface + s.sniffers[i] = child } return s, nil @@ -175,20 +184,27 @@ func validateAfPacketConfig(cfg *config.InterfacesConfig) error { // Run opens the sniffing device and processes packets being read from that device. // Worker instances are instantiated as needed. func (s *Sniffer) Run() error { - var ( - defaultRoute chan string - refresh chan struct{} - ) - if s.followDefault { - s.done = make(chan struct{}) - defaultRoute = make(chan string) - refresh = make(chan struct{}, 1) - go s.pollDefaultRoute(defaultRoute, refresh) - } - if defaultRoute == nil { - return s.sniffStatic(s.device) + g, ctx := errgroup.WithContext(context.Background()) + for _, c := range s.sniffers { + c := c + g.Go(func() error { + var ( + defaultRoute chan string + refresh chan struct{} + ) + if c.followDefault { + c.done = make(chan struct{}) + defaultRoute = make(chan string) + refresh = make(chan struct{}, 1) + go c.pollDefaultRoute(ctx, defaultRoute, refresh) + } + if defaultRoute == nil { + return c.sniffStatic(ctx, c.device) + } + return c.sniffDynamic(ctx, defaultRoute, refresh) + }) } - return s.sniffDynamic(defaultRoute, refresh) + return g.Wait() } // pollDefaultRoute repeatedly polls the default route's device at intervals @@ -197,7 +213,7 @@ func (s *Sniffer) Run() error { // Changes in default route will put the Sniffer into the inactive state to // trigger a new sniffer connection. Termination of the sniffer is not under // the control of the poller. -func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{}) { +func (s *sniffer) pollDefaultRoute(ctx context.Context, device chan<- string, refresh <-chan struct{}) { go func() { logp.Info("starting default route poller") @@ -206,7 +222,7 @@ func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{} device <- current defaultRouteMetric.Set(current) - tick := time.NewTicker(s.config[0].PollDefaultRoute) + tick := time.NewTicker(s.config.PollDefaultRoute) for { select { case <-tick.C: @@ -215,6 +231,11 @@ func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{} case <-refresh: logp.Debug("sniffer", "requested new default route") current = s.poll(current, device) + case <-ctx.Done(): + logp.Info("polling cancelled") + close(device) + tick.Stop() + return case <-s.done: logp.Info("closing default route poller") close(device) @@ -235,8 +256,8 @@ func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{} // poll returns the current default route interface and sends it on device // if it has change from the old default route interface. If device resolution // fails, the default route interface is left unchanged. -func (s *Sniffer) poll(old string, device chan<- string) (current string) { - current, err := resolveDeviceName(s.config[0].Device) +func (s *sniffer) poll(old string, device chan<- string) (current string) { + current, err := resolveDeviceName(s.config.Device) if err != nil { logp.Warn("sniffer failed to poll default route device: %v", err) return old @@ -251,7 +272,7 @@ func (s *Sniffer) poll(old string, device chan<- string) (current string) { } // sniffStatic performs the sniffing work on a single static interface. -func (s *Sniffer) sniffStatic(device string) error { +func (s *sniffer) sniffStatic(ctx context.Context, device string) error { handle, err := s.open(device) if err != nil { return fmt.Errorf("failed to start sniffer: %w", err) @@ -263,20 +284,20 @@ func (s *Sniffer) sniffStatic(device string) error { return err } - return s.sniffHandle(handle, dec, nil) + return s.sniffHandle(ctx, handle, dec, nil) } // sniffDynamic performs sniffing work on a stream of dynamic interfaces from // defaultRoute decoders are retained between successive interfaces if they are // the same link type. -func (s *Sniffer) sniffDynamic(defaultRoute <-chan string, refresh chan<- struct{}) error { +func (s *sniffer) sniffDynamic(ctx context.Context, defaultRoute <-chan string, refresh chan<- struct{}) error { var ( last layers.LinkType dec *decoder.Decoder ) for device := range defaultRoute { var err error - last, dec, err = s.sniffOneDynamic(device, last, dec, refresh) + last, dec, err = s.sniffOneDynamic(ctx, device, last, dec, refresh) if err != nil { return err } @@ -288,7 +309,7 @@ func (s *Sniffer) sniffDynamic(defaultRoute <-chan string, refresh chan<- struct // If the link type associated with the device differs from the last link // type or dec is nil, a new decoder is returned. The link type associated // with the device is returned. -func (s *Sniffer) sniffOneDynamic(device string, last layers.LinkType, dec *decoder.Decoder, refresh chan<- struct{}) (layers.LinkType, *decoder.Decoder, error) { +func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layers.LinkType, dec *decoder.Decoder, refresh chan<- struct{}) (layers.LinkType, *decoder.Decoder, error) { handle, err := s.open(device) if err != nil { return last, dec, fmt.Errorf("failed to start sniffer: %w", err) @@ -304,16 +325,16 @@ func (s *Sniffer) sniffOneDynamic(device string, last layers.LinkType, dec *deco } } - err = s.sniffHandle(handle, dec, refresh) + err = s.sniffHandle(ctx, handle, dec, refresh) return linkType, dec, err } // sniff performs the sniffing work and writing dump files if requested. -func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refresh chan<- struct{}) error { +func (s *sniffer) sniffHandle(ctx context.Context, handle snifferHandle, dec *decoder.Decoder, refresh chan<- struct{}) error { var w *pcapgo.Writer - if s.config[0].Dumpfile != "" { + if s.config.Dumpfile != "" { const timeSuffixFormat = "20060102150405" - filename := fmt.Sprintf("%s-%s.pcap", s.config[0].Dumpfile, time.Now().Format(timeSuffixFormat)) + filename := fmt.Sprintf("%s-%s.pcap", s.config.Dumpfile, time.Now().Format(timeSuffixFormat)) logp.Info("creating new dump file %s", filename) f, err := os.Create(filename) if err != nil { @@ -324,7 +345,7 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres w = pcapgo.NewWriterNanos(f) err = w.WriteFileHeader(65535, handle.LinkType()) if err != nil { - return fmt.Errorf("failed to write dump file header to %s: %w", s.config[0].Dumpfile, err) + return fmt.Errorf("failed to write dump file header to %s: %w", s.config.Dumpfile, err) } } @@ -341,7 +362,18 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres timeouts int ) for s.state.Load() == snifferActive { - if s.config[0].OneAtATime { + select { + case <-ctx.Done(): + logp.Info("sniffing cancelled: %q", s.config.Device) + + // Return nil since this must have been due to an errgroup + // termination and any error that caused that will already + // have been captured by the errgroup. + return nil + default: + } + + if s.config.OneAtATime { fmt.Fprintln(os.Stdout, "Press enter to read packet") fmt.Scanln() } @@ -368,7 +400,7 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres if err != nil { // ignore EOF, if sniffer was driven from file - if err == io.EOF && s.config[0].File != "" { //nolint:errorlint // io.EOF should never be wrapped. + if err == io.EOF && s.config.File != "" { //nolint:errorlint // io.EOF should never be wrapped. return nil } @@ -409,27 +441,29 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres return nil } -func (s *Sniffer) open(device string) (snifferHandle, error) { - if s.config[0].File != "" { - return newFileHandler(s.config[0].File, s.config[0].TopSpeed, s.config[0].Loop) +func (s *sniffer) open(device string) (snifferHandle, error) { + if s.config.File != "" { + return newFileHandler(s.config.File, s.config.TopSpeed, s.config.Loop) } - switch s.config[0].Type { + switch s.config.Type { case "pcap": - return openPcap(device, s.filter, &s.config[0]) + return openPcap(device, s.filter, &s.config) case "af_packet": - return openAFPacket(device, s.filter, &s.config[0]) + return openAFPacket(device, s.filter, &s.config) default: - return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config[0].Type) + return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type) } } // Stop marks a sniffer as stopped. The Run method will return once the stop // signal has been given. func (s *Sniffer) Stop() { - s.state.Store(snifferClosing) - if s.done != nil { - close(s.done) + for _, c := range s.sniffers { + c.state.Store(snifferClosing) + if c.done != nil { + close(c.done) + } } } diff --git a/x-pack/packetbeat/packetbeat.yml b/x-pack/packetbeat/packetbeat.yml index ace50f29d3ec..16f19f670fef 100644 --- a/x-pack/packetbeat/packetbeat.yml +++ b/x-pack/packetbeat/packetbeat.yml @@ -12,7 +12,19 @@ # Select the network interface to sniff the data. On Linux, you can use the # "any" keyword to sniff on all connected interfaces. On all platforms, you # can use "default_route", "default_route_ipv4" or "default_route_ipv6" -# to sniff on the device carrying the default route. +# to sniff on the device carrying the default route. If you wish to sniff +# on multiple network interfaces you may specify an array of distinct interfaces +# as a YAML array with each device's configuration specified individually. +# Each device may only appear once in the array of interfaces. +# +# packetbeat.interfaces: +# - device: en0 +# internal_networks: +# - private +# - device: en1 +# internal_networks: +# - private +# packetbeat.interfaces.device: any # Specify the amount of time between polling for changes in the default