Skip to content

Commit

Permalink
packetbeat/sniffer: allow multiple interface devices to be followed (#…
Browse files Browse the repository at this point in the history
…32933)

This runs multiple sniffers for each Sniffer, each handling default route
polling as needed independently. All sniffers terminate if any individual
terminates.
  • Loading branch information
efd6 authored and chrisberkhout committed Jun 1, 2023
1 parent bcf7eca commit 59211c1
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
*Packetbeat*

- Add option to allow sniffer to change device when default route changes. {issue}31905[31905] {pull}32681[32681]
- Add option to allow sniffing multiple interface devices. {issue}31905[31905] {pull}32933[32933]

*Functionbeat*

Expand Down
14 changes: 13 additions & 1 deletion packetbeat/_meta/config/beat.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@
# Select the network interface to sniff the data. On Linux, you can use the
# "any" keyword to sniff on all connected interfaces. On all platforms, you
# can use "default_route", "default_route_ipv4" or "default_route_ipv6"
# to sniff on the device carrying the default route.
# to sniff on the device carrying the default route. If you wish to sniff
# on multiple network interfaces you may specify an array of distinct interfaces
# as a YAML array with each device's configuration specified individually.
# Each device may only appear once in the array of interfaces.
#
# packetbeat.interfaces:
# - device: en0
# internal_networks:
# - private
# - device: en1
# internal_networks:
# - private
#
packetbeat.interfaces.device: {{ call .device .GOOS }}

# Specify the amount of time between polling for changes in the default
Expand Down
14 changes: 13 additions & 1 deletion packetbeat/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@
# Select the network interface to sniff the data. On Linux, you can use the
# "any" keyword to sniff on all connected interfaces. On all platforms, you
# can use "default_route", "default_route_ipv4" or "default_route_ipv6"
# to sniff on the device carrying the default route.
# to sniff on the device carrying the default route. If you wish to sniff
# on multiple network interfaces you may specify an array of distinct interfaces
# as a YAML array with each device's configuration specified individually.
# Each device may only appear once in the array of interfaces.
#
# packetbeat.interfaces:
# - device: en0
# internal_networks:
# - private
# - device: en1
# internal_networks:
# - private
#
packetbeat.interfaces.device: any

# Specify the amount of time between polling for changes in the default
Expand Down
144 changes: 89 additions & 55 deletions packetbeat/sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package sniffer

