Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor release 3.2.1 #101

Merged
merged 19 commits into from
Jun 13, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
issue #100 add tcp reassembly error tracking
  • Loading branch information
weyrick committed May 26, 2021
commit 9fbf73f4a48bc3ca316f984ba116483f6864ca54
21 changes: 21 additions & 0 deletions src/handlers/net/NetStreamHandler.cpp
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ void NetStreamHandler::start()
_pkt_connection = _stream->packet_signal.connect(&NetStreamHandler::process_packet_cb, this);
_start_tstamp_connection = _stream->start_tstamp_signal.connect(&NetStreamHandler::set_start_tstamp, this);
_end_tstamp_connection = _stream->end_tstamp_signal.connect(&NetStreamHandler::set_end_tstamp, this);
_tcp_reassembly_errors_connection = _stream->tcp_reassembly_error_signal.connect(&NetStreamHandler::process_tcp_reassembly_error, this);

_running = true;
}
@@ -52,6 +53,7 @@ void NetStreamHandler::stop()
_pkt_connection.disconnect();
_start_tstamp_connection.disconnect();
_end_tstamp_connection.disconnect();
_tcp_reassembly_errors_connection.disconnect();

_running = false;
}
@@ -65,6 +67,10 @@ void NetStreamHandler::process_packet_cb(pcpp::Packet &payload, PacketDirection
{
_metrics->process_packet(payload, dir, l3, l4, stamp);
}
void NetStreamHandler::process_tcp_reassembly_error(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, timespec stamp)
{
_metrics->process_tcp_reassembly_error(payload, dir, l3, stamp);
}

void NetStreamHandler::window_json(json &j, uint64_t period, bool merged)
{
@@ -109,6 +115,7 @@ void NetworkMetricsBucket::specialized_merge(const AbstractMetricsBucket &o)

_counters.UDP += other._counters.UDP;
_counters.TCP += other._counters.TCP;
_counters.TCP_reassembly_errors += other._counters.TCP_reassembly_errors;
_counters.OtherL4 += other._counters.OtherL4;
_counters.IPv4 += other._counters.IPv4;
_counters.IPv6 += other._counters.IPv6;
@@ -140,6 +147,7 @@ void NetworkMetricsBucket::to_prometheus(std::stringstream &out) const

_counters.UDP.to_prometheus(out);
_counters.TCP.to_prometheus(out);
_counters.TCP_reassembly_errors.to_prometheus(out);
_counters.OtherL4.to_prometheus(out);
_counters.IPv4.to_prometheus(out);
_counters.IPv6.to_prometheus(out);
@@ -173,6 +181,7 @@ void NetworkMetricsBucket::to_json(json &j) const

_counters.UDP.to_json(j);
_counters.TCP.to_json(j);
_counters.TCP_reassembly_errors.to_json(j);
_counters.OtherL4.to_json(j);
_counters.IPv4.to_json(j);
_counters.IPv6.to_json(j);
@@ -297,6 +306,12 @@ void NetworkMetricsBucket::process_packet(bool deep, pcpp::Packet &payload, Pack
}
}
}
void NetworkMetricsBucket::process_tcp_reassembly_error([[maybe_unused]] bool deep, [[maybe_unused]] pcpp::Packet &payload, [[maybe_unused]] PacketDirection dir, [[maybe_unused]] pcpp::ProtocolType l3)
{

std::unique_lock lock(_mutex);
++_counters.TCP_reassembly_errors;
}

// the general metrics manager entry point
void NetworkMetricsManager::process_packet(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp)
@@ -306,5 +321,11 @@ void NetworkMetricsManager::process_packet(pcpp::Packet &payload, PacketDirectio
// process in the "live" bucket
live_bucket()->process_packet(_deep_sampling_now, payload, dir, l3, l4);
}
void NetworkMetricsManager::process_tcp_reassembly_error(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, [[maybe_unused]] timespec stamp)
{
// note we do not call base event since that should be called at the packet level above, and would duplicate
// process in the "live" bucket
live_bucket()->process_tcp_reassembly_error(_deep_sampling_now, payload, dir, l3);
}

}
8 changes: 8 additions & 0 deletions src/handlers/net/NetStreamHandler.h
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
struct counters {
Counter UDP;
Counter TCP;
Counter TCP_reassembly_errors;
Counter OtherL4;
Counter IPv4;
Counter IPv6;
@@ -40,6 +41,7 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
counters()
: UDP("packets", {"udp"}, "Count of UDP packets")
, TCP("packets", {"tcp"}, "Count of TCP packets")
, TCP_reassembly_errors("packets", {"tcp_reassembly_errors"}, "Count of TCP reassembly errors")
, OtherL4("packets", {"other_l4"}, "Count of packets which are not UDP or TCP")
, IPv4("packets", {"ipv4"}, "Count of IPv4 packets")
, IPv6("packets", {"ipv6"}, "Count of IPv6 packets")
@@ -90,6 +92,8 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
}

