Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

forced route in multi net dev #179

Merged
merged 2 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions apps/c/httpclient/expect_info.out
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ Primary CPU 0 init OK.
Hello, Ruxos C HTTP client!
IP: [0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+
HTTP/1.1 200 OK
Date:
Content-Type: text/plain
Content-Length:
Access-Control-Allow-Origin: *
Alt-Svc: h3=":443"; ma=
Cache-Control: no-cache, no-store, must-revalidate
Date:
Content-Length:
Content-Type: text/plain; charset=utf-8

^[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+
Shutting down...
4 changes: 2 additions & 2 deletions modules/ruxfs/src/mounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ pub(crate) fn etcfs() -> VfsResult<Arc<fs::ramfs::RamFileSystem>> {
let file_resolv = etc_root.clone().lookup("resolv.conf")?;
file_resolv.write_at(
0,
b"nameserver 127.0.0.53\n\
nameserver 8.8.8.8\n\
b"nameserver 8.8.8.8\n\
nameserver 114.114.114.114\n\
options edns0 trust-ad\n\
search lan\n
",
Expand Down
2 changes: 1 addition & 1 deletion modules/ruxnet/src/smoltcp_impl/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DnsSocket {
}
})?;
loop {
SOCKET_SET.poll_interfaces();
SOCKET_SET.poll_interfaces(None);
match SOCKET_SET.with_socket_mut::<dns::Socket, _, _>(handle, |socket| {
socket.get_query_result(query_handle).map_err(|e| match e {
GetQueryResultError::Pending => AxError::WouldBlock,
Expand Down
9 changes: 5 additions & 4 deletions modules/ruxnet/src/smoltcp_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ impl<'a> SocketSetWrapper<'a> {
f(socket)
}

pub fn poll_interfaces(&self) {
pub fn poll_interfaces(&self, iface_name: Option<String>) {
for iface in IFACE_LIST.lock().iter() {
iface.poll(&self.0);
if iface_name.is_none() || iface_name.clone().unwrap() == iface.name() {
iface.poll(&self.0);
}
}
}

Expand Down Expand Up @@ -287,7 +289,6 @@ impl<'a> TxToken for AxNetTxToken<'a> {
let mut dev = self.0.borrow_mut();
let mut tx_buf = dev.alloc_tx_buffer(len).unwrap();
let ret = f(tx_buf.packet_mut());
trace!("SEND {} bytes: {:02X?}", len, tx_buf.packet());
dev.transmit(tx_buf).unwrap();
ret
}
Expand Down Expand Up @@ -317,7 +318,7 @@ fn snoop_tcp_packet(buf: &[u8], sockets: &mut SocketSet<'_>) -> Result<(), smolt
/// It may receive packets from the NIC and process them, and transmit queued
/// packets to the NIC.
pub fn poll_interfaces() {
SOCKET_SET.poll_interfaces();
SOCKET_SET.poll_interfaces(None);
}

/// Benchmark raw socket transmit bandwidth.
Expand Down
180 changes: 100 additions & 80 deletions modules/ruxnet/src/smoltcp_impl/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use core::cell::UnsafeCell;
use core::net::SocketAddr;
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};

use alloc::string::String;
use axerrno::{ax_err, ax_err_type, AxError, AxResult};
use axio::PollState;
use axsync::Mutex;
Expand Down Expand Up @@ -53,6 +54,7 @@ pub struct TcpSocket {
local_addr: UnsafeCell<IpEndpoint>,
peer_addr: UnsafeCell<IpEndpoint>,
nonblock: AtomicBool,
iface_name: Mutex<Option<String>>,
}

unsafe impl Sync for TcpSocket {}
Expand All @@ -66,6 +68,7 @@ impl TcpSocket {
local_addr: UnsafeCell::new(UNSPECIFIED_ENDPOINT),
peer_addr: UnsafeCell::new(UNSPECIFIED_ENDPOINT),
nonblock: AtomicBool::new(false),
iface_name: Mutex::new(None),
}
}

Expand All @@ -81,6 +84,7 @@ impl TcpSocket {
local_addr: UnsafeCell::new(local_addr),
peer_addr: UnsafeCell::new(peer_addr),
nonblock: AtomicBool::new(false),
iface_name: Mutex::new(None),
}
}

Expand Down Expand Up @@ -137,6 +141,11 @@ impl TcpSocket {
///
/// The local port is generated automatically.
pub fn connect(&self, remote_addr: SocketAddr) -> AxResult {
let iface_name = Some(match remote_addr {
SocketAddr::V4(addr) => route_dev(addr.ip().octets()),
_ => panic!("IPv6 not supported"),
});
*self.iface_name.lock() = iface_name;
self.update_state(STATE_CLOSED, STATE_CONNECTING, || {
// SAFETY: no other threads can read or write these fields.
let handle = unsafe { self.handle.get().read() }
Expand All @@ -146,13 +155,9 @@ impl TcpSocket {
let remote_endpoint = from_core_sockaddr(remote_addr);
let bound_endpoint = self.bound_endpoint()?;
let binding = IFACE_LIST.lock();
let iface_name = match remote_addr {
SocketAddr::V4(addr) => route_dev(addr.ip().octets()),
_ => panic!("IPv6 not supported"),
};
let iface = &binding
.iter()
.find(|iface| iface.name() == iface_name)
.find(|iface| iface.name() == self.iface_name.lock().clone().unwrap())
.unwrap()
.iface;
let (local_endpoint, remote_endpoint) = SOCKET_SET
Expand Down Expand Up @@ -183,24 +188,27 @@ impl TcpSocket {
})
.unwrap_or_else(|_| ax_err!(AlreadyExists, "socket connect() failed: already connected"))?; // EISCONN

self.block_on(|| {
let PollState { writable, .. } = self.poll_connect()?;
if !writable {
// When set to non_blocking, directly return inporgress
if self.is_nonblocking() {
return Err(AxError::InProgress);
}
Err(AxError::WouldBlock)
} else if self.get_state() == STATE_CONNECTED {
Ok(())
} else {
// When set to non_blocking, directly return inporgress
if self.is_nonblocking() {
return Err(AxError::InProgress);
self.block_on(
|| {
let PollState { writable, .. } = self.poll_connect()?;
if !writable {
// When set to non_blocking, directly return inporgress
if self.is_nonblocking() {
return Err(AxError::InProgress);
}
Err(AxError::WouldBlock)
} else if self.get_state() == STATE_CONNECTED {
Ok(())
} else {
// When set to non_blocking, directly return inporgress
if self.is_nonblocking() {
return Err(AxError::InProgress);
}
ax_err!(ConnectionRefused, "socket connect() failed")
}
ax_err!(ConnectionRefused, "socket connect() failed")
}
})
},
self.iface_name.lock().clone(),
)
}

/// Binds an unbound socket to the given address and port.
Expand Down Expand Up @@ -259,11 +267,14 @@ impl TcpSocket {

// SAFETY: `self.local_addr` should be initialized after `bind()`.
let local_port = unsafe { self.local_addr.get().read().port };
self.block_on(|| {
let (handle, (local_addr, peer_addr)) = LISTEN_TABLE.accept(local_port)?;
debug!("TCP socket accepted a new connection {}", peer_addr);
Ok(TcpSocket::new_connected(handle, local_addr, peer_addr))
})
self.block_on(
|| {
let (handle, (local_addr, peer_addr)) = LISTEN_TABLE.accept(local_port)?;
debug!("TCP socket accepted a new connection {}", peer_addr);
Ok(TcpSocket::new_connected(handle, local_addr, peer_addr))
},
None,
)
}

/// Close the connection.
Expand All @@ -278,7 +289,7 @@ impl TcpSocket {
socket.close();
});
unsafe { self.local_addr.get().write(UNSPECIFIED_ENDPOINT) }; // clear bound address
SOCKET_SET.poll_interfaces();
SOCKET_SET.poll_interfaces(None);
Ok(())
})
.unwrap_or(Ok(()))?;
Expand All @@ -290,7 +301,7 @@ impl TcpSocket {
let local_port = unsafe { self.local_addr.get().read().port };
unsafe { self.local_addr.get().write(UNSPECIFIED_ENDPOINT) }; // clear bound address
LISTEN_TABLE.unlisten(local_port);
SOCKET_SET.poll_interfaces();
SOCKET_SET.poll_interfaces(None);
Ok(())
})
.unwrap_or(Ok(()))?;
Expand All @@ -309,37 +320,40 @@ impl TcpSocket {

// SAFETY: `self.handle` should be initialized in a connected socket.
let handle = unsafe { self.handle.get().read().unwrap() };
self.block_on(|| {
SOCKET_SET.with_socket_mut::<tcp::Socket, _, _>(handle, |socket| {
if !socket.is_active() {
// not open
ax_err!(ConnectionRefused, "socket recv() failed")
} else if !socket.may_recv() {
// connection closed
Ok(0)
} else if socket.recv_queue() > 0 {
// data available
// TODO: use socket.recv(|buf| {...})
if flags & MSG_DONTWAIT != 0 {
self.set_nonblocking(true);
}
if flags & MSG_PEEK != 0 {
let len = socket
.peek_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket recv() failed"))?;
Ok(len)
self.block_on(
|| {
SOCKET_SET.with_socket_mut::<tcp::Socket, _, _>(handle, |socket| {
if !socket.is_active() {
// not open
ax_err!(ConnectionRefused, "socket recv() failed")
} else if !socket.may_recv() {
// connection closed
Ok(0)
} else if socket.recv_queue() > 0 {
// data available
// TODO: use socket.recv(|buf| {...})
if flags & MSG_DONTWAIT != 0 {
self.set_nonblocking(true);
}
if flags & MSG_PEEK != 0 {
let len = socket
.peek_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket recv() failed"))?;
Ok(len)
} else {
let len = socket
.recv_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket recv() failed"))?;
Ok(len)
}
} else {
let len = socket
.recv_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket recv() failed"))?;
Ok(len)
// no more data
Err(AxError::WouldBlock)
}
} else {
// no more data
Err(AxError::WouldBlock)
}
})
})
})
},
None,
)
}

/// Transmits data in the given buffer.
Expand All @@ -353,24 +367,27 @@ impl TcpSocket {

// SAFETY: `self.handle` should be initialized in a connected socket.
let handle = unsafe { self.handle.get().read().unwrap() };
self.block_on(|| {
SOCKET_SET.with_socket_mut::<tcp::Socket, _, _>(handle, |socket| {
if !socket.is_active() || !socket.may_send() {
// closed by remote
ax_err!(ConnectionReset, "socket send() failed")
} else if socket.can_send() {
// connected, and the tx buffer is not full
// TODO: use socket.send(|buf| {...})
let len = socket
.send_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket send() failed"))?;
Ok(len)
} else {
// tx buffer is full
Err(AxError::WouldBlock)
}
})
})
self.block_on(
|| {
SOCKET_SET.with_socket_mut::<tcp::Socket, _, _>(handle, |socket| {
if !socket.is_active() || !socket.may_send() {
// closed by remote
ax_err!(ConnectionReset, "socket send() failed")
} else if socket.can_send() {
// connected, and the tx buffer is not full
// TODO: use socket.send(|buf| {...})
let len = socket
.send_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket send() failed"))?;
Ok(len)
} else {
// tx buffer is full
Err(AxError::WouldBlock)
}
})
},
self.iface_name.lock().clone(),
)
}

/// Whether the socket is readable or writable.
Expand Down Expand Up @@ -517,16 +534,19 @@ impl TcpSocket {
/// If the socket is non-blocking, it calls the function once and returns
/// immediately. Otherwise, it may call the function multiple times if it
/// returns [`Err(WouldBlock)`](AxError::WouldBlock).
fn block_on<F, T>(&self, mut f: F) -> AxResult<T>
fn block_on<F, T>(&self, mut f: F, iface: Option<String>) -> AxResult<T>
where
F: FnMut() -> AxResult<T>,
{
if self.is_nonblocking() {
f()
let res = f();
SOCKET_SET.poll_interfaces(iface.clone());
res
} else {
loop {
SOCKET_SET.poll_interfaces();
match f() {
let res = f();
SOCKET_SET.poll_interfaces(iface.clone());
match res {
Ok(t) => return Ok(t),
Err(AxError::WouldBlock) => ruxtask::yield_now(),
Err(e) => return Err(e),
Expand Down
Loading
Loading