import (
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/pcapgo"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -41,7 +43,11 @@ import (
// Sniffer provides packet sniffing capabilities, forwarding packets read
// to a Worker.
type Sniffer struct {
config []config.InterfacesConfig
sniffers []sniffer
}

type sniffer struct {
config config.InterfacesConfig

state atomic.Int32 // store snifferState
done chan struct{} // done is required to wire state into a select.
Expand Down Expand Up @@ -77,14 +83,15 @@ const (
// only, but no device is opened yet. Accessing and configuring the actual device
// is done by the Run method.
func New(testMode bool, _ string, decoders Decoders, interfaces []config.InterfacesConfig) (*Sniffer, error) {
s := &Sniffer{
config: interfaces,
decoders: decoders,
state: atomic.MakeInt32(snifferInactive),
followDefault: interfaces[0].PollDefaultRoute > 0 && strings.HasPrefix(interfaces[0].Device, "default_route"),
}
s := &Sniffer{sniffers: make([]sniffer, len(interfaces))}

for i, iface := range interfaces {
child := sniffer{
state: atomic.MakeInt32(snifferInactive),
followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"),
decoders: decoders,
}

for i, iface := range s.config {
logp.Debug("sniffer", "interface: %d, BPF filter: '%s'", i, iface.BpfFilter)

// pre-check and normalize configuration:
Expand All @@ -99,40 +106,42 @@ func New(testMode bool, _ string, decoders Decoders, interfaces []config.Interfa
}

// we read file with the pcap provider
s.config[i].Type = "pcap"
s.config[i].Device = ""
iface.Type = "pcap"
iface.Device = ""
} else {
// try to resolve device name (ignore error if testMode is enabled)
if name, err := resolveDeviceName(iface.Device); err != nil {
if !testMode {
return nil, err
}
} else {
s.device = name
child.device = name
if name == "any" && !deviceAnySupported {
return nil, fmt.Errorf("any interface is not supported on %s", runtime.GOOS)
}

if iface.Snaplen == 0 {
s.config[i].Snaplen = 65535
iface.Snaplen = 65535
}
if iface.BufferSizeMb <= 0 {
s.config[i].BufferSizeMb = 24
iface.BufferSizeMb = 24
}

if t := iface.Type; t == "autodetect" || t == "" {
s.config[i].Type = "pcap"
iface.Type = "pcap"
}
logp.Debug("sniffer", "Sniffer type: %s device: %s", iface.Type, s.device)
logp.Debug("sniffer", "Sniffer type: %s device: %s", iface.Type, child.device)
}
}
iface = s.config[i]

err := validateConfig(iface.BpfFilter, &iface) //nolint:gosec // Bad linter! validateConfig completes before the next iteration.
if err != nil {
cfg, _ := json.Marshal(iface)
return nil, fmt.Errorf("validate: %w: %s", err, cfg)
}

child.config = iface
s.sniffers[i] = child
}

return s, nil
Expand Down Expand Up @@ -175,20 +184,27 @@ func validateAfPacketConfig(cfg *config.InterfacesConfig) error {
// Run opens the sniffing device and processes packets being read from that device.
// Worker instances are instantiated as needed.
func (s *Sniffer) Run() error {
var (
defaultRoute chan string
refresh chan struct{}
)
if s.followDefault {
s.done = make(chan struct{})
defaultRoute = make(chan string)
refresh = make(chan struct{}, 1)
go s.pollDefaultRoute(defaultRoute, refresh)
}
if defaultRoute == nil {
return s.sniffStatic(s.device)
g, ctx := errgroup.WithContext(context.Background())
for _, c := range s.sniffers {
c := c
g.Go(func() error {
var (
defaultRoute chan string
refresh chan struct{}
)
if c.followDefault {
c.done = make(chan struct{})
defaultRoute = make(chan string)
refresh = make(chan struct{}, 1)
go c.pollDefaultRoute(ctx, defaultRoute, refresh)
}
if defaultRoute == nil {
return c.sniffStatic(ctx, c.device)
}
return c.sniffDynamic(ctx, defaultRoute, refresh)
})
}
return s.sniffDynamic(defaultRoute, refresh)
return g.Wait()
}

// pollDefaultRoute repeatedly polls the default route's device at intervals
Expand All @@ -197,7 +213,7 @@ func (s *Sniffer) Run() error {
// Changes in default route will put the Sniffer into the inactive state to
// trigger a new sniffer connection. Termination of the sniffer is not under
// the control of the poller.
func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{}) {
func (s *sniffer) pollDefaultRoute(ctx context.Context, device chan<- string, refresh <-chan struct{}) {
go func() {
logp.Info("starting default route poller")

Expand All @@ -206,7 +222,7 @@ func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{}
device <- current
defaultRouteMetric.Set(current)

tick := time.NewTicker(s.config[0].PollDefaultRoute)
tick := time.NewTicker(s.config.PollDefaultRoute)
for {
select {
case <-tick.C:
Expand All @@ -215,6 +231,11 @@ func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{}
case <-refresh:
logp.Debug("sniffer", "requested new default route")
current = s.poll(current, device)
case <-ctx.Done():
logp.Info("polling cancelled")
close(device)
tick.Stop()
return
case <-s.done:
logp.Info("closing default route poller")
close(device)
Expand All @@ -235,8 +256,8 @@ func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{}
// poll returns the current default route interface and sends it on device
// if it has change from the old default route interface. If device resolution
// fails, the default route interface is left unchanged.
func (s *Sniffer) poll(old string, device chan<- string) (current string) {
current, err := resolveDeviceName(s.config[0].Device)
func (s *sniffer) poll(old string, device chan<- string) (current string) {
current, err := resolveDeviceName(s.config.Device)
if err != nil {
logp.Warn("sniffer failed to poll default route device: %v", err)
return old
Expand All @@ -251,7 +272,7 @@ func (s *Sniffer) poll(old string, device chan<- string) (current string) {
}

// sniffStatic performs the sniffing work on a single static interface.
func (s *Sniffer) sniffStatic(device string) error {
func (s *sniffer) sniffStatic(ctx context.Context, device string) error {
handle, err := s.open(device)
if err != nil {
return fmt.Errorf("failed to start sniffer: %w", err)
Expand All @@ -263,20 +284,20 @@ func (s *Sniffer) sniffStatic(device string) error {
return err
}

return s.sniffHandle(handle, dec, nil)
return s.sniffHandle(ctx, handle, dec, nil)
}

// sniffDynamic performs sniffing work on a stream of dynamic interfaces from
// defaultRoute decoders are retained between successive interfaces if they are
// the same link type.
func (s *Sniffer) sniffDynamic(defaultRoute <-chan string, refresh chan<- struct{}) error {
func (s *sniffer) sniffDynamic(ctx context.Context, defaultRoute <-chan string, refresh chan<- struct{}) error {
var (
last layers.LinkType
dec *decoder.Decoder
)
for device := range defaultRoute {
var err error
last, dec, err = s.sniffOneDynamic(device, last, dec, refresh)
last, dec, err = s.sniffOneDynamic(ctx, device, last, dec, refresh)
if err != nil {
return err
}
Expand All @@ -288,7 +309,7 @@ func (s *Sniffer) sniffDynamic(defaultRoute <-chan string, refresh chan<- struct
// If the link type associated with the device differs from the last link
// type or dec is nil, a new decoder is returned. The link type associated
// with the device is returned.
func (s *Sniffer) sniffOneDynamic(device string, last layers.LinkType, dec *decoder.Decoder, refresh chan<- struct{}) (layers.LinkType, *decoder.Decoder, error) {
func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layers.LinkType, dec *decoder.Decoder, refresh chan<- struct{}) (layers.LinkType, *decoder.Decoder, error) {
handle, err := s.open(device)
if err != nil {
return last, dec, fmt.Errorf("failed to start sniffer: %w", err)
Expand All @@ -304,16 +325,16 @@ func (s *Sniffer) sniffOneDynamic(device string, last layers.LinkType, dec *deco
}
}

err = s.sniffHandle(handle, dec, refresh)
err = s.sniffHandle(ctx, handle, dec, refresh)
return linkType, dec, err
}

// sniff performs the sniffing work and writing dump files if requested.
func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refresh chan<- struct{}) error {
func (s *sniffer) sniffHandle(ctx context.Context, handle snifferHandle, dec *decoder.Decoder, refresh chan<- struct{}) error {
var w *pcapgo.Writer
if s.config[0].Dumpfile != "" {
if s.config.Dumpfile != "" {
const timeSuffixFormat = "20060102150405"
filename := fmt.Sprintf("%s-%s.pcap", s.config[0].Dumpfile, time.Now().Format(timeSuffixFormat))
filename := fmt.Sprintf("%s-%s.pcap", s.config.Dumpfile, time.Now().Format(timeSuffixFormat))
logp.Info("creating new dump file %s", filename)
f, err := os.Create(filename)
if err != nil {
Expand All @@ -324,7 +345,7 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres
w = pcapgo.NewWriterNanos(f)
err = w.WriteFileHeader(65535, handle.LinkType())
if err != nil {
return fmt.Errorf("failed to write dump file header to %s: %w", s.config[0].Dumpfile, err)
return fmt.Errorf("failed to write dump file header to %s: %w", s.config.Dumpfile, err)
}
}

Expand All @@ -341,7 +362,18 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres
timeouts int
)
for s.state.Load() == snifferActive {
if s.config[0].OneAtATime {
select {
case <-ctx.Done():
logp.Info("sniffing cancelled: %q", s.config.Device)

// Return nil since this must have been due to an errgroup
// termination and any error that caused that will already
// have been captured by the errgroup.
return nil
default:
}

if s.config.OneAtATime {
fmt.Fprintln(os.Stdout, "Press enter to read packet")
fmt.Scanln()
}
Expand All @@ -368,7 +400,7 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres

if err != nil {
// ignore EOF, if sniffer was driven from file
if err == io.EOF && s.config[0].File != "" { //nolint:errorlint // io.EOF should never be wrapped.
if err == io.EOF && s.config.File != "" { //nolint:errorlint // io.EOF should never be wrapped.
return nil
}

Expand Down Expand Up @@ -409,27 +441,29 @@ func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refres
return nil
}

func (s *Sniffer) open(device string) (snifferHandle, error) {
if s.config[0].File != "" {
return newFileHandler(s.config[0].File, s.config[0].TopSpeed, s.config[0].Loop)
func (s *sniffer) open(device string) (snifferHandle, error) {
if s.config.File != "" {
return newFileHandler(s.config.File, s.config.TopSpeed, s.config.Loop)
}

switch s.config[0].Type {
switch s.config.Type {
case "pcap":
return openPcap(device, s.filter, &s.config[0])
return openPcap(device, s.filter, &s.config)
case "af_packet":
return openAFPacket(device, s.filter, &s.config[0])
return openAFPacket(device, s.filter, &s.config)
default:
return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config[0].Type)
return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type)
}
}

// Stop marks a sniffer as stopped. The Run method will return once the stop
// signal has been given.
func (s *Sniffer) Stop() {
s.state.Store(snifferClosing)
if s.done != nil {
close(s.done)
for _, c := range s.sniffers {
c.state.Store(snifferClosing)
if c.done != nil {
close(c.done)
}
}
}

Expand Down
14 changes: 13 additions & 1 deletion x-pack/packetbeat/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@
# Select the network interface to sniff the data. On Linux, you can use the
# "any" keyword to sniff on all connected interfaces. On all platforms, you
# can use "default_route", "default_route_ipv4" or "default_route_ipv6"
# to sniff on the device carrying the default route.
# to sniff on the device carrying the default route. If you wish to sniff
# on multiple network interfaces you may specify an array of distinct interfaces
# as a YAML array with each device's configuration specified individually.
# Each device may only appear once in the array of interfaces.
#
# packetbeat.interfaces:
# - device: en0
# internal_networks:
# - private
# - device: en1
# internal_networks:
# - private
#
packetbeat.interfaces.device: any

# Specify the amount of time between polling for changes in the default
Expand Down

0 comments on commit 59211c1

Please sign in to comment.