Skip to content

Commit

Permalink
Preliminary code for multi-server rpc over ethernet
Browse files Browse the repository at this point in the history
  • Loading branch information
zmckevitt committed Nov 22, 2023
1 parent 8f301ef commit 22da29b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 24 deletions.
65 changes: 46 additions & 19 deletions kernel/src/arch/x86_64/rackscale/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,63 @@ pub(crate) const CONTROLLER_PORT_BASE: u16 = 6970;
static ClientReadyCount: AtomicU64 = AtomicU64::new(0);
static DCMServerReady: AtomicBool = AtomicBool::new(false);

// Used for port allocation ranges for rpc servers
const MAX_CORES_PER_CLIENT: u16 = 24;

/// Controller main method
pub(crate) fn run() {
let mid = *crate::environment::CORE_ID;

let transports =
create_shmem_transport(mid.try_into().unwrap()).expect("Failed to create shmem transport");

// TODO: still dependent on NUM_SHMEM_TRANSPORTS, ensure it works for eth too
let mut servers: ArrayVec<Server<'_>, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new();
for transport in transports.into_iter() {
let mut server = Server::new(Box::new(transport));

if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
let transport = Box::new(
TCPTransport::new(
None,
CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT,
Arc::clone(&ETHERNET_IFACE),
)
.expect("Failed to create TCP transport"),
);
let mut server = Server::new(transport);
register_rpcs(&mut server);
servers.push(server);
}
ClientReadyCount.fetch_add(1, Ordering::SeqCst);
while !DCMServerReady.load(Ordering::SeqCst) {}
servers[0]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
} else if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Shmem)
{
let transports = create_shmem_transport(mid.try_into().unwrap())
.expect("Failed to create shmem transport");

// let mut servers: ArrayVec<Server<'_>, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new();
for transport in transports.into_iter() {
let mut server = Server::new(Box::new(transport));
register_rpcs(&mut server);
servers.push(server);
}

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

/*
// Wait for all clients to connect before fulfilling any RPCs.
while !DCMServerReady.load(Ordering::SeqCst) {}

for s_index in 0..servers.len() {
servers[s_index]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
}
} else {
unreachable!("No supported transport layer specified in kernel argument");
};
*/

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

// Wait for all clients to connect before fulfilling any RPCs.
while !DCMServerReady.load(Ordering::SeqCst) {}

for s_index in 0..servers.len() {
servers[s_index]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
}

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

Expand Down
6 changes: 5 additions & 1 deletion kernel/src/arch/x86_64/rackscale/dcm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ unsafe_abomonate!(DCMOps);

lazy_static! {
pub(crate) static ref DCM_CLIENT: Arc<Mutex<Client>> = Arc::new(Mutex::new(
init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, false).unwrap(),
init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, 1)
.unwrap()
.into_iter()
.nth(0)
.unwrap(),
));
}
18 changes: 14 additions & 4 deletions kernel/src/transport/ethernet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,28 @@ pub(crate) fn init_network<'a>() -> KResult<Arc<Mutex<Interface<'a, DevQueuePhy>
#[allow(unused)]
pub(crate) fn init_ethernet_rpc(
server_ip: smoltcp::wire::IpAddress,
server_port: u16,
send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest
) -> KResult<rpc::client::Client> {
server_port_base: u16,
num_cores: u16,
) -> KResult<Vec<rpc::client::Client>> {
use crate::arch::rackscale::registration::initialize_client;
use alloc::boxed::Box;
use rpc::client::Client;
use rpc::transport::TCPTransport;

let mut clients: Vec<rpc::client::Client> = Vec::new();

let offset = 0;
let server_port = server_port_base + offset;

let rpc_transport = Box::new(
TCPTransport::new(Some(server_ip), server_port, Arc::clone(&ETHERNET_IFACE))
.map_err(|err| KError::RackscaleRPCError { err })?,
);
let mut client = Client::new(rpc_transport);
initialize_client(client, send_client_data, false)

// DCM sets send_client_data (second param) false
client = initialize_client(client, false, false).expect("Failed to initialize client");
clients.push(client);

Ok(clients)
}

0 comments on commit 22da29b

Please sign in to comment.