Skip to content

Commit

Permalink
Monero syncer: Use provided proxy address
Browse files Browse the repository at this point in the history
This now allows the monero syncer to connect to monero rpc servers
through a proxy.
  • Loading branch information
TheCharlatan committed Dec 13, 2022
1 parent 8a29e31 commit 90113ed
Showing 1 changed file with 54 additions and 44 deletions.
98 changes: 54 additions & 44 deletions src/syncerd/monero_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,22 @@ pub struct Transaction {
block_hash: Option<Vec<u8>>,
}

fn create_monero_rpc_client(rpc_url: String, proxy_url: Option<String>) -> monero_rpc::RpcClient {
let mut client_builder = monero_rpc::RpcClientBuilder::new();
if let Some(proxy_url) = proxy_url {
client_builder = client_builder.proxy_address(proxy_url);
}
client_builder
.build(rpc_url)
.expect("client builder failed, cannot recover from bad configuration")
}

impl MoneroRpc {
fn new(node_rpc_url: String, proxy_url: Option<String>) -> Self {
let mut client_builder = monero_rpc::RpcClientBuilder::new();
if let Some(proxy_url) = proxy_url {
client_builder = client_builder.proxy_address(proxy_url);
}
let daemon_json_rpc = client_builder
.clone()
.build(node_rpc_url.clone())
.expect("client builder failed, cannot recover from bad configuration")
.daemon();
let daemon_rpc = client_builder
.build(node_rpc_url)
.expect("client builder failed, cannot recover from bad configuration")
.daemon_rpc();
MoneroRpc {
daemon_json_rpc,
daemon_rpc,
daemon_json_rpc: create_monero_rpc_client(node_rpc_url.clone(), proxy_url.clone())
.daemon(),
daemon_rpc: create_monero_rpc_client(node_rpc_url, proxy_url).daemon_rpc(),
height: 0,
block_hash: vec![0],
}
Expand Down Expand Up @@ -399,6 +397,7 @@ async fn run_syncerd_task_receiver(
receive_task_channel: Receiver<SyncerdTask>,
state: Arc<Mutex<SyncerState>>,
tx_event: TokioSender<BridgeEvent>,
proxy_address: Option<String>,
) {
tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -477,27 +476,25 @@ async fn run_syncerd_task_receiver(
}
Task::HealthCheck(HealthCheck { id }) => {
debug!("performing health check");
let mut health = match monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_daemon.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.daemon()
.get_block_count()
.await
let mut health = match create_monero_rpc_client(
syncer_servers.monero_daemon.clone(),
proxy_address.clone(),
)
.daemon()
.get_block_count()
.await
{
Ok(_) => Health::Healthy,
Err(err) => Health::FaultyMoneroDaemon(err.to_string()),
};

health = match monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_rpc_wallet.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.wallet()
.get_version()
.await
health = match create_monero_rpc_client(
syncer_servers.monero_daemon.clone(),
proxy_address.clone(),
)
.wallet()
.get_version()
.await
{
Ok(_) => health,
Err(err) => Health::FaultyMoneroRpcWallet(err.to_string()),
Expand Down Expand Up @@ -555,9 +552,10 @@ fn address_polling(
syncer_servers: MoneroSyncerServers,
network: monero::Network,
wallet_mutex: Arc<Mutex<monero_rpc::WalletClient>>,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let state_guard = state.lock().await;
let mut addresses = state_guard.addresses.clone();
Expand Down Expand Up @@ -654,9 +652,10 @@ fn address_polling(
fn height_polling(
state: Arc<Mutex<SyncerState>>,
syncer_servers: MoneroSyncerServers,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let block_notif = match rpc.check_block().await {
Ok(notif) => Some(notif),
Expand Down Expand Up @@ -748,9 +747,10 @@ fn sweep_polling(
fn unseen_transaction_polling(
state: Arc<Mutex<SyncerState>>,
syncer_servers: MoneroSyncerServers,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let state_guard = state.lock().await;
let unseen_transactions = state_guard.unseen_transactions.clone();
Expand Down Expand Up @@ -841,6 +841,9 @@ impl Synclet for MoneroSyncer {
debug!("monero syncer servers: {:?}", syncer_servers);
let wallet_dir = opts.monero_wallet_dir_path.clone().map(PathBuf::from);

let proxy_address = opts.shared.tor_proxy.map(|address| address.to_string());
debug!("monero synclet using proxy: {:?}", proxy_address);

let _handle = std::thread::spawn(move || {
use tokio::runtime::Builder;
let rt = Builder::new_multi_thread()
Expand All @@ -850,12 +853,11 @@ impl Synclet for MoneroSyncer {
.unwrap();
rt.block_on(async {
let wallet_mutex = Arc::new(Mutex::new(
monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_rpc_wallet.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.wallet(),
create_monero_rpc_client(
syncer_servers.monero_rpc_wallet.clone(),
proxy_address.clone(),
)
.wallet(),
));
let (event_tx, event_rx): (
TokioSender<BridgeEvent>,
Expand All @@ -871,6 +873,7 @@ impl Synclet for MoneroSyncer {
receive_task_channel,
Arc::clone(&state),
event_tx.clone(),
proxy_address.clone(),
)
.await;
run_syncerd_bridge_event_sender(tx, event_rx, syncer_address).await;
Expand All @@ -880,14 +883,21 @@ impl Synclet for MoneroSyncer {
syncer_servers.clone(),
network,
Arc::clone(&wallet_mutex),
proxy_address.clone(),
);

// transaction polling is done in the same loop
let height_handle =
height_polling(Arc::clone(&state), syncer_servers.clone());
let height_handle = height_polling(
Arc::clone(&state),
syncer_servers.clone(),
proxy_address.clone(),
);

let unseen_transaction_handle =
unseen_transaction_polling(Arc::clone(&state), syncer_servers.clone());
let unseen_transaction_handle = unseen_transaction_polling(
Arc::clone(&state),
syncer_servers.clone(),
proxy_address.clone(),
);

let sweep_handle = sweep_polling(
Arc::clone(&state),
Expand Down

0 comments on commit 90113ed

Please sign in to comment.