Skip to content

Commit

Permalink
Merge pull request #824 from TheCharlatan/moneroSocksProxy
Browse files Browse the repository at this point in the history
Monero socks proxy
  • Loading branch information
h4sh3d authored Dec 15, 2022
2 parents 3ea10e1 + 883ece8 commit b792e72
Showing 1 changed file with 53 additions and 44 deletions.
97 changes: 53 additions & 44 deletions src/syncerd/monero_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,21 @@ pub struct Transaction {
block_hash: Option<Vec<u8>>,
}

fn create_rpc_client(rpc_url: String, proxy_url: Option<String>) -> monero_rpc::RpcClient {
let mut client_builder = monero_rpc::RpcClientBuilder::new();
if let Some(proxy_url) = proxy_url {
client_builder = client_builder.proxy_address(proxy_url);
}
client_builder
.build(rpc_url)
.expect("client builder failed, cannot recover from bad configuration")
}

impl MoneroRpc {
fn new(node_rpc_url: String, proxy_url: Option<String>) -> Self {
let mut client_builder = monero_rpc::RpcClientBuilder::new();
if let Some(proxy_url) = proxy_url {
client_builder = client_builder.proxy_address(proxy_url);
}
let daemon_json_rpc = client_builder
.clone()
.build(node_rpc_url.clone())
.expect("client builder failed, cannot recover from bad configuration")
.daemon();
let daemon_rpc = client_builder
.build(node_rpc_url)
.expect("client builder failed, cannot recover from bad configuration")
.daemon_rpc();
MoneroRpc {
daemon_json_rpc,
daemon_rpc,
daemon_json_rpc: create_rpc_client(node_rpc_url.clone(), proxy_url.clone()).daemon(),
daemon_rpc: create_rpc_client(node_rpc_url, proxy_url).daemon_rpc(),
height: 0,
block_hash: vec![0],
}
Expand Down Expand Up @@ -409,6 +406,7 @@ async fn run_syncerd_task_receiver(
state: Arc<Mutex<SyncerState>>,
balance_get_tx: TokioSender<(GetAddressBalance, ServiceId)>,
tx_event: TokioSender<BridgeEvent>,
proxy_address: Option<String>,
) {
tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -493,27 +491,25 @@ async fn run_syncerd_task_receiver(
}
Task::HealthCheck(HealthCheck { id }) => {
debug!("performing health check");
let mut health = match monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_daemon.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.daemon()
.get_block_count()
.await
let mut health = match create_rpc_client(
syncer_servers.monero_daemon.clone(),
proxy_address.clone(),
)
.daemon()
.get_block_count()
.await
{
Ok(_) => Health::Healthy,
Err(err) => Health::FaultyMoneroDaemon(err.to_string()),
};

health = match monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_rpc_wallet.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.wallet()
.get_version()
.await
health = match create_rpc_client(
syncer_servers.monero_daemon.clone(),
proxy_address.clone(),
)
.wallet()
.get_version()
.await
{
Ok(_) => health,
Err(err) => Health::FaultyMoneroRpcWallet(err.to_string()),
Expand Down Expand Up @@ -571,9 +567,10 @@ fn address_polling(
syncer_servers: MoneroSyncerServers,
network: monero::Network,
wallet_mutex: Arc<Mutex<monero_rpc::WalletClient>>,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let state_guard = state.lock().await;
let mut addresses = state_guard.addresses.clone();
Expand Down Expand Up @@ -670,9 +667,10 @@ fn address_polling(
fn height_polling(
state: Arc<Mutex<SyncerState>>,
syncer_servers: MoneroSyncerServers,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let block_notif = match rpc.check_block().await {
Ok(notif) => Some(notif),
Expand Down Expand Up @@ -764,9 +762,10 @@ fn sweep_polling(
fn unseen_transaction_polling(
state: Arc<Mutex<SyncerState>>,
syncer_servers: MoneroSyncerServers,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let state_guard = state.lock().await;
let unseen_transactions = state_guard.unseen_transactions.clone();
Expand Down Expand Up @@ -986,6 +985,9 @@ impl Synclet for MoneroSyncer {
debug!("monero syncer servers: {:?}", syncer_servers);
let wallet_dir = opts.monero_wallet_dir_path.clone().map(PathBuf::from);

let proxy_address = opts.shared.tor_proxy.map(|address| address.to_string());
debug!("monero synclet using proxy: {:?}", proxy_address);

let _handle = std::thread::spawn(move || {
use tokio::runtime::Builder;
let rt = Builder::new_multi_thread()
Expand All @@ -995,12 +997,11 @@ impl Synclet for MoneroSyncer {
.unwrap();
rt.block_on(async {
let wallet_mutex = Arc::new(Mutex::new(
monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_rpc_wallet.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.wallet(),
create_rpc_client(
syncer_servers.monero_rpc_wallet.clone(),
proxy_address.clone(),
)
.wallet(),
));
let (balance_get_tx, balance_get_rx): (
TokioSender<(GetAddressBalance, ServiceId)>,
Expand All @@ -1022,6 +1023,7 @@ impl Synclet for MoneroSyncer {
Arc::clone(&state),
balance_get_tx,
event_tx.clone(),
proxy_address.clone(),
)
.await;
run_syncerd_bridge_event_sender(tx, event_rx, syncer_address).await;
Expand All @@ -1031,14 +1033,21 @@ impl Synclet for MoneroSyncer {
syncer_servers.clone(),
network,
Arc::clone(&wallet_mutex),
proxy_address.clone(),
);

// transaction polling is done in the same loop
let height_handle =
height_polling(Arc::clone(&state), syncer_servers.clone());
let height_handle = height_polling(
Arc::clone(&state),
syncer_servers.clone(),
proxy_address.clone(),
);

let unseen_transaction_handle =
unseen_transaction_polling(Arc::clone(&state), syncer_servers.clone());
let unseen_transaction_handle = unseen_transaction_polling(
Arc::clone(&state),
syncer_servers.clone(),
proxy_address.clone(),
);

let sweep_handle = sweep_polling(
Arc::clone(&state),
Expand Down

0 comments on commit b792e72

Please sign in to comment.