Skip to content

Commit

Permalink
feat: moving to http using unix socket
Browse files Browse the repository at this point in the history
  • Loading branch information
sousandrei committed Sep 17, 2022
1 parent 371ec50 commit acb8590
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 113 deletions.
5 changes: 4 additions & 1 deletion src/consts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
pub const TMP_DIR: &str = "/tmp/firesquid";
pub const LOG_DIR: &str = "/tmp/firesquid/logs";

pub const ASSETS_DIR: &str = "/etc/firesquid";
pub const SOCKET: &str = "/tmp/firesquid/http.sock";

// pub const ASSETS_DIR: &str = "/etc/firesquid";
pub const ASSETS_DIR: &str = "./assets";

pub const DRIVE_NAME: &str = "rootfs";
pub const KERNEL_NAME: &str = "vmlinux";
116 changes: 116 additions & 0 deletions src/daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use crate::state::StatePtr;
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use hyperlocal::UnixServerExt;
use std::sync::Arc;
use std::{fs, path::Path};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tracing::{error, info};

use crate::api;
use crate::consts::{LOG_DIR, SOCKET, TMP_DIR};
use crate::folders;
use crate::vm;

pub async fn start() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let state = Arc::new(RwLock::new(Vec::new()));

folders::init(TMP_DIR)?;
folders::init(LOG_DIR)?;

let path = Path::new(SOCKET);

if path.exists() {
fs::remove_file(path)?;
}

let state_ptr = Arc::new(state);

let service = make_service_fn(|_| {
let state = state_ptr.clone();
async {
Ok::<_, hyper::Error>(service_fn(move |req| {
let state = state.clone();
async move { api::router(req, state).await }
}))
}
});

let server = Server::bind_unix(path)?.serve(service);

info!("Listening on {}", SOCKET);

let (tx, mut rx) = mpsc::channel(1);

let graceful = server.with_graceful_shutdown(async {
rx.recv().await;
info!("Shutting down hyper server");
});

listen_for_signal(tx.clone(), SignalKind::interrupt());
listen_for_signal(tx.clone(), SignalKind::terminate());

graceful.await?;

terminate_all_vms(state_ptr).await;

Ok(())
}

async fn terminate_all_vms(state_ptr: StatePtr) {
let vms = state_ptr.read().await;

for v in vms.iter() {
info!("Terminating [{}]", v.name);

if let Err(e) = vm::terminate(&v.name).await {
error!("Error on termination [{}, {}]", v.name, e.to_string());
}
}

// Waits for last machines
for v in vms.iter() {
loop {
match get_process(v.pid).await {
Ok(value) => match value {
false => break,
true => {}
},
Err(e) => {
error!("Error on checking process [{}, {}]", v.name, e.to_string());
break;
}
}
}
}
}

async fn get_process(pid: u32) -> Result<bool, std::io::Error> {
match tokio::process::Command::new("ls")
.arg("/proc")
.output()
.await
{
Err(e) => Err(e),
Ok(output) => {
let output = String::from_utf8_lossy(&output.stdout);
let output: Vec<&str> = output.split('\n').collect();

Ok(output.contains(&pid.to_string().as_str()))
}
}
}

fn listen_for_signal(tx: tokio::sync::mpsc::Sender<SignalKind>, kind: SignalKind) {
tokio::task::spawn(async move {
let mut stream =
signal(kind).unwrap_or_else(|_| panic!("Error opening signal stream [{:?}]", kind));

stream.recv().await;
info!("Termination signal received");

tx.send(kind).await.unwrap();
});
}
114 changes: 3 additions & 111 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use std::sync::Arc;
use std::{env, net::SocketAddr};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tracing::{error, info};
use std::env;

// mod cli;

mod api;
mod consts;
mod daemon;
mod error;
mod folders;
mod io;
mod state;
mod vm;

use crate::state::StatePtr;

use crate::consts::{LOG_DIR, TMP_DIR};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if env::var_os("RUST_LOG").is_none() {
Expand All @@ -34,105 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// return Ok(());
// }

start_daemon().await?;

Ok(())
}

async fn start_daemon() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let state = Arc::new(RwLock::new(Vec::new()));

folders::init(TMP_DIR)?;
folders::init(LOG_DIR)?;

// TODO: check if HTTP is the way, then change port if needed
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

let state_ptr = Arc::new(state);

let service = make_service_fn(|_| {
let state = state_ptr.clone();
async {
Ok::<_, hyper::Error>(service_fn(move |req| {
let state = state.clone();
async move { api::router(req, state).await }
}))
}
});

let server = Server::bind(&addr).serve(service);

info!("Listening on http://{}", addr);

let (tx, mut rx) = mpsc::channel(1);

let graceful = server.with_graceful_shutdown(async {
rx.recv().await;
info!("Shutting down hyper server");
});

listen_for_signal(tx.clone(), SignalKind::interrupt());
listen_for_signal(tx.clone(), SignalKind::terminate());

graceful.await?;

terminate_all_vms(state_ptr).await;
daemon::start().await?;

Ok(())
}

async fn terminate_all_vms(state_ptr: StatePtr) {
let vms = state_ptr.read().await;

for v in vms.iter() {
info!("Terminating [{}]", v.name);

if let Err(e) = vm::terminate(&v.name).await {
error!("Error on termination [{}, {}]", v.name, e.to_string());
}
}

// Waits for last machines
for v in vms.iter() {
loop {
match get_process(v.pid).await {
Ok(value) => match value {
false => break,
true => {}
},
Err(e) => {
error!("Error on checking process [{}, {}]", v.name, e.to_string());
break;
}
}
}
}
}

async fn get_process(pid: u32) -> Result<bool, std::io::Error> {
match tokio::process::Command::new("ls")
.arg("/proc")
.output()
.await
{
Err(e) => Err(e),
Ok(output) => {
let output = String::from_utf8_lossy(&output.stdout);
let output: Vec<&str> = output.split('\n').collect();

Ok(output.contains(&pid.to_string().as_str()))
}
}
}

fn listen_for_signal(tx: tokio::sync::mpsc::Sender<SignalKind>, kind: SignalKind) {
tokio::task::spawn(async move {
let mut stream =
signal(kind).unwrap_or_else(|_| panic!("Error opening signal stream [{:?}]", kind));

stream.recv().await;
info!("Termination signal received");

tx.send(kind).await.unwrap();
});
}
2 changes: 1 addition & 1 deletion src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub async fn spawn(name: &str, state_ptr: StatePtr) -> Result<(), RuntimeError>
)));
}

if let Err(error) = drive::create_drive(&name) {
if drive::create_drive(&name).is_err() {
drive::delete_drive(&name)?;
socket::delete_socket(&name)?;

Expand Down

0 comments on commit acb8590

Please sign in to comment.