From bd9024108907abf06b3abc47c022de040bd0a8ca Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Mon, 25 Oct 2021 20:30:23 +0200 Subject: [PATCH 1/9] flow/worker: track local evictions & injected flows --- src/flow-worker.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/flow-worker.c b/src/flow-worker.c index 984b81a875c6..fd68150ea87d 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -79,7 +79,10 @@ typedef struct FlowWorkerThreadData_ { struct { uint16_t flows_injected; + uint16_t flows_injected_avg; + uint16_t flows_injected_max; uint16_t flows_removed; + uint16_t flows_removed_max; uint16_t flows_aside_needs_work; uint16_t flows_aside_pkt_inject; } cnt; @@ -251,7 +254,10 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void * fw->cnt.flows_aside_needs_work = StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", tv); fw->cnt.flows_aside_pkt_inject = StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", tv); fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", tv); + fw->cnt.flows_removed_max = StatsRegisterMaxCounter("flow.wrk.flows_evicted_max", tv); fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", tv); + fw->cnt.flows_injected_avg = StatsRegisterAvgCounter("flow.wrk.flows_injected_avg", tv); + fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", tv); fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv); if (fw->dtv == NULL) { @@ -453,7 +459,9 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv, if (SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) injected = FlowQueueExtractPrivate(tv->flow_queue); if (injected.len > 0) { + StatsAddUI64(tv, fw->cnt.flows_injected_avg, (uint64_t)injected.len); StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len); + StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len); FlowTimeoutCounters counters = { 0, 0, }; CheckWorkQueue(tv, fw, detect_thread, &counters, &injected); @@ -471,6 +479,7 @@ static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED); if (fw->fls.work_queue.len) { StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len); + StatsSetUI64(tv, fw->cnt.flows_removed_max, (uint64_t)fw->fls.work_queue.len); FlowTimeoutCounters counters = { 0, 0, }; CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue); From 4a60f80fed58d22bdb0d2d6b21027569c533ebef Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Wed, 27 Oct 2021 16:36:59 +0200 Subject: [PATCH 2/9] afpacket: detect threads getting stuck Add tpacket v2 ring diag code that dumps the ring if we appear stuck for a long time. --- src/source-af-packet.c | 171 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 170 insertions(+), 1 deletion(-) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index 60303919b342..e858079db803 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2011-2018 Open Information Security Foundation +/* Copyright (C) 2011-2021 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -169,6 +169,10 @@ TmEcode NoAFPSupportExit(ThreadVars *tv, const void *initdata, void **data) #define POLL_TIMEOUT 100 +#ifndef TP_STATUS_TS_SOFTWARE +#define TP_STATUS_TS_SOFTWARE (1 << 29) +#endif + #ifndef TP_STATUS_USER_BUSY /* for new use latest bit available in tp_status */ #define TP_STATUS_USER_BUSY BIT_U32(31) @@ -238,6 +242,8 @@ typedef struct AFPThreadVars_ uint16_t capture_kernel_drops; uint16_t capture_errors; + int32_t last_drops; + /* handle state */ uint8_t afp_state; uint8_t copy_mode; @@ -334,6 +340,128 @@ void TmModuleReceiveAFPRegister (void) } +/* debug dump of the ring */ +static void AFPCheckTpacketv2Ring(AFPThreadVars *ptv) +{ + if (ptv->flags & AFP_TPACKET_V3) + return; + + union thdr h; + uint32_t kernel = 0; + uint32_t ready = 0; + uint32_t inprogress = 0; + uint32_t something_else = 0; + uint32_t just_losing = 0; + uint32_t has_losing = 0; + uint32_t all_flags = 0; + uint32_t first_something_else = 0; + + uint32_t csum_not_ready = 0; + uint32_t csum_valid = 0; + uint32_t vlan_valid = 0; + + int32_t unknown_flag_cnt = 0; + int32_t unknown_flags = 0; + + /* mask of expected flags */ + uint32_t mask = TP_STATUS_USER | TP_STATUS_USER_BUSY | TP_STATUS_LOSING | TP_STATUS_CSUM_VALID | + TP_STATUS_VLAN_VALID | TP_STATUS_CSUMNOTREADY | TP_STATUS_TS_SOFTWARE; + + int state = -1; + int state_changes = 0; + + unsigned int frame_offset = 0; + while (1) { + /* Read packet from ring */ + h.raw = (((union thdr **)ptv->ring.v2)[frame_offset]); + if (h.raw == NULL) { + SCLogNotice("ERROR at frame_offset %u", frame_offset); + break; + } + const uint32_t tp_status = h.h2->tp_status; // read once + all_flags |= tp_status; + + if (tp_status & ~mask) { + unknown_flags |= (tp_status & ~mask); + unknown_flag_cnt++; + } + + bool state_change = false; + const char *s; + if (tp_status == TP_STATUS_KERNEL) { + kernel++; + s = "kernel"; + + if (state == -1) + state = 0; + if (state != 0) { + state = 0; + state_changes++; + state_change = true; + } + + } else if (tp_status & 1) { + if (state == -1) + state = 1; + if (state != 1) { + state = 1; + state_changes++; + state_change = true; + } + ready++; + s = "ready"; + } else if (tp_status & TP_STATUS_USER_BUSY) { + inprogress++; + s = "in progress"; + if (state == -1) + state = 2; + if (state != 2) { + state = 2; + state_changes++; + state_change = true; + } + } else { + if (first_something_else == 0) { + first_something_else = tp_status; + } + + if (tp_status == TP_STATUS_LOSING) { + just_losing++; + } + if (tp_status & TP_STATUS_LOSING) { + has_losing++; + } + if (tp_status & TP_STATUS_CSUM_VALID) { + csum_valid++; + } + if (tp_status & TP_STATUS_VLAN_VALID) { + vlan_valid++; + } + if (tp_status & TP_STATUS_CSUMNOTREADY) { + csum_not_ready++; + } + something_else++; + s = "weird"; + } + + SCLogNotice("ring[%03u]: %08x (%s) %s (state %d change %s)", frame_offset, tp_status, s, + (frame_offset == ptv->frame_offset) ? " <---- SURI " : "", state, + state_change ? "TRUE" : "false"); + + if (++frame_offset >= ptv->req.v2.tp_frame_nr) { + break; + } + } + SCLogNotice("scan result: frames %u kernel %u ready %u inprogress %u something_else %u", + ptv->req.v2.tp_frame_nr, kernel, ready, inprogress, something_else); + SCLogNotice("scan result: all_flags %08x just_losing %u has_losing %u first_something_else " + "%08x", + all_flags, just_losing, has_losing, first_something_else); + SCLogNotice("scan result: unknown_flag_cnt %u unknown_flags %08x csum_not_ready %u " + "csum_valid %u vlan_valid %u", + unknown_flag_cnt, unknown_flags, csum_not_ready, csum_valid, vlan_valid); + SCLogNotice("scan result: state_changes %d", state_changes); +} /** * \defgroup afppeers AFP peers list @@ -567,6 +695,7 @@ static inline void AFPDumpCounters(AFPThreadVars *ptv) StatsAddUI64(ptv->tv, ptv->capture_kernel_drops, kstats.tp_drops); (void) SC_ATOMIC_ADD(ptv->livedev->drop, (uint64_t) kstats.tp_drops); (void) SC_ATOMIC_ADD(ptv->livedev->pkts, (uint64_t) kstats.tp_packets); + ptv->last_drops = kstats.tp_drops; } #endif } @@ -1462,6 +1591,9 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) int (*AFPReadFunc) (AFPThreadVars *); uint64_t discarded_pkts = 0; + uint64_t timeout = 0; + time_t timeout_ts = 0; + ptv->slot = s->slot_next; if (ptv->flags & AFP_RING_MODE) { @@ -1542,6 +1674,14 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) PacketPoolWait(); r = poll(&fds, 1, POLL_TIMEOUT); + if (unlikely(r != 0)) { + if (r > 0 && timeout_ts != 0) { + /* this means we misdetected the hang. The issue in bug 4785 would never recover */ + SCLogNotice("looks like thread %s recovered from a suspected hang", tv->name); + } + timeout_ts = 0; + timeout = 0; + } if (suricata_ctl_flags != 0) { break; @@ -1593,11 +1733,40 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) break; } } else if (unlikely(r == 0)) { + timeout++; + /* Trigger one dump of stats every second */ current_time = time(NULL); if (current_time != last_dump) { AFPDumpCounters(ptv); last_dump = current_time; + + /* our hang detection relies on continued traffic, which is case of a hang would all + * be drops. So no drops, no hang detection. */ + if (!ptv->last_drops) { + timeout_ts = 0; + timeout = 0; + } else { + /* lets get at least a 100 timeouts in a row, each of them with drops */ + if (timeout > 100) { + if (timeout_ts == 0) { + SCLogWarning(SC_ERR_AFP_READ, + "looks like thread %s has an invalid socket just giving poll() " + "timeouts", + tv->name); + timeout_ts = current_time; + } else if ((current_time - timeout_ts) % 100 == 0) { + SCLogWarning(SC_ERR_AFP_READ, + "looks like thread %s has an invalid socket just giving poll() " + "timeouts (timed out %" PRIu64 + " times and %ld seconds in a row)", + tv->name, timeout, current_time - timeout_ts); + if (current_time - timeout_ts > 60) { + AFPCheckTpacketv2Ring(ptv); + } + } + } + } } /* poll timed out, lets see handle our timeout path */ TmThreadsCaptureHandleTimeout(tv, NULL); From 14d61a2d907a37e038289215f6188f0b81857781 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 31 Oct 2021 22:13:19 +0100 Subject: [PATCH 3/9] af-packet: fix soft lockup issues The Suricata AF_PACKET code opens a socket per thread, then after some minor setup enters a loop where the socket is poll()'d with a timeout. When the poll() call returns a non zero positive value, the AF_PACKET ring will be processed. The ringbuffer processing logic has a pointer into the ring where we last checked the ring. From this position we will inspect each frame until we find a frame with tp_status == TP_STATUS_KERNEL (so essentially 0). This means the frame is currently owned by the kernel. There is a special case handling for starting the ring processing but finding a TP_STATUS_KERNEL immediately. This logic then skip to the next frame, rerun the check, etc until it either finds an initialized frame or the last frame of the ringbuffer. The problem was, however, that the initial uninitialized frame was possibly (likely?) still being initialized by the kernel. A data race between the notification through the socket (the poll()) and the updating of the `tp_status` field in the frame could lead to a valid frame getting skipped. Of note is that for example libpcap does not do frame scanning. Instead it simply exits it ring processing loop. Also interesting is that libpcap uses atomic loads and stores on the tp_status field. This skipping of frames had 2 bad side effects: 1. in most cases, the buffer would be full enough that the frame would be processed in the next pass of the ring, but now the frame would out of order. This might have lead to packets belong to the same flow getting processed in the wrong order. 2. more severe is the soft lockup case. The skipped frame sits at ring buffer index 0. The rest of the ring has been cleared, after the initial frame was skipped. As our pass of the ring stops at the end of the ring (ptv->frame_offset + 1 == ptv->req.v2.tp_frame_nr) the code exits the ring processing loop at goes back to poll(). However, poll() will not indicate that there is more data, as the stale frame in the ring blocks the kernel from populating more frames beyond it. This is now a dead lock, as the kernel waits for Suricata and Suricata never touches the ring until it hears from the kernel. The scan logic will scan the whole ring at most once, so it won't reconsider the stale frame either. This patch addresses the issues in several ways: 1. the startup "discard" logic was fixed to not skip over kernel frames. Doing so would get us in a bad state at start up. 2. Instead of scanning the ring, we now enter a busy wait loop when encountering a kernel frame where we didn't expect one. This means that if we got a > 0 poll() result, we'll busy wait until we get at least one frame. 3. Error handling is unified and cleaned up. Any frame error now returns the frame to the kernel and progresses the frame pointer. 4. If we find a frame that is owned by us (TP_STATUS_USER_BUSY) we yield to poll() immediately, as the next expected status of that frame is TP_STATUS_KERNEL. 5. the ring is no longer processed until the "end" of the ring (so highest index), but instead we process at most one full ring size per run. 6. Work with a copy of `tp_status` instead of accessing original touched also by the kernel. Bug: #4785. --- src/source-af-packet.c | 329 +++++++++++++++++++++-------------------- 1 file changed, 169 insertions(+), 160 deletions(-) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index e858079db803..bf23f3d81ef8 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -241,6 +241,7 @@ typedef struct AFPThreadVars_ uint16_t capture_kernel_packets; uint16_t capture_kernel_drops; uint16_t capture_errors; + uint16_t afpacket_spin; int32_t last_drops; @@ -967,6 +968,138 @@ static void AFPReleasePacket(Packet *p) PacketFreeOrRelease(p); } +/** \internal + * \brief recoverable error - release packet and + * return AFP_SURI_FAILURE + */ +static inline int AFPSuriFailure(AFPThreadVars *ptv, union thdr h) +{ + h.h2->tp_status = TP_STATUS_KERNEL; + if (++ptv->frame_offset >= ptv->req.v2.tp_frame_nr) { + ptv->frame_offset = 0; + } + SCReturnInt(AFP_SURI_FAILURE); +} + +static inline void AFPReadApplyBypass(const AFPThreadVars *ptv, Packet *p) +{ + if (ptv->flags & AFP_BYPASS) { + p->BypassPacketsFlow = AFPBypassCallback; +#ifdef HAVE_PACKET_EBPF + p->afp_v.v4_map_fd = ptv->v4_map_fd; + p->afp_v.v6_map_fd = ptv->v6_map_fd; + p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; +#endif + } + if (ptv->flags & AFP_XDPBYPASS) { + p->BypassPacketsFlow = AFPXDPBypassCallback; +#ifdef HAVE_PACKET_EBPF + p->afp_v.v4_map_fd = ptv->v4_map_fd; + p->afp_v.v6_map_fd = ptv->v6_map_fd; + p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; +#endif + } +} + +/** \internal + * \brief setup packet for AFPReadFromRing + */ +static bool AFPReadFromRingSetupPacket( + AFPThreadVars *ptv, union thdr h, const unsigned int tp_status, Packet *p) +{ + PKT_SET_SRC(p, PKT_SRC_WIRE); + + /* Suricata will treat packet so telling it is busy, this + * status will be reset to 0 (ie TP_STATUS_KERNEL) in the release + * function. */ + h.h2->tp_status |= TP_STATUS_USER_BUSY; + p->livedev = ptv->livedev; + p->datalink = ptv->datalink; + ptv->pkts++; + + AFPReadApplyBypass(ptv, p); + + if (h.h2->tp_len > h.h2->tp_snaplen) { + SCLogDebug("Packet length (%d) > snaplen (%d), truncating", h.h2->tp_len, h.h2->tp_snaplen); + } + + /* get vlan id from header */ + if ((ptv->flags & AFP_VLAN_IN_HEADER) && + (tp_status & TP_STATUS_VLAN_VALID || h.h2->tp_vlan_tci)) { + p->vlan_id[0] = h.h2->tp_vlan_tci & 0x0fff; + p->vlan_idx = 1; + } + + if (ptv->flags & AFP_ZERO_COPY) { + if (PacketSetData(p, (unsigned char *)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { + return false; + } + + p->afp_v.relptr = h.raw; + p->ReleasePacket = AFPReleasePacket; + p->afp_v.mpeer = ptv->mpeer; + AFPRefSocket(ptv->mpeer); + + p->afp_v.copy_mode = ptv->copy_mode; + if (p->afp_v.copy_mode != AFP_COPY_MODE_NONE) { + p->afp_v.peer = ptv->mpeer->peer; + } else { + p->afp_v.peer = NULL; + } + } else { + if (PacketCopyData(p, (unsigned char *)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { + return false; + } + } + /* Timestamp */ + p->ts.tv_sec = h.h2->tp_sec; + p->ts.tv_usec = h.h2->tp_nsec / 1000; + SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", GET_PKT_LEN(p), p, GET_PKT_DATA(p)); + + /* We only check for checksum disable */ + if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { + p->flags |= PKT_IGNORE_CHECKSUM; + } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_AUTO) { + if (ChecksumAutoModeCheck(ptv->pkts, SC_ATOMIC_GET(ptv->livedev->pkts), + SC_ATOMIC_GET(ptv->livedev->invalid_checksums))) { + ptv->checksum_mode = CHECKSUM_VALIDATION_DISABLE; + p->flags |= PKT_IGNORE_CHECKSUM; + } + } else { + if (tp_status & TP_STATUS_CSUMNOTREADY) { + p->flags |= PKT_IGNORE_CHECKSUM; + } + } + return true; +} + +static inline int AFPReadFromRingWaitForPacket(AFPThreadVars *ptv) +{ + union thdr h; + uint64_t busy_loop_iter = 0; + + /* busy wait loop until we have packets available */ + while (1) { + if (unlikely(suricata_ctl_flags != 0)) { + break; + } + h.raw = (((union thdr **)ptv->ring.v2)[ptv->frame_offset]); + if (unlikely(h.raw == NULL)) { + return AFP_READ_FAILURE; + } + const unsigned int tp_status = h.h2->tp_status; + if (tp_status == TP_STATUS_KERNEL) { + busy_loop_iter++; + continue; + } + break; + } + if (busy_loop_iter) { + StatsAddUI64(ptv->tv, ptv->afpacket_spin, busy_loop_iter); + } + return AFP_READ_OK; +} + /** * \brief AF packet read function for ring * @@ -978,181 +1111,66 @@ static void AFPReleasePacket(Packet *p) */ static int AFPReadFromRing(AFPThreadVars *ptv) { - Packet *p = NULL; union thdr h; - uint8_t emergency_flush = 0; - int read_pkts = 0; - int loop_start = -1; + bool emergency_flush = false; + const unsigned int start_pos = ptv->frame_offset; + /* poll() told us there are frames, so lets wait for at least + * one frame to become available. */ + if (AFPReadFromRingWaitForPacket(ptv) != AFP_READ_OK) + return AFP_READ_FAILURE; - /* Loop till we have packets available */ + /* process the frames in the ring */ while (1) { if (unlikely(suricata_ctl_flags != 0)) { break; } - - /* Read packet from ring */ h.raw = (((union thdr **)ptv->ring.v2)[ptv->frame_offset]); if (unlikely(h.raw == NULL)) { - /* Impossible we reach this point in normal condition, so trigger - * a failure in reading */ - SCReturnInt(AFP_READ_FAILURE); + return AFP_READ_FAILURE; } - - if ((! h.h2->tp_status) || (h.h2->tp_status & TP_STATUS_USER_BUSY)) { - if (read_pkts == 0) { - if (loop_start == -1) { - loop_start = ptv->frame_offset; - } else if (unlikely(loop_start == (int)ptv->frame_offset)) { - SCReturnInt(AFP_READ_OK); - } - if (++ptv->frame_offset >= ptv->req.v2.tp_frame_nr) { - ptv->frame_offset = 0; - } - continue; - } - if ((emergency_flush) && (ptv->flags & AFP_EMERGENCY_MODE)) { - SCReturnInt(AFP_KERNEL_DROP); - } else { - SCReturnInt(AFP_READ_OK); - } - } - - read_pkts++; - loop_start = -1; - - /* Our packet is still used by suricata, we exit read loop to - * gain some time */ - if (h.h2->tp_status & TP_STATUS_USER_BUSY) { - SCReturnInt(AFP_READ_OK); + const unsigned int tp_status = h.h2->tp_status; + /* if we find a kernel frame we are done */ + if (tp_status == TP_STATUS_KERNEL) + break; + /* if in autofp mode the frame is still busy, return to poll */ + if (tp_status & TP_STATUS_USER_BUSY) { + break; } + emergency_flush |= ((tp_status & TP_STATUS_LOSING) != 0); - if ((ptv->flags & AFP_EMERGENCY_MODE) && (emergency_flush == 1)) { + if ((ptv->flags & AFP_EMERGENCY_MODE) && emergency_flush) { h.h2->tp_status = TP_STATUS_KERNEL; goto next_frame; } - p = PacketGetFromQueueOrAlloc(); + Packet *p = PacketGetFromQueueOrAlloc(); if (p == NULL) { - SCReturnInt(AFP_SURI_FAILURE); - } - PKT_SET_SRC(p, PKT_SRC_WIRE); - if (ptv->flags & AFP_BYPASS) { - p->BypassPacketsFlow = AFPBypassCallback; -#ifdef HAVE_PACKET_EBPF - p->afp_v.v4_map_fd = ptv->v4_map_fd; - p->afp_v.v6_map_fd = ptv->v6_map_fd; - p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif - } - if (ptv->flags & AFP_XDPBYPASS) { - p->BypassPacketsFlow = AFPXDPBypassCallback; -#ifdef HAVE_PACKET_EBPF - p->afp_v.v4_map_fd = ptv->v4_map_fd; - p->afp_v.v6_map_fd = ptv->v6_map_fd; - p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif - } - - /* Suricata will treat packet so telling it is busy, this - * status will be reset to 0 (ie TP_STATUS_KERNEL) in the release - * function. */ - h.h2->tp_status |= TP_STATUS_USER_BUSY; - - ptv->pkts++; - p->livedev = ptv->livedev; - p->datalink = ptv->datalink; - - if (h.h2->tp_len > h.h2->tp_snaplen) { - SCLogDebug("Packet length (%d) > snaplen (%d), truncating", - h.h2->tp_len, h.h2->tp_snaplen); - } - - /* get vlan id from header */ - if ((ptv->flags & AFP_VLAN_IN_HEADER) && - (h.h2->tp_status & TP_STATUS_VLAN_VALID || h.h2->tp_vlan_tci)) { - p->vlan_id[0] = h.h2->tp_vlan_tci & 0x0fff; - p->vlan_idx = 1; - } - - if (ptv->flags & AFP_ZERO_COPY) { - if (PacketSetData(p, (unsigned char*)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { - TmqhOutputPacketpool(ptv->tv, p); - SCReturnInt(AFP_SURI_FAILURE); - } else { - p->afp_v.relptr = h.raw; - p->ReleasePacket = AFPReleasePacket; - p->afp_v.mpeer = ptv->mpeer; - AFPRefSocket(ptv->mpeer); - - p->afp_v.copy_mode = ptv->copy_mode; - if (p->afp_v.copy_mode != AFP_COPY_MODE_NONE) { - p->afp_v.peer = ptv->mpeer->peer; - } else { - p->afp_v.peer = NULL; - } - } - } else { - if (PacketCopyData(p, (unsigned char*)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { - /* As we can possibly fail to copy the data due to invalid data, let's - * skip this packet and switch to the next one. - */ - h.h2->tp_status = TP_STATUS_KERNEL; - if (++ptv->frame_offset >= ptv->req.v2.tp_frame_nr) { - ptv->frame_offset = 0; - } - TmqhOutputPacketpool(ptv->tv, p); - SCReturnInt(AFP_SURI_FAILURE); - } + return AFPSuriFailure(ptv, h); } - - /* Timestamp */ - p->ts.tv_sec = h.h2->tp_sec; - p->ts.tv_usec = h.h2->tp_nsec/1000; - SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", - GET_PKT_LEN(p), p, GET_PKT_DATA(p)); - - /* We only check for checksum disable */ - if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { - p->flags |= PKT_IGNORE_CHECKSUM; - } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_AUTO) { - if (ChecksumAutoModeCheck(ptv->pkts, - SC_ATOMIC_GET(ptv->livedev->pkts), - SC_ATOMIC_GET(ptv->livedev->invalid_checksums))) { - ptv->checksum_mode = CHECKSUM_VALIDATION_DISABLE; - p->flags |= PKT_IGNORE_CHECKSUM; - } - } else { - if (h.h2->tp_status & TP_STATUS_CSUMNOTREADY) { - p->flags |= PKT_IGNORE_CHECKSUM; - } - } - if (h.h2->tp_status & TP_STATUS_LOSING) { - emergency_flush = 1; - AFPDumpCounters(ptv); + if (AFPReadFromRingSetupPacket(ptv, h, tp_status, p) == false) { + TmqhOutputPacketpool(ptv->tv, p); + return AFPSuriFailure(ptv, h); } - /* release frame if not in zero copy mode */ - if (!(ptv->flags & AFP_ZERO_COPY)) { + if (!(ptv->flags & AFP_ZERO_COPY)) { h.h2->tp_status = TP_STATUS_KERNEL; } if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { - h.h2->tp_status = TP_STATUS_KERNEL; - if (++ptv->frame_offset >= ptv->req.v2.tp_frame_nr) { - ptv->frame_offset = 0; - } - SCReturnInt(AFP_SURI_FAILURE); + return AFPSuriFailure(ptv, h); } - next_frame: if (++ptv->frame_offset >= ptv->req.v2.tp_frame_nr) { ptv->frame_offset = 0; /* Get out of loop to be sure we will reach maintenance tasks */ - SCReturnInt(AFP_READ_OK); + if (ptv->frame_offset == start_pos) + break; } } - + if (emergency_flush) { + AFPDumpCounters(ptv); + } SCReturnInt(AFP_READ_OK); } @@ -1169,21 +1187,8 @@ static inline int AFPParsePacketV3(AFPThreadVars *ptv, struct tpacket_block_desc SCReturnInt(AFP_SURI_FAILURE); } PKT_SET_SRC(p, PKT_SRC_WIRE); - if (ptv->flags & AFP_BYPASS) { - p->BypassPacketsFlow = AFPBypassCallback; -#ifdef HAVE_PACKET_EBPF - p->afp_v.v4_map_fd = ptv->v4_map_fd; - p->afp_v.v6_map_fd = ptv->v6_map_fd; - p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif - } else if (ptv->flags & AFP_XDPBYPASS) { - p->BypassPacketsFlow = AFPXDPBypassCallback; -#ifdef HAVE_PACKET_EBPF - p->afp_v.v4_map_fd = ptv->v4_map_fd; - p->afp_v.v6_map_fd = ptv->v6_map_fd; - p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif - } + + AFPReadApplyBypass(ptv, p); ptv->pkts++; p->livedev = ptv->livedev; @@ -1473,13 +1478,16 @@ static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv, if (h.raw == NULL) { return -1; } - (*discarded_pkts)++; + if (h.h2->tp_status == TP_STATUS_KERNEL) + return 0; + if (((time_t)h.h2->tp_sec > synctv->tv_sec) || ((time_t)h.h2->tp_sec == synctv->tv_sec && (suseconds_t) (h.h2->tp_nsec / 1000) > synctv->tv_usec)) { return 1; } + (*discarded_pkts)++; h.h2->tp_status = TP_STATUS_KERNEL; if (++ptv->frame_offset >= ptv->req.v2.tp_frame_nr) { ptv->frame_offset = 0; @@ -2928,6 +2936,7 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, const void *initdata, void **data) ptv->tv); ptv->capture_errors = StatsRegisterCounter("capture.errors", ptv->tv); + ptv->afpacket_spin = StatsRegisterAvgCounter("afpacket.busy_loop_avg", ptv->tv); #endif ptv->copy_mode = afpconfig->copy_mode; From eed99f1e6cad0f4d6d858f60ade117d90e86473b Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 31 Oct 2021 10:28:18 +0100 Subject: [PATCH 4/9] af-packet: avoid flag colision with kernel Avoid colision of TP_STATUS_USER_BUSY with TP_STATUS_TS_RAW_HARDWARE, both were using bit 31. --- src/source-af-packet.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index bf23f3d81ef8..cf2b53d5abaa 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -174,9 +174,17 @@ TmEcode NoAFPSupportExit(ThreadVars *tv, const void *initdata, void **data) #endif #ifndef TP_STATUS_USER_BUSY -/* for new use latest bit available in tp_status */ -#define TP_STATUS_USER_BUSY BIT_U32(31) +/* HACK special setting in the tp_status field for frames we are + * still working on. This can happen in autofp mode where the + * capture thread goes around the ring and finds a frame that still + * hasn't been released by a worker thread. + * + * We use bits 29, 30, 31. 29 and 31 are software and hardware + * timestamps. 30 should not be used. Combined they should never + * be set on the rx-ring together. */ +#define TP_STATUS_USER_BUSY (BIT_U32(29) | BIT_U32(30) | BIT_U32(31)) #endif +#define FRAME_BUSY(tp_status) (((tp_status)&TP_STATUS_USER_BUSY) == TP_STATUS_USER_BUSY) #ifndef TP_STATUS_VLAN_VALID #define TP_STATUS_VLAN_VALID BIT_U32(4) @@ -411,7 +419,7 @@ static void AFPCheckTpacketv2Ring(AFPThreadVars *ptv) } ready++; s = "ready"; - } else if (tp_status & TP_STATUS_USER_BUSY) { + } else if (FRAME_BUSY(tp_status)) { inprogress++; s = "in progress"; if (state == -1) @@ -1134,7 +1142,7 @@ static int AFPReadFromRing(AFPThreadVars *ptv) if (tp_status == TP_STATUS_KERNEL) break; /* if in autofp mode the frame is still busy, return to poll */ - if (tp_status & TP_STATUS_USER_BUSY) { + if (FRAME_BUSY(tp_status)) { break; } emergency_flush |= ((tp_status & TP_STATUS_LOSING) != 0); From 9b85be8ab9bd70184fcbbad4e13b4cf391b3e297 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 31 Oct 2021 22:36:29 +0100 Subject: [PATCH 5/9] af-packet: remove zero copy flag Flag was always set for tpacket v2 and v3, which meant the check for it in the packet setup paths were useless. --- src/runmode-af-packet.c | 10 +------ src/source-af-packet.c | 65 ++++++++++++++++------------------------- src/source-af-packet.h | 2 +- 3 files changed, 27 insertions(+), 50 deletions(-) diff --git a/src/runmode-af-packet.c b/src/runmode-af-packet.c index f8053ab0f9c2..20131cd591e5 100644 --- a/src/runmode-af-packet.c +++ b/src/runmode-af-packet.c @@ -690,20 +690,12 @@ static void *ParseAFPConfig(const char *iface) break; } - if (active_runmode && !strcmp("workers", active_runmode)) { - aconf->flags |= AFP_ZERO_COPY; - } else { + if (active_runmode == NULL || strcmp("workers", active_runmode) != 0) { /* If we are using copy mode we need a lock */ aconf->flags |= AFP_SOCK_PROTECT; } - /* If we are in RING mode, then we can use ZERO copy - * by using the data release mechanism */ if (aconf->flags & AFP_RING_MODE) { - aconf->flags |= AFP_ZERO_COPY; - } - - if (aconf->flags & AFP_ZERO_COPY) { SCLogConfig("%s: enabling zero copy mode by using data release call", iface); } diff --git a/src/source-af-packet.c b/src/source-af-packet.c index cf2b53d5abaa..f58be684cbd5 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -1038,27 +1038,22 @@ static bool AFPReadFromRingSetupPacket( p->vlan_idx = 1; } - if (ptv->flags & AFP_ZERO_COPY) { - if (PacketSetData(p, (unsigned char *)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { - return false; - } + if (PacketSetData(p, (unsigned char *)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { + return false; + } - p->afp_v.relptr = h.raw; - p->ReleasePacket = AFPReleasePacket; - p->afp_v.mpeer = ptv->mpeer; - AFPRefSocket(ptv->mpeer); + p->afp_v.relptr = h.raw; + p->ReleasePacket = AFPReleasePacket; + p->afp_v.mpeer = ptv->mpeer; + AFPRefSocket(ptv->mpeer); - p->afp_v.copy_mode = ptv->copy_mode; - if (p->afp_v.copy_mode != AFP_COPY_MODE_NONE) { - p->afp_v.peer = ptv->mpeer->peer; - } else { - p->afp_v.peer = NULL; - } + p->afp_v.copy_mode = ptv->copy_mode; + if (p->afp_v.copy_mode != AFP_COPY_MODE_NONE) { + p->afp_v.peer = ptv->mpeer->peer; } else { - if (PacketCopyData(p, (unsigned char *)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { - return false; - } + p->afp_v.peer = NULL; } + /* Timestamp */ p->ts.tv_sec = h.h2->tp_sec; p->ts.tv_usec = h.h2->tp_nsec / 1000; @@ -1160,10 +1155,6 @@ static int AFPReadFromRing(AFPThreadVars *ptv) TmqhOutputPacketpool(ptv->tv, p); return AFPSuriFailure(ptv, h); } - /* release frame if not in zero copy mode */ - if (!(ptv->flags & AFP_ZERO_COPY)) { - h.h2->tp_status = TP_STATUS_KERNEL; - } if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { return AFPSuriFailure(ptv, h); @@ -1208,28 +1199,22 @@ static inline int AFPParsePacketV3(AFPThreadVars *ptv, struct tpacket_block_desc p->vlan_idx = 1; } - if (ptv->flags & AFP_ZERO_COPY) { - if (PacketSetData(p, (unsigned char*)ppd + ppd->tp_mac, ppd->tp_snaplen) == -1) { - TmqhOutputPacketpool(ptv->tv, p); - SCReturnInt(AFP_SURI_FAILURE); - } - p->afp_v.relptr = ppd; - p->ReleasePacket = AFPReleasePacketV3; - p->afp_v.mpeer = ptv->mpeer; - AFPRefSocket(ptv->mpeer); + if (PacketSetData(p, (unsigned char *)ppd + ppd->tp_mac, ppd->tp_snaplen) == -1) { + TmqhOutputPacketpool(ptv->tv, p); + SCReturnInt(AFP_SURI_FAILURE); + } + p->afp_v.relptr = ppd; + p->ReleasePacket = AFPReleasePacketV3; + p->afp_v.mpeer = ptv->mpeer; + AFPRefSocket(ptv->mpeer); - p->afp_v.copy_mode = ptv->copy_mode; - if (p->afp_v.copy_mode != AFP_COPY_MODE_NONE) { - p->afp_v.peer = ptv->mpeer->peer; - } else { - p->afp_v.peer = NULL; - } + p->afp_v.copy_mode = ptv->copy_mode; + if (p->afp_v.copy_mode != AFP_COPY_MODE_NONE) { + p->afp_v.peer = ptv->mpeer->peer; } else { - if (PacketCopyData(p, (unsigned char*)ppd + ppd->tp_mac, ppd->tp_snaplen) == -1) { - TmqhOutputPacketpool(ptv->tv, p); - SCReturnInt(AFP_SURI_FAILURE); - } + p->afp_v.peer = NULL; } + /* Timestamp */ p->ts.tv_sec = ppd->tp_sec; p->ts.tv_usec = ppd->tp_nsec/1000; diff --git a/src/source-af-packet.h b/src/source-af-packet.h index b3643b081db7..d599a28d626c 100644 --- a/src/source-af-packet.h +++ b/src/source-af-packet.h @@ -56,7 +56,7 @@ struct ebpf_timeout_config { /* value for flags */ #define AFP_RING_MODE (1<<0) -#define AFP_ZERO_COPY (1<<1) +// (1<<1) vacant #define AFP_SOCK_PROTECT (1<<2) #define AFP_EMERGENCY_MODE (1<<3) #define AFP_TPACKET_V3 (1<<4) From 924234bb70b5887670e18e90fdbdad3f4c71eeef Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 31 Oct 2021 21:17:41 +0100 Subject: [PATCH 6/9] af-packet: remove obsolete define --- src/source-af-packet.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/source-af-packet.h b/src/source-af-packet.h index d599a28d626c..af45312c37a8 100644 --- a/src/source-af-packet.h +++ b/src/source-af-packet.h @@ -69,7 +69,6 @@ struct ebpf_timeout_config { #define AFP_COPY_MODE_TAP 1 #define AFP_COPY_MODE_IPS 2 -#define AFP_FILE_MAX_PKTS 256 #define AFP_IFACE_NAME_LENGTH 48 /* In kernel the allocated block size is allocated using the formula From a75b66c1cb838a3c6a0609a8eba641a7aef6c4dd Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 31 Oct 2021 21:27:49 +0100 Subject: [PATCH 7/9] af-packet: remove tpacket-v1 support Ticket: #4796. V2 (for IDS and IPS) and V3 (for IDS) are widely supported. V2 was introduced in 2008, so we can safely assume that all systems can run V2+. --- src/runmode-af-packet.c | 68 +++++------- src/source-af-packet.c | 228 ++-------------------------------------- src/source-af-packet.h | 2 +- 3 files changed, 34 insertions(+), 264 deletions(-) diff --git a/src/runmode-af-packet.c b/src/runmode-af-packet.c index 20131cd591e5..cd54b72ceb5b 100644 --- a/src/runmode-af-packet.c +++ b/src/runmode-af-packet.c @@ -142,7 +142,7 @@ static void *ParseAFPConfig(const char *iface) aconf->promisc = 1; aconf->checksum_mode = CHECKSUM_VALIDATION_KERNEL; aconf->DerefFunc = AFPDerefConfig; - aconf->flags = AFP_RING_MODE; + aconf->flags = 0; aconf->bpf_filter = NULL; aconf->ebpf_lb_file = NULL; aconf->ebpf_lb_fd = -1; @@ -213,52 +213,42 @@ static void *ParseAFPConfig(const char *iface) if (ConfGetChildValueBoolWithDefault(if_root, if_default, "use-mmap", (int *)&boolval) == 1) { if (!boolval) { - SCLogConfig("Disabling mmaped capture on iface %s", - aconf->iface); - aconf->flags &= ~(AFP_RING_MODE|AFP_TPACKET_V3); + SCLogWarning(SC_WARN_OPTION_OBSOLETE, + "%s: \"use-mmap\" option is obsolete: mmap is always enabled", aconf->iface); } } - if (aconf->flags & AFP_RING_MODE) { - (void)ConfGetChildValueBoolWithDefault(if_root, if_default, - "mmap-locked", (int *)&boolval); - if (boolval) { - SCLogConfig("Enabling locked memory for mmap on iface %s", - aconf->iface); - aconf->flags |= AFP_MMAP_LOCKED; - } + (void)ConfGetChildValueBoolWithDefault(if_root, if_default, "mmap-locked", (int *)&boolval); + if (boolval) { + SCLogConfig("Enabling locked memory for mmap on iface %s", aconf->iface); + aconf->flags |= AFP_MMAP_LOCKED; + } - if (ConfGetChildValueBoolWithDefault(if_root, if_default, - "tpacket-v3", (int *)&boolval) == 1) - { - if (boolval) { - if (strcasecmp(RunmodeGetActive(), "workers") == 0) { + if (ConfGetChildValueBoolWithDefault(if_root, if_default, "tpacket-v3", (int *)&boolval) == 1) { + if (boolval) { + if (strcasecmp(RunmodeGetActive(), "workers") == 0) { #ifdef HAVE_TPACKET_V3 - SCLogConfig("Enabling tpacket v3 capture on iface %s", - aconf->iface); - aconf->flags |= AFP_TPACKET_V3; + SCLogConfig("Enabling tpacket v3 capture on iface %s", aconf->iface); + aconf->flags |= AFP_TPACKET_V3; #else - SCLogNotice("System too old for tpacket v3 switching to v2"); - aconf->flags &= ~AFP_TPACKET_V3; + SCLogNotice("System too old for tpacket v3 switching to v2"); + aconf->flags &= ~AFP_TPACKET_V3; #endif - } else { - SCLogWarning(SC_ERR_RUNMODE, - "tpacket v3 is only implemented for 'workers' runmode." - " Switching to tpacket v2."); - aconf->flags &= ~AFP_TPACKET_V3; - } } else { + SCLogWarning(SC_ERR_RUNMODE, "tpacket v3 is only implemented for 'workers' runmode." + " Switching to tpacket v2."); aconf->flags &= ~AFP_TPACKET_V3; } + } else { + aconf->flags &= ~AFP_TPACKET_V3; } + } - (void)ConfGetChildValueBoolWithDefault(if_root, if_default, - "use-emergency-flush", (int *)&boolval); - if (boolval) { - SCLogConfig("Enabling ring emergency flush on iface %s", - aconf->iface); - aconf->flags |= AFP_EMERGENCY_MODE; - } + (void)ConfGetChildValueBoolWithDefault( + if_root, if_default, "use-emergency-flush", (int *)&boolval); + if (boolval) { + SCLogConfig("Enabling ring emergency flush on iface %s", aconf->iface); + aconf->flags |= AFP_EMERGENCY_MODE; } aconf->copy_mode = AFP_COPY_MODE_NONE; @@ -266,9 +256,6 @@ static void *ParseAFPConfig(const char *iface) if (aconf->out_iface == NULL) { SCLogInfo("Copy mode activated but no destination" " iface. Disabling feature"); - } else if (!(aconf->flags & AFP_RING_MODE)) { - SCLogInfo("Copy mode activated but use-mmap " - "set to no. Disabling feature"); } else if (strlen(copymodestr) <= 0) { aconf->out_iface = NULL; } else if (strcmp(copymodestr, "ips") == 0) { @@ -695,10 +682,7 @@ static void *ParseAFPConfig(const char *iface) aconf->flags |= AFP_SOCK_PROTECT; } - if (aconf->flags & AFP_RING_MODE) { - SCLogConfig("%s: enabling zero copy mode by using data release call", iface); - } - + SCLogConfig("%s: enabling zero copy mode by using data release call", iface); return aconf; } diff --git a/src/source-af-packet.c b/src/source-af-packet.c index f58be684cbd5..fa76d5bd6df6 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -261,11 +261,6 @@ typedef struct AFPThreadVars_ /* IPS peer */ AFPPeer *mpeer; - /* no mmap mode */ - uint8_t *data; /** Per function and thread data */ - int datalen; /** Length of per function and thread data */ - int cooked; - /* * Init related members */ @@ -709,140 +704,6 @@ static inline void AFPDumpCounters(AFPThreadVars *ptv) #endif } -/** - * \brief AF packet read function. - * - * This function fills - * From here the packets are picked up by the DecodeAFP thread. - * - * \param user pointer to AFPThreadVars - * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success - */ -static int AFPRead(AFPThreadVars *ptv) -{ - Packet *p = NULL; - /* XXX should try to use read that get directly to packet */ - int offset = 0; - int caplen; - struct sockaddr_ll from; - struct iovec iov; - struct msghdr msg; - struct cmsghdr *cmsg; - union { - struct cmsghdr cmsg; - char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))]; - } cmsg_buf; - unsigned char aux_checksum = 0; - - msg.msg_name = &from; - msg.msg_namelen = sizeof(from); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_control = &cmsg_buf; - msg.msg_controllen = sizeof(cmsg_buf); - msg.msg_flags = 0; - - if (ptv->cooked) - offset = SLL_HEADER_LEN; - else - offset = 0; - iov.iov_len = ptv->datalen - offset; - iov.iov_base = ptv->data + offset; - - caplen = recvmsg(ptv->socket, &msg, MSG_TRUNC); - - if (caplen < 0) { - SCLogWarning(SC_ERR_AFP_READ, "recvmsg failed with error code %" PRId32, - errno); - SCReturnInt(AFP_READ_FAILURE); - } - - p = PacketGetFromQueueOrAlloc(); - if (p == NULL) { - SCReturnInt(AFP_SURI_FAILURE); - } - PKT_SET_SRC(p, PKT_SRC_WIRE); - if (ptv->flags & AFP_BYPASS) { - p->BypassPacketsFlow = AFPBypassCallback; -#ifdef HAVE_PACKET_EBPF - p->afp_v.v4_map_fd = ptv->v4_map_fd; - p->afp_v.v6_map_fd = ptv->v6_map_fd; - p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif - } - if (ptv->flags & AFP_XDPBYPASS) { - p->BypassPacketsFlow = AFPXDPBypassCallback; -#ifdef HAVE_PACKET_EBPF - p->afp_v.v4_map_fd = ptv->v4_map_fd; - p->afp_v.v6_map_fd = ptv->v6_map_fd; - p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif - } - - /* get timestamp of packet via ioctl */ - if (ioctl(ptv->socket, SIOCGSTAMP, &p->ts) == -1) { - SCLogWarning(SC_ERR_AFP_READ, "recvmsg failed with error code %" PRId32, - errno); - TmqhOutputPacketpool(ptv->tv, p); - SCReturnInt(AFP_READ_FAILURE); - } - - ptv->pkts++; - p->livedev = ptv->livedev; - - /* add forged header */ - if (ptv->cooked) { - SllHdr * hdrp = (SllHdr *)ptv->data; - /* XXX this is minimalist, but this seems enough */ - hdrp->sll_protocol = from.sll_protocol; - } - - p->datalink = ptv->datalink; - SET_PKT_LEN(p, caplen + offset); - if (PacketCopyData(p, ptv->data, GET_PKT_LEN(p)) == -1) { - TmqhOutputPacketpool(ptv->tv, p); - SCReturnInt(AFP_SURI_FAILURE); - } - SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", - GET_PKT_LEN(p), p, GET_PKT_DATA(p)); - - /* We only check for checksum disable */ - if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { - p->flags |= PKT_IGNORE_CHECKSUM; - } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_AUTO) { - if (ChecksumAutoModeCheck(ptv->pkts, - SC_ATOMIC_GET(ptv->livedev->pkts), - SC_ATOMIC_GET(ptv->livedev->invalid_checksums))) { - ptv->checksum_mode = CHECKSUM_VALIDATION_DISABLE; - p->flags |= PKT_IGNORE_CHECKSUM; - } - } else { - aux_checksum = 1; - } - - /* List is NULL if we don't have activated auxiliary data */ - for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - struct tpacket_auxdata *aux; - - if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct tpacket_auxdata)) || - cmsg->cmsg_level != SOL_PACKET || - cmsg->cmsg_type != PACKET_AUXDATA) - continue; - - aux = (struct tpacket_auxdata *)CMSG_DATA(cmsg); - - if (aux_checksum && (aux->tp_status & TP_STATUS_CSUMNOTREADY)) { - p->flags |= PKT_IGNORE_CHECKSUM; - } - break; - } - - if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { - SCReturnInt(AFP_SURI_FAILURE); - } - SCReturnInt(AFP_READ_OK); -} - /** * \brief AF packet write function. * @@ -1394,49 +1255,6 @@ static void AFPSwitchState(AFPThreadVars *ptv, int state) } } -static int AFPReadAndDiscard(AFPThreadVars *ptv, struct timeval *synctv, - uint64_t *discarded_pkts) -{ - struct sockaddr_ll from; - struct iovec iov; - struct msghdr msg; - struct timeval ts; - union { - struct cmsghdr cmsg; - char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))]; - } cmsg_buf; - - - if (unlikely(suricata_ctl_flags != 0)) { - return 1; - } - - msg.msg_name = &from; - msg.msg_namelen = sizeof(from); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_control = &cmsg_buf; - msg.msg_controllen = sizeof(cmsg_buf); - msg.msg_flags = 0; - - iov.iov_len = ptv->datalen; - iov.iov_base = ptv->data; - - (void)recvmsg(ptv->socket, &msg, MSG_TRUNC); - - if (ioctl(ptv->socket, SIOCGSTAMP, &ts) == -1) { - /* FIXME */ - return -1; - } - - if ((ts.tv_sec > synctv->tv_sec) || - (ts.tv_sec >= synctv->tv_sec && - ts.tv_usec > synctv->tv_usec)) { - return 1; - } - return 0; -} - static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv, uint64_t *discarded_pkts) { @@ -1523,11 +1341,7 @@ static int AFPSynchronizeStart(AFPThreadVars *ptv, uint64_t *discarded_pkts) if (AFPPeersListStarted() && synctv.tv_sec == (time_t) 0xffffffff) { gettimeofday(&synctv, NULL); } - if (ptv->flags & AFP_RING_MODE) { - r = AFPReadAndDiscardFromRing(ptv, &synctv, discarded_pkts); - } else { - r = AFPReadAndDiscard(ptv, &synctv, discarded_pkts); - } + r = AFPReadAndDiscardFromRing(ptv, &synctv, discarded_pkts); SCLogDebug("Discarding on %s", ptv->tv->name); switch (r) { case 1: @@ -1597,14 +1411,10 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) ptv->slot = s->slot_next; - if (ptv->flags & AFP_RING_MODE) { - if (ptv->flags & AFP_TPACKET_V3) { - AFPReadFunc = AFPReadFromRingV3; - } else { - AFPReadFunc = AFPReadFromRing; - } + if (ptv->flags & AFP_TPACKET_V3) { + AFPReadFunc = AFPReadFromRingV3; } else { - AFPReadFunc = AFPRead; + AFPReadFunc = AFPReadFromRing; } if (ptv->afp_state == AFP_STATE_DOWN) { @@ -2347,21 +2157,13 @@ static int AFPCreateSocket(AFPThreadVars *ptv, char *devname, int verbose) } #endif - if (ptv->flags & AFP_RING_MODE) { - ret = AFPSetupRing(ptv, devname); - if (ret != 0) - goto socket_err; - } + ret = AFPSetupRing(ptv, devname); + if (ret != 0) + goto socket_err; SCLogDebug("Using interface '%s' via socket %d", (char *)devname, ptv->socket); ptv->datalink = AFPGetDevLinktype(ptv->socket, ptv->iface); - switch (ptv->datalink) { - case ARPHRD_PPP: - case ARPHRD_ATM: - ptv->cooked = 1; - break; - } TmEcode rc = AFPSetBPFFilter(ptv); if (rc == TM_ECODE_FAILED) { @@ -2858,7 +2660,6 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, const void *initdata, void **data) memset(ptv, 0, sizeof(AFPThreadVars)); ptv->tv = tv; - ptv->cooked = 0; strlcpy(ptv->iface, afpconfig->iface, AFP_IFACE_NAME_LENGTH); ptv->iface[AFP_IFACE_NAME_LENGTH - 1]= '\0'; @@ -2950,16 +2751,6 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, const void *initdata, void **data) SCReturnInt(TM_ECODE_FAILED); } -#define T_DATA_SIZE 70000 - ptv->data = SCMalloc(T_DATA_SIZE); - if (ptv->data == NULL) { - afpconfig->DerefFunc(afpconfig); - SCFree(ptv); - SCReturnInt(TM_ECODE_FAILED); - } - ptv->datalen = T_DATA_SIZE; -#undef T_DATA_SIZE - *data = (void *)ptv; afpconfig->DerefFunc(afpconfig); @@ -3010,11 +2801,6 @@ TmEcode ReceiveAFPThreadDeinit(ThreadVars *tv, void *data) EBPFSetupXDP(ptv->iface, -1, ptv->xdp_mode); } #endif - if (ptv->data != NULL) { - SCFree(ptv->data); - ptv->data = NULL; - } - ptv->datalen = 0; ptv->bpf_filter = NULL; if ((ptv->flags & AFP_TPACKET_V3) && ptv->ring.v3) { diff --git a/src/source-af-packet.h b/src/source-af-packet.h index af45312c37a8..4065699130a8 100644 --- a/src/source-af-packet.h +++ b/src/source-af-packet.h @@ -55,7 +55,7 @@ struct ebpf_timeout_config { #endif /* value for flags */ -#define AFP_RING_MODE (1<<0) +// (1<<0) vacant // (1<<1) vacant #define AFP_SOCK_PROTECT (1<<2) #define AFP_EMERGENCY_MODE (1<<3) From f235efd034c743233862a272f23384c3b94aeb93 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 31 Oct 2021 21:34:31 +0100 Subject: [PATCH 8/9] af-packet: minor code cleanups --- src/source-af-packet.c | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index fa76d5bd6df6..ea64ecb7cd83 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -170,7 +170,7 @@ TmEcode NoAFPSupportExit(ThreadVars *tv, const void *initdata, void **data) #define POLL_TIMEOUT 100 #ifndef TP_STATUS_TS_SOFTWARE -#define TP_STATUS_TS_SOFTWARE (1 << 29) +#define TP_STATUS_TS_SOFTWARE BIT_U32(29) #endif #ifndef TP_STATUS_USER_BUSY @@ -1107,14 +1107,11 @@ static inline int AFPParsePacketV3(AFPThreadVars *ptv, struct tpacket_block_desc static inline int AFPWalkBlock(AFPThreadVars *ptv, struct tpacket_block_desc *pbd) { - int num_pkts = pbd->hdr.bh1.num_pkts, i; - uint8_t *ppd; - int ret = 0; - - ppd = (uint8_t *)pbd + pbd->hdr.bh1.offset_to_first_pkt; - for (i = 0; i < num_pkts; ++i) { - ret = AFPParsePacketV3(ptv, pbd, - (struct tpacket3_hdr *)ppd); + const int num_pkts = pbd->hdr.bh1.num_pkts; + uint8_t *ppd = (uint8_t *)pbd + pbd->hdr.bh1.offset_to_first_pkt; + + for (int i = 0; i < num_pkts; ++i) { + int ret = AFPParsePacketV3(ptv, pbd, (struct tpacket3_hdr *)ppd); switch (ret) { case AFP_READ_OK: break; @@ -1146,9 +1143,6 @@ static inline int AFPWalkBlock(AFPThreadVars *ptv, struct tpacket_block_desc *pb static int AFPReadFromRingV3(AFPThreadVars *ptv) { #ifdef HAVE_TPACKET_V3 - struct tpacket_block_desc *pbd; - int ret = 0; - /* Loop till we have packets available */ while (1) { if (unlikely(suricata_ctl_flags != 0)) { @@ -1156,14 +1150,15 @@ static int AFPReadFromRingV3(AFPThreadVars *ptv) break; } - pbd = (struct tpacket_block_desc *) ptv->ring.v3[ptv->frame_offset].iov_base; + struct tpacket_block_desc *pbd = + (struct tpacket_block_desc *)ptv->ring.v3[ptv->frame_offset].iov_base; /* block is not ready to be read */ if ((pbd->hdr.bh1.block_status & TP_STATUS_USER) == 0) { SCReturnInt(AFP_READ_OK); } - ret = AFPWalkBlock(ptv, pbd); + int ret = AFPWalkBlock(ptv, pbd); if (unlikely(ret != AFP_READ_OK)) { AFPFlushBlock(pbd); SCReturnInt(ret); @@ -1258,8 +1253,6 @@ static void AFPSwitchState(AFPThreadVars *ptv, int state) static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv, uint64_t *discarded_pkts) { - union thdr h; - if (unlikely(suricata_ctl_flags != 0)) { return 1; } @@ -1267,8 +1260,8 @@ static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv, #ifdef HAVE_TPACKET_V3 if (ptv->flags & AFP_TPACKET_V3) { int ret = 0; - struct tpacket_block_desc *pbd; - pbd = (struct tpacket_block_desc *) ptv->ring.v3[ptv->frame_offset].iov_base; + struct tpacket_block_desc *pbd = + (struct tpacket_block_desc *)ptv->ring.v3[ptv->frame_offset].iov_base; *discarded_pkts += pbd->hdr.bh1.num_pkts; struct tpacket3_hdr *ppd = (struct tpacket3_hdr *)((uint8_t *)pbd + pbd->hdr.bh1.offset_to_first_pkt); @@ -1285,6 +1278,7 @@ static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv, #endif { /* Read packet from ring */ + union thdr h; h.raw = (((union thdr **)ptv->ring.v2)[ptv->frame_offset]); if (h.raw == NULL) { return -1; @@ -1305,7 +1299,6 @@ static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv, } } - return 0; } @@ -2850,10 +2843,7 @@ TmEcode DecodeAFP(ThreadVars *tv, Packet *p, void *data) TmEcode DecodeAFPThreadInit(ThreadVars *tv, const void *initdata, void **data) { SCEnter(); - DecodeThreadVars *dtv = NULL; - - dtv = DecodeThreadVarsAlloc(tv); - + DecodeThreadVars *dtv = DecodeThreadVarsAlloc(tv); if (dtv == NULL) SCReturnInt(TM_ECODE_FAILED); From f4cb25510c0de4bdfff4acd4c0245aa50bbbcfd6 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 31 Oct 2021 21:47:21 +0100 Subject: [PATCH 9/9] af-packet: hide all ebpf/bypass logic behind guards Leave no runtime checks for bypass/ebpf/xdp if not compiled in. --- src/source-af-packet.c | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index ea64ecb7cd83..3aa1b355a0a7 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -211,8 +211,10 @@ union thdr { void *raw; }; +#ifdef HAVE_PACKET_EBPF static int AFPBypassCallback(Packet *p); static int AFPXDPBypassCallback(Packet *p); +#endif #define MAX_MAPS 32 /** @@ -275,8 +277,6 @@ typedef struct AFPThreadVars_ int buffer_size; /* Filter */ const char *bpf_filter; - int ebpf_lb_fd; - int ebpf_filter_fd; int promisc; @@ -302,9 +302,10 @@ typedef struct AFPThreadVars_ unsigned int ring_buflen; uint8_t *ring_buf; - uint8_t xdp_mode; - #ifdef HAVE_PACKET_EBPF + uint8_t xdp_mode; + int ebpf_lb_fd; + int ebpf_filter_fd; struct ebpf_timeout_config ebpf_t_config; #endif @@ -852,22 +853,20 @@ static inline int AFPSuriFailure(AFPThreadVars *ptv, union thdr h) static inline void AFPReadApplyBypass(const AFPThreadVars *ptv, Packet *p) { +#ifdef HAVE_PACKET_EBPF if (ptv->flags & AFP_BYPASS) { p->BypassPacketsFlow = AFPBypassCallback; -#ifdef HAVE_PACKET_EBPF p->afp_v.v4_map_fd = ptv->v4_map_fd; p->afp_v.v6_map_fd = ptv->v6_map_fd; p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif } if (ptv->flags & AFP_XDPBYPASS) { p->BypassPacketsFlow = AFPXDPBypassCallback; -#ifdef HAVE_PACKET_EBPF p->afp_v.v4_map_fd = ptv->v4_map_fd; p->afp_v.v6_map_fd = ptv->v6_map_fd; p->afp_v.nr_cpus = ptv->ebpf_t_config.cpus_count; -#endif } +#endif } /** \internal @@ -2315,8 +2314,6 @@ static int AFPSetFlowStorage(Packet *p, int map_fd, void *key0, void* key1, return 1; } -#endif - /** * Bypass function for AF_PACKET capture in eBPF mode * @@ -2333,7 +2330,6 @@ static int AFPSetFlowStorage(Packet *p, int map_fd, void *key0, void* key1, */ static int AFPBypassCallback(Packet *p) { -#ifdef HAVE_PACKET_EBPF SCLogDebug("Calling af_packet callback function"); /* Only bypass TCP and UDP */ if (!(PKT_IS_TCP(p) || PKT_IS_UDP(p))) { @@ -2469,7 +2465,6 @@ static int AFPBypassCallback(Packet *p) EBPFUpdateFlow(p->flow, p, NULL); return AFPSetFlowStorage(p, p->afp_v.v6_map_fd, keys[0], keys[1], AF_INET6); } -#endif return 0; } @@ -2486,7 +2481,6 @@ static int AFPBypassCallback(Packet *p) */ static int AFPXDPBypassCallback(Packet *p) { -#ifdef HAVE_PACKET_XDP SCLogDebug("Calling af_packet callback function"); /* Only bypass TCP and UDP */ if (!(PKT_IS_TCP(p) || PKT_IS_UDP(p))) { @@ -2618,14 +2612,14 @@ static int AFPXDPBypassCallback(Packet *p) } return AFPSetFlowStorage(p, p->afp_v.v6_map_fd, keys[0], keys[1], AF_INET6); } -#endif return 0; } - bool g_flowv4_ok = true; bool g_flowv6_ok = true; +#endif /* HAVE_PACKET_EBPF */ + /** * \brief Init function for ReceiveAFP. * @@ -2689,10 +2683,10 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, const void *initdata, void **data) if (afpconfig->bpf_filter) { ptv->bpf_filter = afpconfig->bpf_filter; } +#ifdef HAVE_PACKET_EBPF ptv->ebpf_lb_fd = afpconfig->ebpf_lb_fd; ptv->ebpf_filter_fd = afpconfig->ebpf_filter_fd; ptv->xdp_mode = afpconfig->xdp_mode; -#ifdef HAVE_PACKET_EBPF ptv->ebpf_t_config.cpus_count = UtilCpuGetNumProcessorsConfigured(); if (ptv->flags & (AFP_BYPASS|AFP_XDPBYPASS)) {