Skip to content

Commit

Permalink
remove tor health daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-bonez committed Jan 9, 2023
1 parent 011bac7 commit 826e714
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 168 deletions.
47 changes: 8 additions & 39 deletions backend/src/bin/embassyd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use color_eyre::eyre::eyre;
use embassy::context::{DiagnosticContext, RpcContext};
Expand All @@ -10,14 +9,11 @@ use embassy::net::mdns::MdnsController;
use embassy::net::net_controller::NetController;
use embassy::net::net_utils::ResourceFqdn;
use embassy::net::static_server::diag_ui_file_router;
use embassy::net::tor::tor_health_check;
use embassy::shutdown::Shutdown;
use embassy::system::launch_metrics_task;
use embassy::util::daemon;
use embassy::util::logger::EmbassyLogger;
use embassy::{Error, ErrorKind, ResultExt};
use futures::{FutureExt, TryFutureExt};
use reqwest::{Client, Proxy};
use tokio::signal::unix::signal;
use tracing::instrument;

Expand Down Expand Up @@ -80,44 +76,17 @@ async fn inner_main(cfg_path: Option<PathBuf>) -> Result<Option<Shutdown>, Error
.await
});

let tor_health_ctx = rpc_ctx.clone();
let tor_client = Client::builder()
.proxy(
Proxy::http(format!(
"socks5h://{}:{}",
rpc_ctx.tor_socks.ip(),
rpc_ctx.tor_socks.port()
))
.with_kind(crate::ErrorKind::Network)?,
)
.build()
.with_kind(crate::ErrorKind::Network)?;
let tor_health_daemon = daemon(
move || {
let ctx = tor_health_ctx.clone();
let client = tor_client.clone();
async move { tor_health_check(&client, &ctx.net_controller.tor).await }
},
Duration::from_secs(300),
rpc_ctx.shutdown.subscribe(),
);

embassy::sound::CHIME.play().await?;

futures::try_join!(
metrics_task
.map_err(|e| Error::new(
metrics_task
.map_err(|e| {
Error::new(
eyre!("{}", e).wrap_err("Metrics daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Metrics daemon Shutdown")),
tor_health_daemon
.map_err(|e| Error::new(
e.wrap_err("Tor Health daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Tor Health daemon Shutdown")),
)?;
ErrorKind::Unknown,
)
})
.map_ok(|_| tracing::debug!("Metrics daemon Shutdown"))
.await?;

let mut shutdown = shutdown_recv
.recv()
Expand Down
129 changes: 0 additions & 129 deletions backend/src/net/tor.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::Duration;

use clap::ArgMatches;
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::FutureExt;
use reqwest::Client;
use rpc_toolkit::command;
use serde_json::json;
use sqlx::{Executor, Postgres};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -109,10 +106,6 @@ impl TorController {
self.0.lock().await.remove(pkg_id, interfaces).await
}

pub async fn replace(&self) -> Result<bool, Error> {
self.0.lock().await.replace().await
}

pub async fn embassyd_tor_key(&self) -> TorSecretKeyV3 {
self.0.lock().await.embassyd_tor_key.clone()
}
Expand Down Expand Up @@ -266,80 +259,6 @@ impl TorControllerInner {
Ok(())
}

#[instrument(skip(self))]
async fn replace(&mut self) -> Result<bool, Error> {
let connection = self.connection.take();
let uptime = if let Some(mut c) = connection {
// this should be unreachable because the only time when this should be none is for the duration of tor's
// restart lower down in this method, which is held behind a Mutex
let uptime = c.get_info("uptime").await?.parse::<u64>()?;
// we never want to restart the tor daemon if it hasn't been up for at least a half hour
if uptime < 1800 {
self.connection = Some(c); // put it back
return Ok(false);
}
// when connection closes below, tor daemon is restarted
c.take_ownership().await?;
// this should close the connection
drop(c);
Some(uptime)
} else {
None
};

// attempt to reconnect to the control socket, not clear how long this should take
let mut new_connection: AuthenticatedConnection;
loop {
match TcpStream::connect(self.control_addr).await {
Ok(stream) => {
let mut new_conn = torut::control::UnauthenticatedConn::new(stream);
let auth = new_conn
.load_protocol_info()
.await?
.make_auth_data()?
.ok_or_else(|| eyre!("Cookie Auth Not Available"))
.with_kind(crate::ErrorKind::Tor)?;
new_conn.authenticate(&auth).await?;
new_connection = new_conn.into_authenticated().await;
let uptime_new = new_connection.get_info("uptime").await?.parse::<u64>()?;
// if the new uptime exceeds the one we got at the beginning, it's the same tor daemon, do not proceed
match uptime {
Some(uptime) if uptime_new > uptime => (),
_ => {
new_connection.set_async_event_handler(Some(event_handler));
break;
}
}
}
Err(e) => {
tracing::info!("Failed to reconnect to tor control socket: {}", e);
tracing::info!("Trying again in one second");
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
// replace the connection object here on the new copy of the tor daemon
self.connection.replace(new_connection);

// swap empty map for owned old service map
let old_services = std::mem::take(&mut self.services);

// re add all of the services on the new control socket
for ((package_id, interface_id), (tor_key, tor_cfg, ipv4)) in old_services {
self.add(
&package_id,
ipv4,
std::iter::once((interface_id, tor_cfg, tor_key)),
)
.await?;
}

// add embassyd hidden service again
self.add_embassyd_onion().await?;

Ok(true)
}

fn embassyd_onion(&self) -> OnionAddressV3 {
self.embassyd_tor_key.public().get_onion_address()
}
Expand All @@ -359,54 +278,6 @@ impl TorControllerInner {
}
}

pub async fn tor_health_check(client: &Client, tor_controller: &TorController) {
tracing::debug!("Attempting to self-check tor address");
let onion_addr = tor_controller.embassyd_onion().await;
let result = client
.post(format!("http://{}/rpc/v1", onion_addr))
.body(
json!({
"jsonrpc": "2.0",
"method": "echo",
"params": { "message": "Follow the orange rabbit" },
})
.to_string()
.into_bytes(),
)
.send()
.await;
if let Err(e) = result {
let mut num_attempt = 1;
tracing::error!("Unable to reach self over tor, we will retry now...");
tracing::error!("The first TOR error: {}", e);

loop {
tracing::debug!("TOR Reconnecting retry number: {num_attempt}");

match tor_controller.replace().await {
Ok(restarted) => {
if restarted {
tracing::error!("Tor has been recently restarted, refusing to restart again right now...");
}
break;
}
Err(e) => {
tracing::error!("TOR retry error: {}", e);
tracing::error!("Unable to restart tor on attempt {num_attempt}...Retrying");

num_attempt += 1;
continue;
}
}
}
} else {
tracing::debug!(
"Successfully verified main tor address liveness at {}",
onion_addr
)
}
}

#[tokio::test]
async fn test() {
let mut conn = torut::control::UnauthenticatedConn::new(
Expand Down

0 comments on commit 826e714

Please sign in to comment.