From 8b3297a2ca6526ef469a5afde2eafb03bb88f9a4 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sat, 26 Jun 2021 19:55:43 +0000 Subject: [PATCH] Add af_packet capture engine It is a high performant alternative to libpcap engine. Only Linux supported. Performance gain can be up to 50% depending on traffic type. Can be enabled using: `--input-raw-engine af_packet` --- capture/af_packet.go | 115 +++++++++++++++++++++++++++++++++++++++++++ capture/capture.go | 39 ++++++++++++++- 2 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 capture/af_packet.go diff --git a/capture/af_packet.go b/capture/af_packet.go new file mode 100644 index 000000000..1c014db4e --- /dev/null +++ b/capture/af_packet.go @@ -0,0 +1,115 @@ +package capture + +import ( + "fmt" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/afpacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "golang.org/x/net/bpf" + + _ "github.com/google/gopacket/layers" +) + +type afpacketHandle struct { + TPacket *afpacket.TPacket +} + +func newAfpacketHandle(device string, snaplen int, block_size int, num_blocks int, + useVLAN bool, timeout time.Duration) (*afpacketHandle, error) { + + h := &afpacketHandle{} + var err error + + if device == "any" { + h.TPacket, err = afpacket.NewTPacket( + afpacket.OptFrameSize(snaplen), + afpacket.OptBlockSize(block_size), + afpacket.OptNumBlocks(num_blocks), + afpacket.OptAddVLANHeader(useVLAN), + afpacket.OptPollTimeout(timeout), + afpacket.SocketRaw, + afpacket.TPacketVersion3) + } else { + h.TPacket, err = afpacket.NewTPacket( + afpacket.OptInterface(device), + afpacket.OptFrameSize(snaplen), + afpacket.OptBlockSize(block_size), + afpacket.OptNumBlocks(num_blocks), + afpacket.OptAddVLANHeader(useVLAN), + afpacket.OptPollTimeout(timeout), + afpacket.SocketRaw, + afpacket.TPacketVersion3) + } + return h, err +} + +// ZeroCopyReadPacketData satisfies ZeroCopyPacketDataSource interface +func (h *afpacketHandle) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) { + return h.TPacket.ReadPacketData() +} + +// SetBPFFilter translates a BPF filter string into BPF RawInstruction and applies them. +func (h *afpacketHandle) SetBPFFilter(filter string, snaplen int) (err error) { + pcapBPF, err := pcap.CompileBPFFilter(layers.LinkTypeEthernet, snaplen, filter) + if err != nil { + return err + } + bpfIns := []bpf.RawInstruction{} + for _, ins := range pcapBPF { + bpfIns2 := bpf.RawInstruction{ + Op: ins.Code, + Jt: ins.Jt, + Jf: ins.Jf, + K: ins.K, + } + bpfIns = append(bpfIns, bpfIns2) + } + if h.TPacket.SetBPF(bpfIns); err != nil { + return err + } + return h.TPacket.SetBPF(bpfIns) +} + +// LinkType returns ethernet link type. +func (h *afpacketHandle) LinkType() layers.LinkType { + return layers.LinkTypeEthernet +} + +// Close will close afpacket source. +func (h *afpacketHandle) Close() { + h.TPacket.Close() +} + +// SocketStats prints received, dropped, queue-freeze packet stats. +func (h *afpacketHandle) SocketStats() (as afpacket.SocketStats, asv afpacket.SocketStatsV3, err error) { + return h.TPacket.SocketStats() +} + +// afpacketComputeSize computes the block_size and the num_blocks in such a way that the +// allocated mmap buffer is close to but smaller than target_size_mb. +// The restriction is that the block_size must be divisible by both the +// frame size and page size. +func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) ( + frameSize int, blockSize int, numBlocks int, err error) { + + if snaplen < pageSize { + frameSize = pageSize / (pageSize / snaplen) + } else { + frameSize = (snaplen/pageSize + 1) * pageSize + } + + // 128 is the default from the gopacket library so just use that + blockSize = frameSize * 128 + numBlocks = (targetSizeMb * 1024 * 1024) / blockSize + + fmt.Println(blockSize, (targetSizeMb * 1024 * 1024), pageSize, snaplen) + + if numBlocks == 0 { + return 0, 0, 0, fmt.Errorf("Interface buffersize is too small") + } + + return frameSize, blockSize, numBlocks, nil +} diff --git a/capture/capture.go b/capture/capture.go index 56fc4c3b0..ab99ddc21 100644 --- a/capture/capture.go +++ b/capture/capture.go @@ -65,6 +65,7 @@ const ( EnginePcap EngineType = 1 << iota EnginePcapFile EngineRawSocket + EngineAFPacket ) // Set is here so that EngineType can implement flag.Var @@ -74,8 +75,10 @@ func (eng *EngineType) Set(v string) error { *eng = EnginePcap case "pcap_file": *eng = EnginePcapFile - case "raw_socket", "af_packet": + case "raw_socket": *eng = EngineRawSocket + case "af_packet": + *eng = EngineAFPacket default: return fmt.Errorf("invalid engine %s", v) } @@ -90,6 +93,8 @@ func (eng *EngineType) String() (e string) { e = "libpcap" case EngineRawSocket: e = "raw_socket" + case EngineAFPacket: + e = "af_packet" default: e = "" } @@ -124,6 +129,9 @@ func NewListener(host string, ports []uint16, transport string, engine EngineTyp case EngineRawSocket: l.Engine = EngineRawSocket l.Activate = l.activateRawSocket + case EngineAFPacket: + l.Engine = EngineAFPacket + l.Activate = l.activateAFPacket case EnginePcapFile: l.Engine = EnginePcapFile l.Activate = l.activatePcapFile @@ -427,6 +435,35 @@ func (l *Listener) activatePcapFile() (err error) { return } +func (l *Listener) activateAFPacket() error { + szFrame, szBlock, numBlocks, err := afpacketComputeSize(32, 32<<10, os.Getpagesize()) + if err != nil { + return err + } + + var msg string + for _, ifi := range l.Interfaces { + handle, err := newAfpacketHandle(ifi.Name, szFrame, szBlock, numBlocks, false, pcap.BlockForever) + + if err != nil { + msg += ("\n" + err.Error()) + continue + } + + l.BPFFilter = l.Filter(ifi) + fmt.Println("Interface:", ifi.Name, ". BPF Filter:", l.BPFFilter) + handle.SetBPFFilter(l.BPFFilter, 64<<10) + + l.Handles[ifi.Name] = handle + } + + if len(l.Handles) == 0 { + return fmt.Errorf("pcap handles error:%s", msg) + } + + return nil +} + func (l *Listener) setInterfaces() (err error) { var pifis []pcap.Interface pifis, err = pcap.FindAllDevs()