void process_packet(bool deep, pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4);
void process_tcp_reassembly_error(bool deep, pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3);

};

class NetworkMetricsManager final : public visor::AbstractMetricsManager<NetworkMetricsBucket>
@@ -101,6 +105,8 @@ class NetworkMetricsManager final : public visor::AbstractMetricsManager<Network
}

void process_packet(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp);
void process_tcp_reassembly_error(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, timespec stamp);

};

class NetStreamHandler final : public visor::StreamMetricsHandler<NetworkMetricsManager>
@@ -111,10 +117,12 @@ class NetStreamHandler final : public visor::StreamMetricsHandler<NetworkMetrics
sigslot::connection _pkt_connection;
sigslot::connection _start_tstamp_connection;
sigslot::connection _end_tstamp_connection;
sigslot::connection _tcp_reassembly_errors_connection;

void process_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp);
void set_start_tstamp(timespec stamp);
void set_end_tstamp(timespec stamp);
void process_tcp_reassembly_error(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, timespec stamp);

public:
NetStreamHandler(const std::string &name, PcapInputStream *stream, uint periods, uint deepSampleRate);
3 changes: 3 additions & 0 deletions src/handlers/net/tests/test_net_layer.cpp
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ TEST_CASE("Parse net (dns) TCP IPv4 tests", "[pcap][ipv4][tcp][net]")
CHECK(net_handler.metrics()->start_tstamp().tv_nsec == 56403000);
CHECK(event_data.num_events->value() == 2100);
CHECK(counters.TCP.value() == 2100);
CHECK(counters.TCP_reassembly_errors.value() == 0);
CHECK(counters.IPv4.value() == 2100);
CHECK(counters.IPv6.value() == 0);
}
@@ -109,6 +110,7 @@ TEST_CASE("Parse net (dns) TCP IPv6 tests", "[pcap][ipv6][tcp][net]")
CHECK(net_handler.metrics()->start_tstamp().tv_nsec == 958184000);
CHECK(event_data.num_events->value() == 1800);
CHECK(counters.TCP.value() == 1800);
CHECK(counters.TCP_reassembly_errors.value() == 0);
CHECK(counters.IPv4.value() == 0);
CHECK(counters.IPv6.value() == 1800);
}
@@ -139,6 +141,7 @@ TEST_CASE("Parse net (dns) random UDP/TCP tests", "[pcap][net]")
CHECK(event_data.num_events->value() == 16147);
CHECK(event_data.num_samples->value() == 16147);
CHECK(counters.TCP.value() == 13176);
CHECK(counters.TCP_reassembly_errors.value() == 0);
CHECK(counters.UDP.value() == 2971);
CHECK(counters.IPv4.value() == 16147);
CHECK(counters.IPv6.value() == 0);
10 changes: 10 additions & 0 deletions src/handlers/net/tests/window-schema.json
Original file line number Diff line number Diff line change
@@ -289,6 +289,16 @@
13176
]
},
"tcp_reassembly_errors": {
"$id": "#/properties/5m/properties/packets/properties/tcp_reassembly_errors",
"type": "integer",
"title": "The tcp_reassembly_errors schema",
"description": "An explanation about the purpose of this instance.",
"default": 0,
"examples": [
13176
]
},
"top_ASN": {
"$id": "#/properties/5m/properties/packets/properties/top_ASN",
"type": "array",
40 changes: 25 additions & 15 deletions src/inputs/pcap/PcapInputStream.cpp
Original file line number Diff line number Diff line change
@@ -3,19 +3,19 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

#include "PcapInputStream.h"
#include <timer.hpp>
#include <pcap.h>
#include <timer.hpp>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wc99-extensions"
#pragma GCC diagnostic ignored "-Wpedantic"
#include <IPv4Layer.h>
#include <IPv6Layer.h>
#include <Logger.h>
#include <PacketUtils.h>
#include <PcapFileDevice.h>
#include <SystemUtils.h>
#include <Logger.h>
#pragma GCC diagnostic pop
#include <Corrade/Utility/Debug.h>
#include <IpUtils.h>
@@ -55,7 +55,7 @@ static void _packet_arrives_cb(pcpp::RawPacket *rawPacket, [[maybe_unused]] pcpp
stream->process_raw_packet(rawPacket);
}

static void _pcap_stats_update([[maybe_unused]] pcpp::IPcapDevice::PcapStats& stats, [[maybe_unused]] void *cookie)
static void _pcap_stats_update([[maybe_unused]] pcpp::IPcapDevice::PcapStats &stats, [[maybe_unused]] void *cookie)
{
// auto stream = static_cast<PcapInputStream *>(cookie);
// TODO expose this
@@ -70,6 +70,7 @@ PcapInputStream::PcapInputStream(const std::string &name)
_tcp_connection_end_cb,
{true, 5, 500, 50})
{
pcpp::LoggerPP::getInstance().suppressErrors();
}

PcapInputStream::~PcapInputStream()
@@ -111,15 +112,13 @@ void PcapInputStream::start()
auto req_source = config_get<std::string>("pcap_source");
if (req_source == "libpcap") {
_cur_pcap_source = PcapSource::libpcap;
}
else if (req_source == "af_packet") {
} else if (req_source == "af_packet") {
#ifndef __linux__
throw PcapException("af_packet is only available on linux");
#else
_cur_pcap_source = PcapSource::af_packet;
#endif
}
else {
} else {
throw PcapException("unknown pcap source");
}
}
@@ -143,15 +142,13 @@ void PcapInputStream::start()
}
_get_hosts_from_libpcap_iface();
_open_libpcap_iface(config_get<std::string>("bpf"));
}
else if (_cur_pcap_source == PcapSource::af_packet) {
} else if (_cur_pcap_source == PcapSource::af_packet) {
#ifndef __linux__
assert(true);
#else
_open_af_packet_iface(TARGET, config_get<std::string>("bpf"));
#endif
}
else {
} else {
assert(true);
}

