Skip to content

Commit

Permalink
Add ASN cardinality to packet metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed May 29, 2023
1 parent 601d651 commit 64a79fc
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/config/providers/k8s/agones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl GameServer {
value
.split(',')
.map(String::from)
.map(base64::decode)
.map(crate::utils::base64_decode)
.filter_map(Result::ok)
.collect()
})
Expand Down
2 changes: 1 addition & 1 deletion src/maxmind_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl std::ops::DerefMut for MaxmindDb {
}
}

#[derive(Debug, serde::Deserialize)]
#[derive(Clone, Debug, serde::Deserialize)]
pub struct IpNetEntry {
#[serde(default)]
pub allocation: String,
Expand Down
78 changes: 64 additions & 14 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

use once_cell::sync::Lazy;
use prometheus::{
core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, Opts,
Registry, DEFAULT_BUCKETS,
core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
};

use crate::maxmind_db::IpNetEntry;

pub use prometheus::Result;

/// "event" is used as a label for Metrics that can apply to both Filter
Expand All @@ -28,6 +30,8 @@ pub const DIRECTION_LABEL: &str = "event";

pub(crate) const READ: Direction = Direction::Read;
pub(crate) const WRITE: Direction = Direction::Write;
pub(crate) const ASN_LABEL: &str = "asn";
pub(crate) const PREFIX_LABEL: &str = "ip_prefix";

/// Label value for [DIRECTION_LABEL] for `read` events
pub const READ_DIRECTION_LABEL: &str = "read";
Expand Down Expand Up @@ -89,68 +93,114 @@ pub(crate) fn processing_time(direction: Direction) -> Histogram {
PROCESSING_TIME.with_label_values(&[direction.label()])
}

pub(crate) fn bytes_total(direction: Direction) -> IntCounter {
pub(crate) fn bytes_total(direction: Direction, asn: Option<&IpNetEntry>) -> IntCounter {
static BYTES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"bytes_total",
"total number of bytes",
},
&[Direction::LABEL],
&[Direction::LABEL, ASN_LABEL, PREFIX_LABEL],
registry(),
}
.unwrap()
});

BYTES_TOTAL.with_label_values(&[direction.label()])
BYTES_TOTAL.with_label_values(&[
direction.label(),
&asn.map(|asn| asn.r#as.to_string()).unwrap_or_default(),
asn.map(|asn| &*asn.prefix).unwrap_or_default(),
])
}

pub(crate) fn errors_total(direction: Direction, display: &str) -> IntCounter {
pub(crate) fn errors_total(
direction: Direction,
display: &str,
asn: Option<&IpNetEntry>,
) -> IntCounter {
static ERRORS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"errors_total",
"total number of errors sending packets",
},
&[Direction::LABEL, "display"],
&[Direction::LABEL, "display", ASN_LABEL, PREFIX_LABEL],
registry(),
}
.unwrap()
});

ERRORS_TOTAL.with_label_values(&[
direction.label(),
display,
&asn.map(|asn| asn.r#as.to_string()).unwrap_or_default(),
asn.map(|asn| &*asn.prefix).unwrap_or_default(),
])
}

pub(crate) fn packet_jitter(direction: Direction, asn: Option<&IpNetEntry>) -> IntGauge {
static PACKET_JITTER: Lazy<IntGaugeVec> = Lazy::new(|| {
prometheus::register_int_gauge_vec_with_registry! {
prometheus::opts! {
"packet_jitter",
"The time between new packets",
},
&[Direction::LABEL, ASN_LABEL, PREFIX_LABEL],
registry(),
}
.unwrap()
});

ERRORS_TOTAL.with_label_values(&[direction.label(), display])
PACKET_JITTER.with_label_values(&[
direction.label(),
&asn.map(|asn| asn.r#as.to_string()).unwrap_or_default(),
asn.map(|asn| &*asn.prefix).unwrap_or_default(),
])
}

pub(crate) fn packets_total(direction: Direction) -> IntCounter {
pub(crate) fn packets_total(direction: Direction, asn: Option<&IpNetEntry>) -> IntCounter {
static PACKETS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"packets_total",
"Total number of packets",
},
&[Direction::LABEL],
&[Direction::LABEL, ASN_LABEL, PREFIX_LABEL],
registry(),
}
.unwrap()
});

PACKETS_TOTAL.with_label_values(&[direction.label()])
PACKETS_TOTAL.with_label_values(&[
direction.label(),
&asn.map(|asn| asn.r#as.to_string()).unwrap_or_default(),
asn.map(|asn| &*asn.prefix).unwrap_or_default(),
])
}

