From 826e714000e2476de8e799a6e09425f8d5d282d1 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Fri, 6 Jan 2023 15:50:51 -0700 Subject: [PATCH] remove tor health daemon --- backend/src/bin/embassyd.rs | 47 +++---------- backend/src/net/tor.rs | 129 ------------------------------------ 2 files changed, 8 insertions(+), 168 deletions(-) diff --git a/backend/src/bin/embassyd.rs b/backend/src/bin/embassyd.rs index d4b1e11c0..79e86e001 100644 --- a/backend/src/bin/embassyd.rs +++ b/backend/src/bin/embassyd.rs @@ -1,6 +1,5 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::Duration; use color_eyre::eyre::eyre; use embassy::context::{DiagnosticContext, RpcContext}; @@ -10,14 +9,11 @@ use embassy::net::mdns::MdnsController; use embassy::net::net_controller::NetController; use embassy::net::net_utils::ResourceFqdn; use embassy::net::static_server::diag_ui_file_router; -use embassy::net::tor::tor_health_check; use embassy::shutdown::Shutdown; use embassy::system::launch_metrics_task; -use embassy::util::daemon; use embassy::util::logger::EmbassyLogger; use embassy::{Error, ErrorKind, ResultExt}; use futures::{FutureExt, TryFutureExt}; -use reqwest::{Client, Proxy}; use tokio::signal::unix::signal; use tracing::instrument; @@ -80,44 +76,17 @@ async fn inner_main(cfg_path: Option) -> Result, Error .await }); - let tor_health_ctx = rpc_ctx.clone(); - let tor_client = Client::builder() - .proxy( - Proxy::http(format!( - "socks5h://{}:{}", - rpc_ctx.tor_socks.ip(), - rpc_ctx.tor_socks.port() - )) - .with_kind(crate::ErrorKind::Network)?, - ) - .build() - .with_kind(crate::ErrorKind::Network)?; - let tor_health_daemon = daemon( - move || { - let ctx = tor_health_ctx.clone(); - let client = tor_client.clone(); - async move { tor_health_check(&client, &ctx.net_controller.tor).await } - }, - Duration::from_secs(300), - rpc_ctx.shutdown.subscribe(), - ); - embassy::sound::CHIME.play().await?; - futures::try_join!( - metrics_task - .map_err(|e| Error::new( + metrics_task + .map_err(|e| { + Error::new( eyre!("{}", e).wrap_err("Metrics daemon panicked!"), - ErrorKind::Unknown - )) - .map_ok(|_| tracing::debug!("Metrics daemon Shutdown")), - tor_health_daemon - .map_err(|e| Error::new( - e.wrap_err("Tor Health daemon panicked!"), - ErrorKind::Unknown - )) - .map_ok(|_| tracing::debug!("Tor Health daemon Shutdown")), - )?; + ErrorKind::Unknown, + ) + }) + .map_ok(|_| tracing::debug!("Metrics daemon Shutdown")) + .await?; let mut shutdown = shutdown_recv .recv() diff --git a/backend/src/net/tor.rs b/backend/src/net/tor.rs index 8c7f99e19..2f03dc9f0 100644 --- a/backend/src/net/tor.rs +++ b/backend/src/net/tor.rs @@ -1,14 +1,11 @@ use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr}; -use std::time::Duration; use clap::ArgMatches; use color_eyre::eyre::eyre; use futures::future::BoxFuture; use futures::FutureExt; -use reqwest::Client; use rpc_toolkit::command; -use serde_json::json; use sqlx::{Executor, Postgres}; use tokio::net::TcpStream; use tokio::sync::Mutex; @@ -109,10 +106,6 @@ impl TorController { self.0.lock().await.remove(pkg_id, interfaces).await } - pub async fn replace(&self) -> Result { - self.0.lock().await.replace().await - } - pub async fn embassyd_tor_key(&self) -> TorSecretKeyV3 { self.0.lock().await.embassyd_tor_key.clone() } @@ -266,80 +259,6 @@ impl TorControllerInner { Ok(()) } - #[instrument(skip(self))] - async fn replace(&mut self) -> Result { - let connection = self.connection.take(); - let uptime = if let Some(mut c) = connection { - // this should be unreachable because the only time when this should be none is for the duration of tor's - // restart lower down in this method, which is held behind a Mutex - let uptime = c.get_info("uptime").await?.parse::()?; - // we never want to restart the tor daemon if it hasn't been up for at least a half hour - if uptime < 1800 { - self.connection = Some(c); // put it back - return Ok(false); - } - // when connection closes below, tor daemon is restarted - c.take_ownership().await?; - // this should close the connection - drop(c); - Some(uptime) - } else { - None - }; - - // attempt to reconnect to the control socket, not clear how long this should take - let mut new_connection: AuthenticatedConnection; - loop { - match TcpStream::connect(self.control_addr).await { - Ok(stream) => { - let mut new_conn = torut::control::UnauthenticatedConn::new(stream); - let auth = new_conn - .load_protocol_info() - .await? - .make_auth_data()? - .ok_or_else(|| eyre!("Cookie Auth Not Available")) - .with_kind(crate::ErrorKind::Tor)?; - new_conn.authenticate(&auth).await?; - new_connection = new_conn.into_authenticated().await; - let uptime_new = new_connection.get_info("uptime").await?.parse::()?; - // if the new uptime exceeds the one we got at the beginning, it's the same tor daemon, do not proceed - match uptime { - Some(uptime) if uptime_new > uptime => (), - _ => { - new_connection.set_async_event_handler(Some(event_handler)); - break; - } - } - } - Err(e) => { - tracing::info!("Failed to reconnect to tor control socket: {}", e); - tracing::info!("Trying again in one second"); - } - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - // replace the connection object here on the new copy of the tor daemon - self.connection.replace(new_connection); - - // swap empty map for owned old service map - let old_services = std::mem::take(&mut self.services); - - // re add all of the services on the new control socket - for ((package_id, interface_id), (tor_key, tor_cfg, ipv4)) in old_services { - self.add( - &package_id, - ipv4, - std::iter::once((interface_id, tor_cfg, tor_key)), - ) - .await?; - } - - // add embassyd hidden service again - self.add_embassyd_onion().await?; - - Ok(true) - } - fn embassyd_onion(&self) -> OnionAddressV3 { self.embassyd_tor_key.public().get_onion_address() } @@ -359,54 +278,6 @@ impl TorControllerInner { } } -pub async fn tor_health_check(client: &Client, tor_controller: &TorController) { - tracing::debug!("Attempting to self-check tor address"); - let onion_addr = tor_controller.embassyd_onion().await; - let result = client - .post(format!("http://{}/rpc/v1", onion_addr)) - .body( - json!({ - "jsonrpc": "2.0", - "method": "echo", - "params": { "message": "Follow the orange rabbit" }, - }) - .to_string() - .into_bytes(), - ) - .send() - .await; - if let Err(e) = result { - let mut num_attempt = 1; - tracing::error!("Unable to reach self over tor, we will retry now..."); - tracing::error!("The first TOR error: {}", e); - - loop { - tracing::debug!("TOR Reconnecting retry number: {num_attempt}"); - - match tor_controller.replace().await { - Ok(restarted) => { - if restarted { - tracing::error!("Tor has been recently restarted, refusing to restart again right now..."); - } - break; - } - Err(e) => { - tracing::error!("TOR retry error: {}", e); - tracing::error!("Unable to restart tor on attempt {num_attempt}...Retrying"); - - num_attempt += 1; - continue; - } - } - } - } else { - tracing::debug!( - "Successfully verified main tor address liveness at {}", - onion_addr - ) - } -} - #[tokio::test] async fn test() { let mut conn = torut::control::UnauthenticatedConn::new(