Skip to content

Commit

Permalink
move to async
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiapple852 committed Jan 8, 2024
1 parent 23a9476 commit 9ecba02
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ jobs:
if: ${{ ! startsWith(matrix.build, 'windows') }}
run: sudo -E env "PATH=$PATH" cargo test --target ${{ matrix.target }} --features sim-tests --test sim -- --exact --nocapture
- name: Run simulation test on ${{ matrix.build }}
run: cargo test --target --target ${{ matrix.target }} --features sim-tests --test sim -- --exact --nocapture
if: startsWith(matrix.build, 'windows')
run: cargo test --target --target ${{ matrix.target }} --features sim-tests --test sim -- --exact --nocapture

fmt:
runs-on: ubuntu-22.04
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ ipnetwork = "0.20.0"
pretty_assertions = "1.4.0"
rand = "0.8.5"
test-case = "3.3.1"
serde_yaml = "0.9.30"
tun = { git = "https://github.com/ssrlive/rust-tun", features = [ "async" ] }
serde_yaml = "0.9.30"
tokio = { version = "1.35.1", features = [ "full" ] }
tokio-util = { version = "0.7.10" }

[features]
# Enable simulation integration tests
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
name: Simple IPV4/ICMP example with 9 hops, 2 of which do not respond
name: Simple IPv4/ICMP example with 9 hops, 2 of which do not respond
target: 10.0.0.109
protocol: Icmp
port_direction: None
icmp_identifier: 314
hops:
- ttl: 1
Expand Down
18 changes: 18 additions & 0 deletions tests/resources/simulation/ipv4_tcp_fixed_dest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Simple IPv4/TCP example with a fixed dest port
target: 10.0.0.103
protocol: Tcp
port_direction: !FixedDest 80
icmp_identifier: 0
hops:
- ttl: 1
resp: !SingleHost
addr: 10.0.0.101
rtt_ms: 10
- ttl: 2
resp: !SingleHost
addr: 10.0.0.102
rtt_ms: 20
- ttl: 3
resp: !SingleHost
addr: 10.0.0.103
rtt_ms: 20
18 changes: 18 additions & 0 deletions tests/resources/simulation/ipv4_udp_fixed_src.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Simple IPv4/UDP example with a fixed src port
target: 10.0.0.103
protocol: Udp
port_direction: !FixedSrc 5000
icmp_identifier: 0
hops:
- ttl: 1
resp: !SingleHost
addr: 10.0.0.101
rtt_ms: 10
- ttl: 2
resp: !SingleHost
addr: 10.0.0.102
rtt_ms: 20
- ttl: 3
resp: !SingleHost
addr: 10.0.0.103
rtt_ms: 20
59 changes: 40 additions & 19 deletions tests/sim/network.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,59 @@
use crate::simulation::{Response, Simulation, SingleHost};
use crate::simulation::{Protocol, Response, Simulation, SingleHost};
use crate::tun_device::TunDevice;
use parking_lot::Mutex;
use std::io::{ErrorKind, Read, Write};
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use trippy::tracing::packet::checksum::{icmp_ipv4_checksum, ipv4_header_checksum};
use trippy::tracing::packet::icmpv4::echo_request::EchoRequestPacket;
use trippy::tracing::packet::icmpv4::time_exceeded::TimeExceededPacket;
use trippy::tracing::packet::icmpv4::{IcmpCode, IcmpType};
use trippy::tracing::packet::ipv4::Ipv4Packet;
use trippy::tracing::packet::tcp::TcpPacket;
use trippy::tracing::packet::udp::UdpPacket;
use trippy::tracing::packet::IpProtocol;

