Skip to content

Commit

Permalink
Rewrite arch/bpf (#4497)
Browse files Browse the repository at this point in the history
* Rewrite arch/bpf

* Adapt for NetBSD

* Adapt for Darwin

* Cleanup VEthPair test

* Cleanup get_if_raw_hwaddr where not useful

* Test on all BSDs

* Some tests only work on Little endian machines
  • Loading branch information
gpotter2 committed Sep 2, 2024
1 parent dcb0e0c commit 528626a
Show file tree
Hide file tree
Showing 24 changed files with 1,772 additions and 1,069 deletions.
31 changes: 7 additions & 24 deletions scapy/arch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
from scapy.config import conf, _set_conf_sockets
from scapy.consts import LINUX, SOLARIS, WINDOWS, BSD
from scapy.data import (
ARPHDR_ETHER,
ARPHDR_LOOPBACK,
ARPHDR_PPP,
ARPHDR_TUN,
IPV6_ADDR_GLOBAL,
IPV6_ADDR_LOOPBACK,
)
from scapy.error import log_loading, Scapy_Exception
from scapy.interfaces import _GlobInterfaceType, network_name
from scapy.error import log_loading
from scapy.interfaces import (
_GlobInterfaceType,
network_name,
resolve_iface,
)
from scapy.pton_ntop import inet_pton, inet_ntop

from scapy.libs.extcap import load_extcap
Expand Down Expand Up @@ -50,7 +50,6 @@
"get_if_list",
"get_if_raw_addr",
"get_if_raw_addr6",
"get_if_raw_hwaddr",
"get_working_if",
"in6_getifaddr",
"read_nameservers",
Expand Down Expand Up @@ -89,12 +88,7 @@ def get_if_hwaddr(iff):
"""
Returns the MAC (hardware) address of an interface
"""
from scapy.arch import get_if_raw_hwaddr
addrfamily, mac = get_if_raw_hwaddr(iff) # noqa: F405
if addrfamily in [ARPHDR_ETHER, ARPHDR_LOOPBACK, ARPHDR_PPP, ARPHDR_TUN]:
return str2mac(mac)
else:
raise Scapy_Exception("Unsupported address family (%i) for interface [%s]" % (addrfamily, iff)) # noqa: E501
return resolve_iface(iff).mac or "00:00:00:00:00:00"


def get_if_addr6(niff):
Expand Down Expand Up @@ -128,9 +122,7 @@ def get_if_raw_addr6(iff):

# Next step is to import following architecture specific functions:
# def attach_filter(s, filter, iface)
# def get_if(iff,cmd)
# def get_if_raw_addr(iff)
# def get_if_raw_hwaddr(iff)
# def in6_getifaddr()
# def read_nameservers()
# def read_routes()
Expand All @@ -140,12 +132,6 @@ def get_if_raw_addr6(iff):
if LINUX:
from scapy.arch.linux import * # noqa F403
elif BSD:
from scapy.arch.unix import ( # noqa F403
read_nameservers,
read_routes,
read_routes6,
in6_getifaddr,
)
from scapy.arch.bpf.core import * # noqa F403
if not conf.use_pcap:
# Native
Expand All @@ -168,9 +154,6 @@ def get_if_raw_addr6(iff):
def get_if_raw_addr(iff: Union['NetworkInterface', str]) -> bytes:
return b"\0\0\0\0"

def get_if_raw_hwaddr(iff: Union['NetworkInterface', str]) -> Tuple[int, bytes]:
return -1, b""

def in6_getifaddr() -> List[Tuple[str, int, str]]:
return []

Expand Down
226 changes: 47 additions & 179 deletions scapy/arch/bpf/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,150 +8,43 @@
"""


from ctypes import cdll, cast, pointer
from ctypes import c_int, c_ulong, c_uint, c_char_p, Structure, POINTER
from ctypes.util import find_library
import fcntl
import os
import re
import socket
import struct
import subprocess

import scapy
from scapy.arch.bpf.consts import BIOCSETF, SIOCGIFFLAGS, BIOCSETIF
from scapy.arch.common import compile_filter, _iff_flags
from scapy.arch.unix import get_if, in6_getifaddr
from scapy.compat import plain_str
from scapy.arch.bpf.consts import BIOCSETF, BIOCSETIF
from scapy.arch.common import compile_filter
from scapy.config import conf
from scapy.consts import LINUX
from scapy.data import ARPHDR_LOOPBACK, ARPHDR_ETHER
from scapy.error import Scapy_Exception, warning
from scapy.error import Scapy_Exception
from scapy.interfaces import (
InterfaceProvider,
NetworkInterface,
network_name,
_GlobInterfaceType,
)
from scapy.pton_ntop import inet_ntop

# re-export
from scapy.arch.bpf.pfroute import ( # noqa F403
read_routes,
read_routes6,
_get_if_list,
)
from scapy.arch.common import get_if_raw_addr, read_nameservers # noqa: F401

# Typing
from typing import (
Dict,
List,
Optional,
Tuple,
)

if LINUX:
raise OSError("BPF conflicts with Linux")


# ctypes definitions

LIBC = cdll.LoadLibrary(find_library("c"))

LIBC.ioctl.argtypes = [c_int, c_ulong, ]
LIBC.ioctl.restype = c_int

# The following is implemented as of Python >= 3.3
# under socket.*. Remember to use them when dropping Py2.7

# See https://docs.python.org/3/library/socket.html#socket.if_nameindex


class if_nameindex(Structure):
_fields_ = [("if_index", c_uint),
("if_name", c_char_p)]


_ptr_ifnameindex_table = POINTER(if_nameindex * 255)

LIBC.if_nameindex.argtypes = []
LIBC.if_nameindex.restype = _ptr_ifnameindex_table
LIBC.if_freenameindex.argtypes = [_ptr_ifnameindex_table]
LIBC.if_freenameindex.restype = None

# Addresses manipulation functions


def get_if_raw_addr(ifname):
# type: (_GlobInterfaceType) -> bytes
"""
Returns the IPv4 address configured on 'ifname', packed with inet_pton.
"""

ifname = network_name(ifname)

# Get ifconfig output
subproc = subprocess.Popen(
[conf.prog.ifconfig, ifname],
close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = subproc.communicate()
if subproc.returncode:
warning("Failed to execute ifconfig: (%s)", plain_str(stderr).strip())
return b"\0\0\0\0"

# Get IPv4 addresses
addresses = [
line.strip() for line in plain_str(stdout).splitlines()
if "inet " in line
]

if not addresses:
warning("No IPv4 address found on %s !", ifname)
return b"\0\0\0\0"

# Pack the first address
address = addresses[0].split(' ')[1]
if '/' in address: # NetBSD 8.0
address = address.split("/")[0]
return socket.inet_pton(socket.AF_INET, address)


def get_if_raw_hwaddr(ifname):
# type: (_GlobInterfaceType) -> Tuple[int, bytes]
"""Returns the packed MAC address configured on 'ifname'."""

NULL_MAC_ADDRESS = b'\x00' * 6

ifname = network_name(ifname)
# Handle the loopback interface separately
if ifname == conf.loopback_name:
return (ARPHDR_LOOPBACK, NULL_MAC_ADDRESS)

# Get ifconfig output
subproc = subprocess.Popen(
[conf.prog.ifconfig, ifname],
close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = subproc.communicate()
if subproc.returncode:
raise Scapy_Exception("Failed to execute ifconfig: (%s)" %
plain_str(stderr).strip())

# Get MAC addresses
addresses = [
line.strip() for line in plain_str(stdout).splitlines() if (
"ether" in line or "lladdr" in line or "address" in line
)
]
if not addresses:
raise Scapy_Exception("No MAC address found on %s !" % ifname)

# Pack and return the MAC address
mac = [int(b, 16) for b in addresses[0].split(' ')[1].split(':')]

# Check that the address length is correct
if len(mac) != 6:
raise Scapy_Exception("No MAC address found on %s !" % ifname)

return (ARPHDR_ETHER, struct.pack("!BBBBBB", *mac))


# BPF specific functions


def get_dev_bpf():
# type: () -> Tuple[int, int]
"""Returns an opened BPF file object"""
Expand All @@ -163,10 +56,13 @@ def get_dev_bpf():
return (fd, bpf)
except OSError as ex:
if ex.errno == 13: # Permission denied
raise Scapy_Exception((
"Permission denied: could not open /dev/bpf%i. "
"Make sure to be running Scapy as root ! (sudo)"
) % bpf)
raise Scapy_Exception(
(
"Permission denied: could not open /dev/bpf%i. "
"Make sure to be running Scapy as root ! (sudo)"
)
% bpf
)
continue

raise Scapy_Exception("No /dev/bpf handle is available !")
Expand All @@ -177,45 +73,31 @@ def attach_filter(fd, bpf_filter, iface):
"""Attach a BPF filter to the BPF file descriptor"""
bp = compile_filter(bpf_filter, iface)
# Assign the BPF program to the interface
ret = LIBC.ioctl(c_int(fd), BIOCSETF, cast(pointer(bp), c_char_p))
ret = fcntl.ioctl(fd, BIOCSETF, bp)
if ret < 0:
raise Scapy_Exception("Can't attach the BPF filter !")


# Interface manipulation functions

def _get_ifindex_list():
# type: () -> List[Tuple[str, int]]
def in6_getifaddr():
# type: () -> List[Tuple[str, int, str]]
"""
Returns a list containing (iface, index)
"""
ptr = LIBC.if_nameindex()
ifaces = []
for i in range(255):
iface = ptr.contents[i]
if not iface.if_name:
break
ifaces.append((plain_str(iface.if_name), iface.if_index))
LIBC.if_freenameindex(ptr)
return ifaces


_IFNUM = re.compile(r"([0-9]*)([ab]?)$")
Returns a list of 3-tuples of the form (addr, scope, iface) where
'addr' is the address of scope 'scope' associated to the interface
'iface'.
This is the list of all addresses of all interfaces available on
the system.
"""
ifaces = _get_if_list()
return [
(ip["address"], ip["scope"], iface["name"])
for iface in ifaces.values()
for ip in iface["ips"]
if ip["af_family"] == socket.AF_INET6
]

def _get_if_flags(ifname):
# type: (_GlobInterfaceType) -> Optional[int]
"""Internal function to get interface flags"""
# Get interface flags
try:
result = get_if(ifname, SIOCGIFFLAGS)
except IOError:
warning("ioctl(SIOCGIFFLAGS) failed on %s !", ifname)
return None

# Convert flags
ifflags = struct.unpack("16xH14x", result)[0]
return ifflags
# Interface provider


class BPFInterfaceProvider(InterfaceProvider):
Expand All @@ -234,8 +116,7 @@ def _is_valid(self, dev):
raise Scapy_Exception("No /dev/bpf are available !")
# Check if the interface can be used
try:
fcntl.ioctl(fd, BIOCSETIF, struct.pack("16s16x",
dev.network_name.encode()))
fcntl.ioctl(fd, BIOCSETIF, struct.pack("16s16x", dev.network_name.encode()))
except IOError:
return False
else:
Expand All @@ -246,30 +127,17 @@ def _is_valid(self, dev):

def load(self):
# type: () -> Dict[str, NetworkInterface]
from scapy.fields import FlagValue
data = {}
ips = in6_getifaddr()
for ifname, index in _get_ifindex_list():
try:
ifflags_int = _get_if_flags(ifname)
if ifflags_int is None:
continue
mac = scapy.utils.str2mac(get_if_raw_hwaddr(ifname)[1])
ip = inet_ntop(socket.AF_INET, get_if_raw_addr(ifname))
except Scapy_Exception:
continue
ifflags = FlagValue(ifflags_int, _iff_flags)
if_data = {
"name": ifname,
"network_name": ifname,
"description": ifname,
"flags": ifflags,
"index": index,
"ip": ip,
"ips": [x[0] for x in ips if x[2] == ifname] + [ip],
"mac": mac
}
data[ifname] = NetworkInterface(self, if_data)
for iface in _get_if_list().values():
if_data = iface.copy()
if_data.update(
{
"network_name": iface["name"],
"description": iface["name"],
"ips": [x["address"] for x in iface["ips"]],
}
)
data[iface["name"]] = NetworkInterface(self, if_data)
return data


Expand Down
Loading

0 comments on commit 528626a

Please sign in to comment.