Skip to content

Commit

Permalink
feat: listen multiple UDP addresses (#175)
Browse files Browse the repository at this point in the history
* feat: bind multi UDP addresses

* fix clippy warns
  • Loading branch information
giangndm authored Jul 22, 2024
1 parent 2cbfec4 commit fa3102d
Show file tree
Hide file tree
Showing 33 changed files with 407 additions and 323 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ clap = { version = "4.4", features = ["derive", "env"] }
mockall = "0.12.1"
num_enum = "0.7.2"
convert-enum = "0.1.0"
sans-io-runtime = { version = "0.1.0", default-features = false }
sans-io-runtime = { version = "0.2", default-features = false }
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ cargo build --release
Running first seed node as a network structure collector:

```bash
cargo run -- --collector --local-tags demo --connect-tags demo --node-id 1 --bind-addr 127.0.0.1:10001 --web-addr 0.0.0.0:3000
cargo run -- --collector --local-tags demo --connect-tags demo --node-id 1 --udp-port 10001 --web-addr 0.0.0.0:3000
```

Running second nodes and join to network with seed node (you need to replace with seed node IP if it running on another device):

```bash
cargo run -- --local-tags demo --connect-tags mode --seeds 1@/ip4/127.0.0.1/udp/10001 --node-id 2 --bind-addr 127.0.0.1:10002
cargo run -- --local-tags demo --connect-tags mode --seeds 1@/ip4/127.0.0.1/udp/10001 --node-id 2 --udp-port 10002
```

Same with this, we can run more nodes and connect to the network. Remember change node-id and port for not conflict with other nodes.
Expand Down
1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rust-embed = { version = "8.2", optional = true }
futures-util = "0.3"
tracing-subscriber = "0.3"
serde_json = "1.0"
local-ip-address = "0.6"

[features]
default = ["embed"]
Expand Down
18 changes: 15 additions & 3 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ struct Args {
node_id: NodeId,

/// Listen address
#[arg(env, short, long, default_value = "127.0.0.1:10000")]
bind_addr: SocketAddr,
#[arg(env, short, long, default_value_t = 10000)]
udp_port: u16,

/// Address of node we should connect to
#[arg(env, short, long)]
Expand Down Expand Up @@ -248,7 +248,19 @@ async fn main() {
let mut shutdown_wait = 0;
let args = Args::parse();
tracing_subscriber::fmt::init();
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, VisualNodeInfo>::new(args.node_id, &[args.bind_addr], args.custom_addrs);
let addrs = local_ip_address::list_afinet_netifas()
.expect("Should have list interfaces")
.into_iter()
.filter(|(_, ip)| {
if ip.is_unspecified() || ip.is_multicast() {
false
} else {
std::net::UdpSocket::bind(SocketAddr::new(*ip, 0)).is_ok()
}
})
.map(|(_name, ip)| SocketAddr::new(ip, args.udp_port))
.collect::<Vec<_>>();
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, VisualNodeInfo>::new(args.node_id, &addrs, args.custom_addrs);

builder.set_authorization(StaticKeyAuthorization::new(&args.password));
builder.set_manual_discovery(args.local_tags, args.connect_tags);
Expand Down
2 changes: 1 addition & 1 deletion bin/start_agent.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cargo run -- --local-tags vpn --connect-tags vpn --seeds $1 --node-id $2 --bind-addr $3
cargo run -- --local-tags vpn --connect-tags vpn --seeds $1 --node-id $2 --udp-port $3
4 changes: 2 additions & 2 deletions bin/start_collector.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# If provided $3, it will be seeds
if [ -n "$4" ]; then
# $4 is defined
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --bind-addr $2 --web-addr $3 --seeds $4
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --udp-port $2 --web-addr $3 --seeds $4
else
# $4 is not defined
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --bind-addr $2 --web-addr $3
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --udp-port $2 --web-addr $3
fi
12 changes: 6 additions & 6 deletions packages/network/src/base/feature.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::net::SocketAddr;

use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
use atm0s_sdn_router::{shadow::ShadowRouter, RouteRule};
use sans_io_runtime::TaskSwitcherChild;

use crate::data_plane::NetPair;

use super::{Buffer, ConnectionCtx, ConnectionEvent, ServiceId, TransportMsgHeader, Ttl};

#[derive(Debug, Default, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -172,8 +172,8 @@ pub enum FeatureWorkerOutput<UserData, Control, Event, ToController> {
SendRoute(RouteRule, NetOutgoingMeta, Buffer),
RawDirect(ConnId, Buffer),
RawBroadcast(Vec<ConnId>, Buffer),
RawDirect2(SocketAddr, Buffer),
RawBroadcast2(Vec<SocketAddr>, Buffer),
RawDirect2(NetPair, Buffer),
RawBroadcast2(Vec<NetPair>, Buffer),
#[cfg(feature = "vpn")]
TunPkt(Buffer),
}
Expand Down Expand Up @@ -206,12 +206,12 @@ impl<UserData, Control, Event, ToController> FeatureWorkerOutput<UserData, Contr

pub struct FeatureWorkerContext {
pub node_id: NodeId,
pub router: ShadowRouter<SocketAddr>,
pub router: ShadowRouter<NetPair>,
}

pub trait FeatureWorker<UserData, SdkControl, SdkEvent, ToController, ToWorker>: TaskSwitcherChild<FeatureWorkerOutput<UserData, SdkControl, SdkEvent, ToController>> {
fn on_tick(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64, _tick_count: u64) {}
fn on_network_raw(&mut self, ctx: &mut FeatureWorkerContext, now: u64, conn: ConnId, _remote: SocketAddr, header: TransportMsgHeader, mut buf: Buffer) {
fn on_network_raw(&mut self, ctx: &mut FeatureWorkerContext, now: u64, conn: ConnId, _pair: NetPair, header: TransportMsgHeader, mut buf: Buffer) {
let header_len = header.serialize_size();
buf.move_front_right(header_len).expect("Buffer should bigger or equal header");
self.on_input(ctx, now, FeatureWorkerInput::Network(conn, (&header).into(), buf));
Expand Down
6 changes: 3 additions & 3 deletions packages/network/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ mod msg;
mod secure;
mod service;

use std::net::SocketAddr;

use atm0s_sdn_identity::{ConnId, NodeId};
pub use control::*;
pub use feature::*;
Expand All @@ -14,11 +12,13 @@ pub use sans_io_runtime::Buffer;
pub use secure::*;
pub use service::*;

use crate::data_plane::NetPair;

#[derive(Debug, Clone)]
pub struct ConnectionCtx {
pub conn: ConnId,
pub node: NodeId,
pub remote: SocketAddr,
pub pair: NetPair,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
16 changes: 10 additions & 6 deletions packages/network/src/controller_plane.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, fmt::Debug, hash::Hash, sync::Arc};
use std::{collections::VecDeque, fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc};

use atm0s_sdn_identity::NodeId;
use atm0s_sdn_router::shadow::ShadowRouterHistory;
Expand Down Expand Up @@ -46,6 +46,7 @@ enum TaskType {

pub struct ControllerPlaneCfg<UserData, SC, SE, TC, TW> {
pub session: u64,
pub bind_addrs: Vec<SocketAddr>,
#[allow(clippy::type_complexity)]
pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
pub authorization: Arc<dyn Authorization>,
Expand Down Expand Up @@ -89,7 +90,10 @@ where
tick_count: 0,
feature_ctx: FeatureContext { node_id, session: cfg.session },
service_ctx: ServiceCtx { node_id, session: cfg.session },
neighbours: TaskSwitcherBranch::new(NeighboursManager::new(node_id, cfg.authorization, cfg.handshake_builder, cfg.random), TaskType::Neighbours),
neighbours: TaskSwitcherBranch::new(
NeighboursManager::new(node_id, cfg.bind_addrs, cfg.authorization, cfg.handshake_builder, cfg.random),
TaskType::Neighbours,
),
features: TaskSwitcherBranch::new(FeatureManager::new(node_id, cfg.session, service_ids), TaskType::Feature),
services: TaskSwitcherBranch::new(ServiceManager::new(cfg.services), TaskType::Service),
switcher: TaskSwitcher::new(3), //3 types: Neighbours, Feature, Service
Expand Down Expand Up @@ -132,8 +136,8 @@ where
.input(&mut self.switcher)
.on_input(&self.service_ctx, now_ms, service, ServiceInput::Control(ServiceControlActor::Controller(userdata), control));
}
Input::Control(LogicControl::NetNeighbour(remote, control)) => {
self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::Control(remote, control));
Input::Control(LogicControl::NetNeighbour(pair, control)) => {
self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::Control(pair, control));
}
Input::Control(LogicControl::Feature(to)) => {
self.features
Expand Down Expand Up @@ -188,7 +192,7 @@ where
.input(&mut self.switcher)
.on_shared_input(&self.service_ctx, now_ms, ServiceSharedInput::Connection(event.clone()));
match event {
ConnectionEvent::Connected(ctx, secure) => self.queue.push_back(Output::Event(LogicEvent::Pin(ctx.conn, ctx.node, ctx.remote, secure))),
ConnectionEvent::Connected(ctx, secure) => self.queue.push_back(Output::Event(LogicEvent::Pin(ctx.conn, ctx.node, ctx.pair, secure))),
ConnectionEvent::Stats(_ctx, _stats) => {}
ConnectionEvent::Disconnected(ctx) => self.queue.push_back(Output::Event(LogicEvent::UnPin(ctx.conn))),
}
Expand All @@ -214,7 +218,7 @@ where
FeatureOutput::SendDirect(conn, meta, buf) => {
log::debug!("[ControllerPlane] SendDirect to conn: {:?}, len: {}", conn, buf.len());
let conn_ctx = return_if_none!(self.neighbours.conn(conn));
self.queue.push_back(Output::Event(LogicEvent::NetDirect(feature, conn_ctx.remote, conn, meta, buf)))
self.queue.push_back(Output::Event(LogicEvent::NetDirect(feature, conn_ctx.pair, conn, meta, buf)))
}
FeatureOutput::SendRoute(rule, ttl, buf) => {
log::debug!("[ControllerPlane] SendRoute to rule: {:?}, len: {}", rule, buf.len());
Expand Down
36 changes: 24 additions & 12 deletions packages/network/src/controller_plane/neighbours.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{
use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId, Protocol};
use sans_io_runtime::TaskSwitcherChild;

use crate::base::{self, Authorization, ConnectionCtx, HandshakeBuilder, NeighboursControl, NeighboursControlCmds, SecureContext};
use crate::{
base::{self, Authorization, ConnectionCtx, HandshakeBuilder, NeighboursControl, NeighboursControlCmds, SecureContext},
data_plane::NetPair,
};

use self::connection::{ConnectionEvent, NeighbourConnection};

Expand All @@ -16,19 +19,20 @@ mod connection;
pub enum Input {
ConnectTo(NodeAddr),
DisconnectFrom(NodeId),
Control(SocketAddr, NeighboursControl),
Control(NetPair, NeighboursControl),
ShutdownRequest,
}

pub enum Output {
Control(SocketAddr, NeighboursControl),
Control(NetPair, NeighboursControl),
Event(base::ConnectionEvent),
ShutdownResponse,
}

pub struct NeighboursManager {
node_id: NodeId,
connections: HashMap<SocketAddr, NeighbourConnection>,
bind_addrs: Vec<SocketAddr>,
connections: HashMap<NetPair, NeighbourConnection>,
neighbours: HashMap<ConnId, ConnectionCtx>,
queue: VecDeque<Output>,
shutdown: bool,
Expand All @@ -38,9 +42,10 @@ pub struct NeighboursManager {
}

impl NeighboursManager {
pub fn new(node_id: NodeId, authorization: Arc<dyn Authorization>, handshake_builder: Arc<dyn HandshakeBuilder>, random: Box<dyn rand::RngCore>) -> Self {
pub fn new(node_id: NodeId, bind_addrs: Vec<SocketAddr>, authorization: Arc<dyn Authorization>, handshake_builder: Arc<dyn HandshakeBuilder>, random: Box<dyn rand::RngCore>) -> Self {
Self {
node_id,
bind_addrs,
connections: HashMap::new(),
neighbours: HashMap::new(),
queue: VecDeque::new(),
Expand All @@ -66,14 +71,21 @@ impl NeighboursManager {
Input::ConnectTo(addr) => {
let dest_node = addr.node_id();
let dests = get_node_addr_dests(addr);
for remote in dests {
if self.connections.contains_key(&remote) {
continue;
for local in &self.bind_addrs {
for remote in &dests {
if local.is_ipv4() != remote.is_ipv4() {
continue;
}

let pair = NetPair::new(*local, *remote);
if self.connections.contains_key(&pair) {
continue;
}
log::info!("[Neighbours] Sending connect request from {local} to {remote}, dest_node {dest_node}");
let session_id = self.random.next_u64();
let conn = NeighbourConnection::new_outgoing(self.handshake_builder.clone(), self.node_id, dest_node, session_id, pair, now_ms);
self.connections.insert(pair, conn);
}
log::info!("[Neighbours] Sending connect request to {}, dest_node {}", remote, dest_node);
let session_id = self.random.next_u64();
let conn = NeighbourConnection::new_outgoing(self.handshake_builder.clone(), self.node_id, dest_node, session_id, remote, now_ms);
self.connections.insert(remote, conn);
}
}
Input::DisconnectFrom(node) => {
Expand Down
Loading

0 comments on commit fa3102d

Please sign in to comment.