pub fn run(tun: &Arc<Mutex<TunDevice>>, sim: Arc<Simulation>) -> anyhow::Result<()> {
let mut tun = tun.lock();
pub async fn run(
tun: Arc<Mutex<TunDevice>>,
sim: Arc<Simulation>,
token: CancellationToken,
) -> anyhow::Result<()> {
let mut tun = tun.lock().await;
loop {
let mut buf = [0_u8; 4096];
let bytes_read = match tun.read(&mut buf) {
Ok(bytes) => Ok(bytes),
Err(err) if err.kind() == ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
continue;
}
Err(err) => Err(err),
}?;
let bytes_read = tokio::select!(
_ = token.cancelled() => {
return Ok(())
},
bytes_read = tun.read(&mut buf) => {
bytes_read?
},
);
let ipv4 = Ipv4Packet::new_view(&buf[..bytes_read])?;
if ipv4.get_version() != 4 {
continue;
}
let echo_request = EchoRequestPacket::new_view(ipv4.payload())?;
if echo_request.get_identifier() != sim.icmp_identifier {
continue;
match (ipv4.get_protocol(), sim.protocol) {
(IpProtocol::Icmp, Protocol::Icmp) => {
let echo_request = EchoRequestPacket::new_view(ipv4.payload())?;
if echo_request.get_identifier() != sim.icmp_identifier {
continue;
}
println!("{:?}", echo_request);
}
(IpProtocol::Udp, Protocol::Udp) => {
let udp = UdpPacket::new_view(ipv4.payload())?;
println!("{:?}", udp);
}
(IpProtocol::Tcp, Protocol::Tcp) => {
let tcp = TcpPacket::new_view(ipv4.payload())?;
println!("{:?}", tcp);
}
_ => {
continue;
}
}

// if the ttl is greater than the largest ttl in our sim we will reply as the last node in the sim
let index = std::cmp::min(usize::from(ipv4.get_ttl()) - 1, sim.hops.len() - 1);
let reply_addr = match sim.hops[index].resp {
Expand All @@ -58,7 +79,7 @@ pub fn run(tun: &Arc<Mutex<TunDevice>>, sim: Arc<Simulation>) -> anyhow::Result<
ipv4.get_source(),
te_packet.packet(),
)?;
tun.write_all(ipv4_packet.packet())?;
tun.write(ipv4_packet.packet()).await?;
}
}

Expand Down
49 changes: 44 additions & 5 deletions tests/sim/simulation.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,50 @@
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::net::IpAddr;
use trippy::tracing::Port;

#[derive(Copy, Clone, Debug, Deserialize)]
pub enum Protocol {
Icmp,
Udp,
Tcp,
}

impl From<Protocol> for trippy::tracing::Protocol {
fn from(value: Protocol) -> Self {
match value {
Protocol::Icmp => Self::Icmp,
Protocol::Udp => Self::Udp,
Protocol::Tcp => Self::Tcp,
}
}
}

#[derive(Copy, Clone, Debug, Deserialize)]
pub enum PortDirection {
None,
FixedSrc(u16),
FixedDest(u16),
FixedBoth(u16, u16),
}

impl From<PortDirection> for trippy::tracing::PortDirection {
fn from(value: PortDirection) -> Self {
match value {
PortDirection::None => Self::None,
PortDirection::FixedSrc(src) => Self::FixedSrc(Port(src)),
PortDirection::FixedDest(dest) => Self::FixedDest(Port(dest)),
PortDirection::FixedBoth(src, dest) => Self::FixedBoth(Port(src), Port(dest)),
}
}
}

/// A simulated trace.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct Simulation {
pub name: String,
pub target: IpAddr,
pub protocol: Protocol,
pub port_direction: PortDirection,
pub icmp_identifier: u16,
pub hops: Vec<Hop>,
}
Expand All @@ -18,7 +57,7 @@ impl Simulation {
}

/// A simulated hop.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct Hop {
/// The simulated time-to-live (TTL).
pub ttl: u8,
Expand All @@ -27,7 +66,7 @@ pub struct Hop {
}

