diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 16336195457c..ec492d483689 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -143,6 +143,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Packetbeat* +- Add option to allow sniffer to change device when default route changes. {issue}31905[31905] {pull}32681[32681] *Functionbeat* diff --git a/packetbeat/_meta/config/beat.yml.tmpl b/packetbeat/_meta/config/beat.yml.tmpl index 22d58338f2b7..bf990dd1e0d0 100644 --- a/packetbeat/_meta/config/beat.yml.tmpl +++ b/packetbeat/_meta/config/beat.yml.tmpl @@ -15,6 +15,11 @@ # to sniff on the device carrying the default route. packetbeat.interfaces.device: {{ call .device .GOOS }} +# Specify the amount of time between polling for changes in the default +# route. This option is only used when one of the default route devices +# is specified. +packetbeat.interfaces.poll_default_route: 1m + # The network CIDR blocks that are considered "internal" networks for # the purpose of network perimeter boundary classification. The valid # values for internal_networks are the same as those that can be used diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index a0dde1e28a68..3f040036f743 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -58,7 +58,7 @@ var cmdLineArgs = flags{ loop: flag.Int("l", 1, "Loop file. 
0 - loop forever"), oneAtAtime: flag.Bool("O", false, "Read packets one at a time (press Enter)"), topSpeed: flag.Bool("t", false, "Read packets as fast as possible, without sleeping"), - dumpfile: flag.String("dump", "", "Write all captured packets to this libpcap file"), + dumpfile: flag.String("dump", "", "Write all captured packets to libpcap files with this prefix - a timestamp and pcap extension are added"), } func initialConfig() config.Config { diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index cb6a2e4add73..a53fe8f3b281 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -43,6 +43,9 @@ func (c Config) FromStatic(cfg *conf.C) (Config, error) { if err != nil { return c, err } + if 0 < c.Interfaces.PollDefaultRoute && c.Interfaces.PollDefaultRoute < time.Second { + c.Interfaces.PollDefaultRoute = time.Second + } return c, nil } @@ -76,17 +79,18 @@ func (c Config) ICMP() (*conf.C, error) { } type InterfacesConfig struct { - Device string `config:"device"` - Type string `config:"type"` - File string `config:"file"` - WithVlans bool `config:"with_vlans"` - BpfFilter string `config:"bpf_filter"` - Snaplen int `config:"snaplen"` - BufferSizeMb int `config:"buffer_size_mb"` - EnableAutoPromiscMode bool `config:"auto_promisc_mode"` - InternalNetworks []string `config:"internal_networks"` + Device string `config:"device"` + PollDefaultRoute time.Duration `config:"poll_default_route"` + Type string `config:"type"` + File string `config:"file"` + WithVlans bool `config:"with_vlans"` + BpfFilter string `config:"bpf_filter"` + Snaplen int `config:"snaplen"` + BufferSizeMb int `config:"buffer_size_mb"` + EnableAutoPromiscMode bool `config:"auto_promisc_mode"` + InternalNetworks []string `config:"internal_networks"` TopSpeed bool - Dumpfile string + Dumpfile string // Dumpfile is the basename of pcap dumpfiles. The file names will have a creation time stamp and .pcap extension appended. 
OneAtATime bool Loop int } diff --git a/packetbeat/packetbeat.yml b/packetbeat/packetbeat.yml index 565d31e17c45..ace50f29d3ec 100644 --- a/packetbeat/packetbeat.yml +++ b/packetbeat/packetbeat.yml @@ -15,6 +15,11 @@ # to sniff on the device carrying the default route. packetbeat.interfaces.device: any +# Specify the amount of time between polling for changes in the default +# route. This option is only used when one of the default route devices +# is specified. +packetbeat.interfaces.poll_default_route: 1m + # The network CIDR blocks that are considered "internal" networks for # the purpose of network perimeter boundary classification. The valid # values for internal_networks are the same as those that can be used diff --git a/packetbeat/sniffer/device.go b/packetbeat/sniffer/device.go index 72f733955cf7..4ee0f08e3a42 100644 --- a/packetbeat/sniffer/device.go +++ b/packetbeat/sniffer/device.go @@ -22,12 +22,14 @@ import ( "runtime" "strconv" "strings" + "sync" "syscall" "github.com/google/gopacket/pcap" "github.com/elastic/beats/v7/packetbeat/route" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" ) var deviceAnySupported = runtime.GOOS == "linux" @@ -93,6 +95,7 @@ func resolveDeviceName(name string) (string, error) { iface string err error ) + registerDefaultRouteMetricOnce() switch name { case "default_route": for _, inet := range []int{syscall.AF_INET, syscall.AF_INET6} { @@ -114,6 +117,7 @@ func resolveDeviceName(name string) (string, error) { if err != nil { return "", fmt.Errorf("failed to get default route device: %w", err) } + defaultRouteMetric.Set(iface) devices, err := ListDeviceNames(false, false) if err != nil { @@ -148,6 +152,17 @@ func resolveDeviceName(name string) (string, error) { return name, nil } +var ( + registerRoute sync.Once + defaultRouteMetric *monitoring.String +) + +func registerDefaultRouteMetricOnce() { + registerRoute.Do(func() { + defaultRouteMetric = monitoring.NewString(nil, 
"packetbeat.default_route") + }) +} + func sameDevice(route, pcap string) bool { if runtime.GOOS == "windows" { // The device returned by route does not have the same device tree diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index d1c4e29b0189..b7349733e5ef 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -22,6 +22,7 @@ import ( "io" "os" "runtime" + "strings" "time" "github.com/google/gopacket" @@ -33,6 +34,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/decoder" ) // Sniffer provides packet sniffing capabilities, forwarding packets read @@ -40,7 +42,16 @@ import ( type Sniffer struct { config config.InterfacesConfig - state atomic.Int32 // store snifferState + state atomic.Int32 // store snifferState + done chan struct{} // done is required to wire state into a select. + + // device is the first active device after calling New. + // It is not updated by default route polling. + device string + + // followDefault indicates that the sniffer has + // been configured to follow the default route. + followDefault bool // filter is the bpf filter program used by the sniffer. filter string @@ -66,10 +77,11 @@ const ( // is done by the Run method. 
func New(testMode bool, filter string, decoders Decoders, interfaces config.InterfacesConfig) (*Sniffer, error) { s := &Sniffer{ - filter: filter, - config: interfaces, - decoders: decoders, - state: atomic.MakeInt32(snifferInactive), + filter: filter, + config: interfaces, + decoders: decoders, + state: atomic.MakeInt32(snifferInactive), + followDefault: interfaces.PollDefaultRoute > 0 && strings.HasPrefix(interfaces.Device, "default_route"), } logp.Debug("sniffer", "BPF filter: '%s'", filter) @@ -95,7 +107,7 @@ func New(testMode bool, filter string, decoders Decoders, interfaces config.Inte return nil, err } } else { - s.config.Device = name + s.device = name if name == "any" && !deviceAnySupported { return nil, fmt.Errorf("any interface is not supported on %s", runtime.GOOS) } @@ -110,7 +122,7 @@ func New(testMode bool, filter string, decoders Decoders, interfaces config.Inte if t := s.config.Type; t == "autodetect" || t == "" { s.config.Type = "pcap" } - logp.Debug("sniffer", "Sniffer type: %s device: %s", s.config.Type, s.config.Device) + logp.Debug("sniffer", "Sniffer type: %s device: %s", s.config.Type, s.device) } } @@ -125,15 +137,147 @@ func New(testMode bool, filter string, decoders Decoders, interfaces config.Inte // Run opens the sniffing device and processes packets being read from that device. // Worker instances are instantiated as needed. func (s *Sniffer) Run() error { - handle, err := s.open() + var ( + defaultRoute chan string + refresh chan struct{} + ) + if s.followDefault { + s.done = make(chan struct{}) + defaultRoute = make(chan string) + refresh = make(chan struct{}, 1) + go s.pollDefaultRoute(defaultRoute, refresh) + } + if defaultRoute == nil { + return s.sniffStatic(s.device) + } + return s.sniffDynamic(defaultRoute, refresh) +} + +// pollDefaultRoute repeatedly polls the default route's device at intervals +// specified in config.PollDefaultRoute. 
The poller is terminated by closing +// done and the device chan can be read for changes in the default route. +// Changes in default route will put the Sniffer into the inactive state to +// trigger a new sniffer connection. Termination of the sniffer is not under +// the control of the poller. +func (s *Sniffer) pollDefaultRoute(device chan<- string, refresh <-chan struct{}) { + go func() { + logp.Info("starting default route poller") + + // Prime the channel. + current := s.device + device <- current + defaultRouteMetric.Set(current) + + tick := time.NewTicker(s.config.PollDefaultRoute) + for { + select { + case <-tick.C: + logp.Debug("sniffer", "polling default route") + current = s.poll(current, device) + case <-refresh: + logp.Debug("sniffer", "requested new default route") + current = s.poll(current, device) + case <-s.done: + logp.Info("closing default route poller") + close(device) + tick.Stop() + return + } + // Purge any unused refresh request. The chan has a cap + // of one and the send is conditional so we don't need + // to do this in a loop. + select { + case <-refresh: + default: + } + } + }() +} + +// poll returns the current default route interface and sends it on device +// if it has changed from the old default route interface. If device resolution +// fails, the default route interface is left unchanged. +func (s *Sniffer) poll(old string, device chan<- string) (current string) { + current, err := resolveDeviceName(s.config.Device) + if err != nil { + logp.Warn("sniffer failed to poll default route device: %v", err) + return old + } + if current != old { + logp.Info("sniffer changing default route device: %s -> %s", old, current) + s.state.Store(snifferInactive) // Mark current device as stale. NOTE(review): this unconditional Store can overwrite snifferClosing set by a concurrent Stop. + device <- current // Pass the new device name. + defaultRouteMetric.Set(current) + } + return current +} + +// sniffStatic performs the sniffing work on a single static interface. 
+func (s *Sniffer) sniffStatic(device string) error { + handle, err := s.open(device) if err != nil { return fmt.Errorf("failed to start sniffer: %w", err) } defer handle.Close() + dec, err := s.decoders(handle.LinkType()) + if err != nil { + return err + } + + return s.sniffHandle(handle, dec, nil) +} + +// sniffDynamic performs sniffing work on a stream of dynamic interfaces from +// defaultRoute. Decoders are retained between successive interfaces if they have +// the same link type. +func (s *Sniffer) sniffDynamic(defaultRoute <-chan string, refresh chan<- struct{}) error { + var ( + last layers.LinkType + dec *decoder.Decoder + ) + for device := range defaultRoute { + var err error + last, dec, err = s.sniffOneDynamic(device, last, dec, refresh) + if err != nil { + return err + } + } + return nil +} + +// sniffOneDynamic handles sniffing a single device that may change link type. +// If the link type associated with the device differs from the last link +// type or dec is nil, a new decoder is returned. The link type associated +// with the device is returned. +func (s *Sniffer) sniffOneDynamic(device string, last layers.LinkType, dec *decoder.Decoder, refresh chan<- struct{}) (layers.LinkType, *decoder.Decoder, error) { + handle, err := s.open(device) + if err != nil { + return last, dec, fmt.Errorf("failed to start sniffer: %w", err) + } + defer handle.Close() + + linkType := handle.LinkType() + if dec == nil || linkType != last { + logp.Info("changing link type: %d -> %d", last, linkType) + dec, err = s.decoders(linkType) + if err != nil { + return linkType, dec, err + } + } + + err = s.sniffHandle(handle, dec, refresh) + return linkType, dec, err +} + +// sniffHandle performs the sniffing work and writes dump files if requested. 
+func (s *Sniffer) sniffHandle(handle snifferHandle, dec *decoder.Decoder, refresh chan<- struct{}) error { var w *pcapgo.Writer if s.config.Dumpfile != "" { - f, err := os.Create(s.config.Dumpfile) + const timeSuffixFormat = "20060102150405" + filename := fmt.Sprintf("%s-%s.pcap", s.config.Dumpfile, time.Now().Format(timeSuffixFormat)) + logp.Info("creating new dump file %s", filename) + f, err := os.Create(filename) if err != nil { return err } @@ -146,11 +290,6 @@ func (s *Sniffer) Run() error { } } - decoder, err := s.decoders(handle.LinkType()) - if err != nil { - return err - } - // Mark inactive sniffer as active. In case of the sniffer/packetbeat closing // before/while Run is executed, the state will be snifferClosing. // => return if state is already snifferClosing. @@ -159,7 +298,10 @@ func (s *Sniffer) Run() error { } defer s.state.Store(snifferInactive) - var packets int + var ( + packets int + timeouts int + ) for s.state.Load() == snifferActive { if s.config.OneAtATime { fmt.Fprintln(os.Stdout, "Press enter to read packet") @@ -168,9 +310,23 @@ func (s *Sniffer) Run() error { data, ci, err := handle.ReadPacketData() if err == pcap.NextErrorTimeoutExpired || isAfpacketErrTimeout(err) { //nolint:errorlint // pcap.NextErrorTimeoutExpired is not wrapped. - logp.Debug("sniffer", "timedout") + logp.Debug("sniffer", "timed out") + + // If we have timed out too many times and we are following + // a default route, request a new default route interface. + const maxTimeouts = 10 // Place-holder until we have a sensible notion of how big this should be. + timeouts++ + if s.followDefault && timeouts > maxTimeouts { + select { + case refresh <- struct{}{}: + default: + // Don't request to refresh if already requested. 
+ } + timeouts = 0 + } continue } + timeouts = 0 if err != nil { // ignore EOF, if sniffer was driven from file @@ -178,6 +334,18 @@ func (s *Sniffer) Run() error { return nil } + // If we are following a default route, request an interface + // refresh and log the error. + if s.followDefault { + select { + case refresh <- struct{}{}: + default: + // Don't request to refresh if already requested. + } + logp.Warn("error during packet capture: %v", err) + continue + } + s.state.Store(snifferInactive) return fmt.Errorf("sniffing error: %w", err) } @@ -197,22 +365,22 @@ func (s *Sniffer) Run() error { } logp.Debug("sniffer", "Packet number: %d", packets) - decoder.OnPacket(data, &ci) + dec.OnPacket(data, &ci) } return nil } -func (s *Sniffer) open() (snifferHandle, error) { +func (s *Sniffer) open(device string) (snifferHandle, error) { if s.config.File != "" { return newFileHandler(s.config.File, s.config.TopSpeed, s.config.Loop) } switch s.config.Type { case "pcap": - return openPcap(s.filter, &s.config) + return openPcap(device, s.filter, &s.config) case "af_packet": - return openAFPacket(s.filter, &s.config) + return openAFPacket(device, s.filter, &s.config) default: return nil, fmt.Errorf("unknown sniffer type: %s", s.config.Type) } @@ -222,6 +390,9 @@ func (s *Sniffer) open() (snifferHandle, error) { // signal has been given. 
func (s *Sniffer) Stop() { s.state.Store(snifferClosing) + if s.done != nil { + close(s.done) + } } func validateConfig(filter string, cfg *config.InterfacesConfig) error { @@ -258,10 +429,10 @@ func validatePcapFilter(expr string) error { return err } -func openPcap(filter string, cfg *config.InterfacesConfig) (snifferHandle, error) { +func openPcap(device, filter string, cfg *config.InterfacesConfig) (snifferHandle, error) { snaplen := int32(cfg.Snaplen) timeout := 500 * time.Millisecond - h, err := pcap.OpenLive(cfg.Device, snaplen, true, timeout) + h, err := pcap.OpenLive(device, snaplen, true, timeout) if err != nil { return nil, err } @@ -275,14 +446,14 @@ func openPcap(filter string, cfg *config.InterfacesConfig) (snifferHandle, error return h, nil } -func openAFPacket(filter string, cfg *config.InterfacesConfig) (snifferHandle, error) { +func openAFPacket(device, filter string, cfg *config.InterfacesConfig) (snifferHandle, error) { szFrame, szBlock, numBlocks, err := afpacketComputeSize(cfg.BufferSizeMb, cfg.Snaplen, os.Getpagesize()) if err != nil { return nil, err } timeout := 500 * time.Millisecond - h, err := newAfpacketHandle(cfg.Device, szFrame, szBlock, numBlocks, timeout, cfg.EnableAutoPromiscMode) + h, err := newAfpacketHandle(device, szFrame, szBlock, numBlocks, timeout, cfg.EnableAutoPromiscMode) if err != nil { return nil, err } diff --git a/x-pack/packetbeat/packetbeat.yml b/x-pack/packetbeat/packetbeat.yml index 565d31e17c45..ace50f29d3ec 100644 --- a/x-pack/packetbeat/packetbeat.yml +++ b/x-pack/packetbeat/packetbeat.yml @@ -15,6 +15,11 @@ # to sniff on the device carrying the default route. packetbeat.interfaces.device: any +# Specify the amount of time between polling for changes in the default +# route. This option is only used when one of the default route devices +# is specified. 
+packetbeat.interfaces.poll_default_route: 1m + # The network CIDR blocks that are considered "internal" networks for # the purpose of network perimeter boundary classification. The valid # values for internal_networks are the same as those that can be used