Skip to content

Commit

Permalink
One shmem transport per client core
Browse files Browse the repository at this point in the history
  • Loading branch information
hunhoffe committed Nov 16, 2023
1 parent e293f23 commit 2ab2732
Show file tree
Hide file tree
Showing 20 changed files with 207 additions and 126 deletions.
16 changes: 13 additions & 3 deletions kernel/src/arch/x86_64/rackscale/client_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use crate::error::{KError, KResult};
use crate::memory::backends::MemManager;
use crate::memory::shmem_affinity::{local_shmem_affinity, mid_to_shmem_affinity};
use crate::process::MAX_PROCESSES;
use crate::transport::shmem::NUM_SHMEM_TRANSPORTS;

/// This is the state the client records about itself
pub(crate) struct ClientState {
/// The RPC client used to communicate with the controller
pub(crate) rpc_client: Arc<Mutex<Client>>,
pub(crate) rpc_clients: Arc<ArrayVec<Mutex<Client>, { NUM_SHMEM_TRANSPORTS as usize }>>,

/// Used to store shmem affinity base pages
pub(crate) affinity_base_pages: Arc<ArrayVec<Mutex<Box<dyn MemManager + Send>>, MAX_MACHINES>>,
Expand All @@ -41,7 +42,9 @@ pub(crate) struct ClientState {
impl ClientState {
pub(crate) fn new() -> ClientState {
// Create network stack and instantiate RPC Client
let rpc_client = if crate::CMDLINE
// TODO(rackscale, hack): only allow shmem for now
/*
let rpc_clients = if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
Expand All @@ -60,6 +63,13 @@ impl ClientState {
.expect("Failed to initialize shmem RPC"),
))
};
*/
let clients =
crate::transport::shmem::init_shmem_rpc(true).expect("Failed to initialize shmem RPC");
let mut rpc_clients = ArrayVec::new();
for client in clients.into_iter() {
rpc_clients.push(Mutex::new(client));
}

let mut per_process_base_pages = ArrayVec::new();
for _i in 0..MAX_PROCESSES {
Expand All @@ -76,7 +86,7 @@ impl ClientState {

log::debug!("Finished initializing client state");
ClientState {
rpc_client,
rpc_clients: Arc::new(rpc_clients),
affinity_base_pages: Arc::new(affinity_base_pages),
per_process_base_pages: Arc::new(per_process_base_pages),
}
Expand Down
34 changes: 23 additions & 11 deletions kernel/src/arch/x86_64/rackscale/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::arch::rackscale::dcm::{
use crate::arch::MAX_MACHINES;
use crate::cmdline::Transport;
use crate::transport::ethernet::ETHERNET_IFACE;
use crate::transport::shmem::create_shmem_transport;
use crate::transport::shmem::{create_shmem_transport, NUM_SHMEM_TRANSPORTS};

use super::*;

Expand All @@ -32,6 +32,8 @@ pub(crate) fn run() {
let mid = *crate::environment::CORE_ID;

// Initialize one server per controller thread
// TODO(rackscale, hack): only support shmem for now
/*
let mut server = if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
Expand All @@ -51,26 +53,34 @@ pub(crate) fn run() {
.get()
.map_or(false, |c| c.transport == Transport::Shmem)
{
let transport = Box::new(
create_shmem_transport(mid.try_into().unwrap())
.expect("Failed to create shmem transport"),
);
*/
let transports =
create_shmem_transport(mid.try_into().unwrap()).expect("Failed to create shmem transport");

let mut server = Server::new(transport);
let mut servers: ArrayVec<Server<'_>, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new();
for transport in transports.into_iter() {
let mut server = Server::new(Box::new(transport));
register_rpcs(&mut server);
server
servers.push(server);
}

/*
} else {
unreachable!("No supported transport layer specified in kernel argument");
};
*/

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

// Wait for all clients to connect before fulfilling any RPCs.
while !DCMServerReady.load(Ordering::SeqCst) {}

server
// TODO(rackscale, hack): only register core 0
//for s_index in 0..servers.len() {
servers[0]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
//}

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

Expand Down Expand Up @@ -114,9 +124,11 @@ pub(crate) fn run() {
// Start running the RPC server
log::info!("Starting RPC server for client {:?}!", mid);
loop {
let _handled = server
.try_handle()
.expect("Controller failed to handle RPC");
for s_index in 0..servers.len() {
let _handled = servers[s_index]
.try_handle()
.expect("Controller failed to handle RPC");
}
}
}

Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub(crate) fn rpc_close(pid: usize, fd: FileDescriptor) -> KResult<(u64, u64)> {
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call Close() RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Close as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Close as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode and return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ pub(crate) fn rpc_delete(pid: usize, pathname: String) -> KResult<(u64, u64)> {
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Delete as RPCType,
&[&req_data, &pathname.as_bytes()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Delete as RPCType,
&[&req_data, &pathname.as_bytes()],
&mut [&mut res_data],
)?;

// Decode result - return result if decoding successful
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub(crate) fn rpc_getinfo<P: AsRef<[u8]> + Debug>(pid: usize, name: P) -> KResul

// Construct result buffer and call RPC
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::GetInfo as RPCType,
&[&req_data, name.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::GetInfo as RPCType,
&[&req_data, name.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ pub(crate) fn rpc_mkdir<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::MkDir as RPCType,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::MkDir as RPCType,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;

// Parse and return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ fn rpc_open_create<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call the RPC
CLIENT_STATE.rpc_client.lock().call(
rpc_type,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
rpc_type,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ pub(crate) fn rpc_rename<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call the RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::FileRename as RPCType,
&[&req_data, oldname.as_ref(), newname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::FileRename as RPCType,
&[&req_data, oldname.as_ref(), newname.as_ref()],
&mut [&mut res_data],
)?;

// Parse and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
15 changes: 8 additions & 7 deletions kernel/src/arch/x86_64/rackscale/fileops/rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ pub(crate) fn rpc_writeat(
} else {
KernelRpc::WriteAt as RPCType
};
CLIENT_STATE
.rpc_client
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(rpc_type, &[&req_data, &data], &mut [&mut res_data])?;

Expand Down Expand Up @@ -129,11 +128,13 @@ pub(crate) fn rpc_readat(
KernelRpc::ReadAt as RPCType
};

CLIENT_STATE.rpc_client.lock().call(
KernelRpc::ReadAt as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::ReadAt as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode result, if successful, return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
9 changes: 6 additions & 3 deletions kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ unsafe_abomonate!(ShmemRegion: base, affinity);
// This isn't truly a syscall
pub(crate) fn rpc_get_shmem_frames(pid: Option<Pid>, num_frames: usize) -> KResult<Vec<Frame>> {
assert!(num_frames > 0);
log::debug!("GetShmemFrames({:?})", num_frames);
log::debug!(
"GetShmemFrames({:?}) core={:?}",
num_frames,
kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)
);

let mid = if pid.is_none() {
Some(*crate::environment::MACHINE_ID)
Expand All @@ -66,8 +70,7 @@ pub(crate) fn rpc_get_shmem_frames(pid: Option<Pid>, num_frames: usize) -> KResu
for i in 0..max_res_size {
res_data.push(0u8);
}
CLIENT_STATE
.rpc_client
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::GetShmemFrames as RPCType,
Expand Down
3 changes: 1 addition & 2 deletions kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ pub(crate) fn rpc_get_shmem_structure(

// Make buffer max size of MAX_PROCESS (for NrProcLogs), 1 (for NrLog)
let mut res_data = [0u8; core::mem::size_of::<[u64; MAX_PROCESSES]>()];
CLIENT_STATE
.rpc_client
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::GetShmemStructure as RPCType,
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ pub(crate) fn rpc_allocate_physical(pid: Pid, size: u64, affinity: u64) -> KResu

// Create result buffer
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::AllocatePhysical as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::AllocatePhysical as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode result, return result if decoded successfully
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ pub(crate) fn rpc_log(msg: String) -> KResult<(u64, u64)> {

// Construct result buffer and call RPC
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Log as RPCType,
&[&req_data, print_str.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Log as RPCType,
&[&req_data, print_str.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/release_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ pub(crate) fn rpc_release_core(pid: Pid, gtid: ThreadId) -> KResult<(u64, u64)>

// Create result buffer
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::ReleaseCore as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::ReleaseCore as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode result, return result if decoded successfully
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/release_physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ pub(crate) fn rpc_release_physical(pid: Pid, frame_id: u64) -> KResult<(u64, u64

// Create result buffer
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::ReleasePhysical as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::ReleasePhysical as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode result, return result if decoded successfully
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/request_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ pub(crate) fn rpc_request_core(pid: Pid, new_pid: bool, entry_point: u64) -> KRe

// Construct result buffer and call RPC
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::RequestCore as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::RequestCore as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
Loading

0 comments on commit 2ab2732

Please sign in to comment.