/// A simulated probe response.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize)]
pub enum Response {
/// Simulate a hop which does not response to probes.
NoResponse,
Expand All @@ -36,7 +75,7 @@ pub enum Response {
}

/// A simulated probe response with a single addr and fixed ttl.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct SingleHost {
/// The simulated host responding to the probe.
pub addr: IpAddr,
Expand Down
65 changes: 50 additions & 15 deletions tests/sim/tests.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,60 @@
use crate::simulation::Simulation;
use crate::tun_device::{tun, tun_lock};
use crate::tun_device::tun;
use crate::{network, tracer};
use std::sync::Arc;
use std::thread;
use std::sync::{Arc, Mutex, OnceLock};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

static RUNTIME: OnceLock<Arc<Mutex<Runtime>>> = OnceLock::new();

pub fn tokio() -> &'static Arc<Mutex<Runtime>> {
RUNTIME.get_or_init(|| {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Arc::new(Mutex::new(runtime))
})
}

macro_rules! sim {
($path:expr) => {{
println!("simulating {}", $path);
let yaml = include_str!(concat!("../resources/simulation/", $path));
serde_yaml::from_str(yaml)?
}};
}

#[test]
fn test_simulations_icmp() -> anyhow::Result<()> {
let runtime = tokio().lock().unwrap();
let sim = sim!("ipv4_icmp.yaml");
runtime.block_on(run_simulation(sim))?;
Ok(())
}

#[test]
fn test_simulation() -> anyhow::Result<()> {
let sim = serde_yaml::from_str(include_str!(
"../resources/simulation/ipv4_icmp_simple.yaml"
))?;
run_test(sim)
fn test_simulations_udp() -> anyhow::Result<()> {
let runtime = tokio().lock().unwrap();
let sim = sim!("ipv4_udp_fixed_src.yaml");
runtime.block_on(run_simulation(sim))?;
Ok(())
}

#[test]
fn test_simulations_tcp() -> anyhow::Result<()> {
let runtime = tokio().lock().unwrap();
let sim = sim!("ipv4_tcp_fixed_dest.yaml");
runtime.block_on(run_simulation(sim))?;
Ok(())
}

fn run_test(simulation: Simulation) -> anyhow::Result<()> {
let _lock = tun_lock().lock();
async fn run_simulation(simulation: Simulation) -> anyhow::Result<()> {
let tun = tun();
let sim = Arc::new(simulation);
let _handle = {
let sim = sim.clone();
thread::spawn(move || network::run(tun, sim).unwrap())
};
tracer::Tracer::new(sim).trace()?;
let token = CancellationToken::new();
let handle = tokio::spawn(network::run(tun.clone(), sim.clone(), token.clone()));
tokio::task::spawn_blocking(move || tracer::Tracer::new(sim, token).trace()).await??;
handle.await??;
Ok(())
}
12 changes: 9 additions & 3 deletions tests/sim/tracer.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
use crate::simulation::{Response, Simulation, SingleHost};
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use trippy::tracing::{
Builder, CompletionReason, MaxRounds, ProbeStatus, TimeToLive, TraceId, TracerRound,
Builder, CompletionReason, MaxRounds, PortDirection, ProbeStatus, Protocol, TimeToLive,
TraceId, TracerRound,
};

pub struct Tracer {
sim: Arc<Simulation>,
pub token: CancellationToken,
}

impl Tracer {
pub fn new(sim: Arc<Simulation>) -> Self {
Self { sim }
pub fn new(sim: Arc<Simulation>, token: CancellationToken) -> Self {
Self { sim, token }
}

pub fn trace(&self) -> anyhow::Result<()> {
Builder::new(self.sim.target, |round| self.validate_round(round))
.trace_identifier(TraceId(self.sim.icmp_identifier))
.protocol(Protocol::from(self.sim.protocol))
.port_direction(PortDirection::from(self.sim.port_direction))
.max_rounds(MaxRounds(NonZeroUsize::MIN))
.start()?;
self.token.cancel();
Ok(())
}

Expand Down
Loading

0 comments on commit 9ecba02

Please sign in to comment.