pub(crate) fn packets_dropped_total(direction: Direction, source: &str) -> IntCounter {
pub(crate) fn packets_dropped_total(
direction: Direction,
source: &str,
asn: Option<&IpNetEntry>,
) -> IntCounter {
static PACKETS_DROPPED: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"packets_dropped_total",
"Total number of dropped packets",
},
&[Direction::LABEL, "source"],
&[Direction::LABEL, "source", ASN_LABEL, PREFIX_LABEL],
registry(),
}
.unwrap()
});

PACKETS_DROPPED.with_label_values(&[direction.label(), source])
PACKETS_DROPPED.with_label_values(&[
direction.label(),
source,
&asn.map(|asn| asn.r#as.to_string()).unwrap_or_default(),
asn.map(|asn| &*asn.prefix).unwrap_or_default(),
])
}

/// Create a generic metrics options.
Expand Down
39 changes: 31 additions & 8 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ pub use sessions::{Session, SessionArgs, SessionKey, SessionMap};
/// Packet received from local port
#[derive(Debug)]
struct DownstreamPacket {
source: EndpointAddress,
received_at: i64,
asn_info: Option<crate::maxmind_db::IpNetEntry>,
contents: Vec<u8>,
received_at: i64,
source: std::net::SocketAddr,
}

/// Represents the required arguments to run a worker task that
Expand All @@ -62,20 +63,31 @@ impl DownstreamReceiveWorkerConfig {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let mut buf = vec![0; 1 << 16];
let mut last_received_at = None;
loop {
tracing::debug!(
id = worker_id,
addr = ?socket.local_addr(),
"Awaiting packet"
);

match socket.recv_from(&mut buf).await {
Ok((size, source)) => {
let packet = DownstreamPacket {
received_at: chrono::Utc::now().timestamp_nanos(),
source: source.into(),
contents: buf[..size].to_vec(),
asn_info: crate::MaxmindDb::lookup(source.ip()),
};

if let Some(last_received_at) = last_received_at {
crate::metrics::packet_jitter(
crate::metrics::READ,
packet.asn_info.as_ref(),
)
.set(packet.received_at - last_received_at);
}
last_received_at = Some(packet.received_at);
Self::spawn_process_task(
packet, source, worker_id, &socket, &config, &sessions,
)
Expand Down Expand Up @@ -114,17 +126,25 @@ impl DownstreamReceiveWorkerConfig {
async move {
let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer();

let asn_info = packet.asn_info.clone();
let asn_info = asn_info.as_ref();
match Self::process_downstream_received_packet(packet, config, socket, sessions)
.await
{
Ok(size) => {
crate::metrics::packets_total(crate::metrics::READ).inc();
crate::metrics::bytes_total(crate::metrics::READ).inc_by(size as u64);
crate::metrics::packets_total(crate::metrics::READ, asn_info).inc();
crate::metrics::bytes_total(crate::metrics::READ, asn_info)
.inc_by(size as u64);
}
Err(error) => {
let source = error.to_string();
crate::metrics::errors_total(crate::metrics::READ, &source).inc();
crate::metrics::packets_dropped_total(crate::metrics::READ, &source).inc();
crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc();
crate::metrics::packets_dropped_total(
crate::metrics::READ,
&source,
asn_info,
)
.inc();
}
}

Expand All @@ -150,7 +170,7 @@ impl DownstreamReceiveWorkerConfig {
}

let filters = config.filters.load();
let mut context = ReadContext::new(endpoints, packet.source, packet.contents);
let mut context = ReadContext::new(endpoints, packet.source.into(), packet.contents);
filters.read(&mut context).await?;
let mut bytes_written = 0;

Expand All @@ -162,6 +182,7 @@ impl DownstreamReceiveWorkerConfig {
&downstream_socket,
&config,
&sessions,
packet.asn_info.clone(),
)
.await?;
}
Expand All @@ -181,7 +202,7 @@ impl DownstreamReceiveWorkerConfig {
} => downstream_socket
.send_to(
&Protocol::ping_reply(nonce, client_timestamp, packet.received_at).encode(),
packet.source.to_socket_addr().await?,
packet.source,
)
.await
.map_err(From::from),
Expand All @@ -201,6 +222,7 @@ impl DownstreamReceiveWorkerConfig {
downstream_socket: &Arc<UdpSocket>,
config: &Arc<Config>,
sessions: &SessionMap,
asn_info: Option<crate::maxmind_db::IpNetEntry>,
) -> Result<usize, PipelineError> {
let session_key = SessionKey {
source: recv_addr.clone(),
Expand All @@ -215,6 +237,7 @@ impl DownstreamReceiveWorkerConfig {
source: session_key.source.clone(),
downstream_socket: downstream_socket.clone(),
dest: endpoint.clone(),
asn_info,
};

let session = session_args.into_session().await?;
Expand Down
37 changes: 21 additions & 16 deletions src/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::{net::UdpSocket, select, sync::watch, time::Instant};
use crate::{
endpoint::{Endpoint, EndpointAddress},
filters::{Filter, WriteContext},
maxmind_db::IpNetEntry,
utils::Loggable,
};

Expand All @@ -42,7 +43,7 @@ pub struct Session {
/// a channel to broadcast on if we are shutting down this Session
shutdown_tx: watch::Sender<()>,
/// The ASN information.
asn_info: Option<crate::maxmind_db::IpNetEntry>,
asn_info: Option<IpNetEntry>,
}

// A (source, destination) address pair that uniquely identifies a session.
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct SessionArgs {
pub source: EndpointAddress,
pub downstream_socket: Arc<UdpSocket>,
pub dest: Endpoint,
pub asn_info: Option<IpNetEntry>,
}

impl SessionArgs {
Expand All @@ -93,16 +95,14 @@ impl Session {
.await?;
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());

let ip = args.source.to_socket_addr().await.unwrap().ip();
let asn_info = crate::MaxmindDb::lookup(ip);
let s = Session {
config: args.config.clone(),
upstream_socket,
source: args.source.clone(),
dest: args.dest,
created_at: Instant::now(),
shutdown_tx,
asn_info,
asn_info: args.asn_info,
};

tracing::debug!(source = %s.source, dest = ?s.dest, "Session created");
Expand All @@ -120,22 +120,31 @@ impl Session {
let config = self.config.clone();
let endpoint = self.dest.clone();
let upstream_socket = self.upstream_socket.clone();
let asn_info = self.asn_info.clone();

tokio::spawn(async move {
let mut buf: Vec<u8> = vec![0; 65535];
let mut last_received_at = None;
loop {
tracing::debug!(source = %source, dest = ?endpoint, "Awaiting incoming packet");
let asn_info = asn_info.as_ref();

select! {
received = upstream_socket.recv_from(&mut buf) => {
match received {
Err(error) => {
crate::metrics::errors_total(crate::metrics::WRITE, &error.to_string()).inc();
crate::metrics::errors_total(crate::metrics::WRITE, &error.to_string(), asn_info).inc();
tracing::error!(%error, %source, dest = ?endpoint, "Error receiving packet");
},
Ok((size, recv_addr)) => {
crate::metrics::bytes_total(crate::metrics::WRITE).inc_by(size as u64);
crate::metrics::packets_total(crate::metrics::WRITE).inc();
let received_at = chrono::Utc::now().timestamp_nanos();
if let Some(last_received_at) = last_received_at {
crate::metrics::packet_jitter(crate::metrics::WRITE, asn_info).set(received_at - last_received_at);
}
last_received_at = Some(received_at);

crate::metrics::packets_total(crate::metrics::WRITE, asn_info).inc();
crate::metrics::bytes_total(crate::metrics::WRITE, asn_info).inc_by(size as u64);

let timer = crate::metrics::processing_time(crate::metrics::WRITE).start_timer();
let result = Session::process_recv_packet(
Expand All @@ -153,9 +162,10 @@ impl Session {
let label = format!("proxy::Session::process_recv_packet: {error}");
crate::metrics::packets_dropped_total(
crate::metrics::WRITE,
&label
&label,
asn_info
).inc();
crate::metrics::errors_total(crate::metrics::WRITE, &label).inc();
crate::metrics::errors_total(crate::metrics::WRITE, &label, asn_info).inc();
}
}
};
Expand All @@ -170,13 +180,7 @@ impl Session {
}

fn active_session_metric(&self) -> prometheus::IntGauge {
let (asn_number, ip_prefix) = self
.asn_info
.as_ref()
.map(|asn| (asn.r#as, &*asn.prefix))
.unwrap_or_else(|| (<_>::default(), <_>::default()));

metrics::active_sessions(asn_number as u16, ip_prefix)
metrics::active_sessions(self.asn_info.as_ref())
}

/// process_recv_packet processes a packet that is received by this session.
Expand Down Expand Up @@ -290,6 +294,7 @@ mod tests {
source: addr.clone(),
downstream_socket: socket.clone(),
dest: endpoint,
asn_info: None,
})
.await
.unwrap();
Expand Down
Loading

0 comments on commit 64a79fc

Please sign in to comment.