@@ -245,7 +242,20 @@ void PcapInputStream::process_raw_packet(pcpp::RawPacket *rawPacket)
if (l4 == pcpp::UDP) {
udp_signal(packet, dir, l3, pcpp::hash5Tuple(&packet), rawPacket->getPacketTimeStamp());
} else if (l4 == pcpp::TCP) {
_tcp_reassembly.reassemblePacket(rawPacket);
auto result = _tcp_reassembly.reassemblePacket(packet);
switch (result) {
case pcpp::TcpReassembly::Error_PacketDoesNotMatchFlow:
case pcpp::TcpReassembly::NonTcpPacket:
case pcpp::TcpReassembly::NonIpPacket:
tcp_reassembly_error_signal(packet, dir, l3, rawPacket->getPacketTimeStamp());
case pcpp::TcpReassembly::TcpMessageHandled:
case pcpp::TcpReassembly::OutOfOrderTcpMessageBuffered:
case pcpp::TcpReassembly::FIN_RSTWithNoData:
case pcpp::TcpReassembly::Ignore_PacketWithNoData:
case pcpp::TcpReassembly::Ignore_PacketOfClosedFlow:
case pcpp::TcpReassembly::Ignore_Retransimission:
break;
}
} else {
// unsupported layer3 protocol
}
@@ -304,11 +314,11 @@ void PcapInputStream::_open_pcap(const std::string &fileName, const std::string
}

#ifdef __linux__
void PcapInputStream::_open_af_packet_iface(const std::string &iface, const std::string &bpfFilter) {
void PcapInputStream::_open_af_packet_iface(const std::string &iface, const std::string &bpfFilter)
{

_af_device = std::make_unique<AFPacket>(this, _packet_arrives_cb, bpfFilter, iface);
_af_device->start_capture();

}
#endif

@@ -387,7 +397,7 @@ void PcapInputStream::_get_hosts_from_libpcap_iface()
}
}

void PcapInputStream::info_json(json& j) const
void PcapInputStream::info_json(json &j) const
{
_common_info_json(j);
json info;
3 changes: 2 additions & 1 deletion src/inputs/pcap/PcapInputStream.h
Original file line number Diff line number Diff line change
@@ -82,7 +82,7 @@ class PcapInputStream : public visor::InputStream
void info_json(json &j) const override;
size_t consumer_count() const override
{
return packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count();
return packet_signal.slot_count() + udp_signal.slot_count() + start_tstamp_signal.slot_count() + tcp_message_ready_signal.slot_count() + tcp_connection_start_signal.slot_count() + tcp_connection_end_signal.slot_count() + tcp_reassembly_error_signal.slot_count();
}

// utilities
@@ -104,6 +104,7 @@ class PcapInputStream : public visor::InputStream
mutable sigslot::signal<int8_t, const pcpp::TcpStreamData &> tcp_message_ready_signal;
mutable sigslot::signal<const pcpp::ConnectionData &> tcp_connection_start_signal;
mutable sigslot::signal<const pcpp::ConnectionData &, pcpp::TcpReassembly::ConnectionEndReason> tcp_connection_end_signal;
mutable sigslot::signal<pcpp::Packet &, PacketDirection, pcpp::ProtocolType, timespec> tcp_reassembly_error_signal;
};

}