Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monero socks proxy #824

Merged
merged 2 commits into from
Dec 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 53 additions & 44 deletions src/syncerd/monero_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,21 @@ pub struct Transaction {
block_hash: Option<Vec<u8>>,
}

fn create_rpc_client(rpc_url: String, proxy_url: Option<String>) -> monero_rpc::RpcClient {
let mut client_builder = monero_rpc::RpcClientBuilder::new();
if let Some(proxy_url) = proxy_url {
client_builder = client_builder.proxy_address(proxy_url);
}
client_builder
.build(rpc_url)
.expect("client builder failed, cannot recover from bad configuration")
}

impl MoneroRpc {
fn new(node_rpc_url: String, proxy_url: Option<String>) -> Self {
let mut client_builder = monero_rpc::RpcClientBuilder::new();
if let Some(proxy_url) = proxy_url {
client_builder = client_builder.proxy_address(proxy_url);
}
let daemon_json_rpc = client_builder
.clone()
.build(node_rpc_url.clone())
.expect("client builder failed, cannot recover from bad configuration")
.daemon();
let daemon_rpc = client_builder
.build(node_rpc_url)
.expect("client builder failed, cannot recover from bad configuration")
.daemon_rpc();
MoneroRpc {
daemon_json_rpc,
daemon_rpc,
daemon_json_rpc: create_rpc_client(node_rpc_url.clone(), proxy_url.clone()).daemon(),
daemon_rpc: create_rpc_client(node_rpc_url, proxy_url).daemon_rpc(),
height: 0,
block_hash: vec![0],
}
Expand Down Expand Up @@ -409,6 +406,7 @@ async fn run_syncerd_task_receiver(
state: Arc<Mutex<SyncerState>>,
balance_get_tx: TokioSender<(GetAddressBalance, ServiceId)>,
tx_event: TokioSender<BridgeEvent>,
proxy_address: Option<String>,
) {
tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -493,27 +491,25 @@ async fn run_syncerd_task_receiver(
}
Task::HealthCheck(HealthCheck { id }) => {
debug!("performing health check");
let mut health = match monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_daemon.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.daemon()
.get_block_count()
.await
let mut health = match create_rpc_client(
syncer_servers.monero_daemon.clone(),
proxy_address.clone(),
)
.daemon()
.get_block_count()
.await
{
Ok(_) => Health::Healthy,
Err(err) => Health::FaultyMoneroDaemon(err.to_string()),
};

health = match monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_rpc_wallet.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.wallet()
.get_version()
.await
health = match create_rpc_client(
syncer_servers.monero_daemon.clone(),
proxy_address.clone(),
)
.wallet()
.get_version()
.await
{
Ok(_) => health,
Err(err) => Health::FaultyMoneroRpcWallet(err.to_string()),
Expand Down Expand Up @@ -571,9 +567,10 @@ fn address_polling(
syncer_servers: MoneroSyncerServers,
network: monero::Network,
wallet_mutex: Arc<Mutex<monero_rpc::WalletClient>>,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let state_guard = state.lock().await;
let mut addresses = state_guard.addresses.clone();
Expand Down Expand Up @@ -670,9 +667,10 @@ fn address_polling(
fn height_polling(
state: Arc<Mutex<SyncerState>>,
syncer_servers: MoneroSyncerServers,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let block_notif = match rpc.check_block().await {
Ok(notif) => Some(notif),
Expand Down Expand Up @@ -764,9 +762,10 @@ fn sweep_polling(
fn unseen_transaction_polling(
state: Arc<Mutex<SyncerState>>,
syncer_servers: MoneroSyncerServers,
proxy_address: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, None);
let mut rpc = MoneroRpc::new(syncer_servers.monero_daemon, proxy_address);
loop {
let state_guard = state.lock().await;
let unseen_transactions = state_guard.unseen_transactions.clone();
Expand Down Expand Up @@ -986,6 +985,9 @@ impl Synclet for MoneroSyncer {
debug!("monero syncer servers: {:?}", syncer_servers);
let wallet_dir = opts.monero_wallet_dir_path.clone().map(PathBuf::from);

let proxy_address = opts.shared.tor_proxy.map(|address| address.to_string());
debug!("monero synclet using proxy: {:?}", proxy_address);

let _handle = std::thread::spawn(move || {
use tokio::runtime::Builder;
let rt = Builder::new_multi_thread()
Expand All @@ -995,12 +997,11 @@ impl Synclet for MoneroSyncer {
.unwrap();
rt.block_on(async {
let wallet_mutex = Arc::new(Mutex::new(
monero_rpc::RpcClientBuilder::new()
.build(syncer_servers.monero_rpc_wallet.clone())
.expect(
"client builder failed, cannot recover from bad configuration",
)
.wallet(),
create_rpc_client(
syncer_servers.monero_rpc_wallet.clone(),
proxy_address.clone(),
)
.wallet(),
));
let (balance_get_tx, balance_get_rx): (
TokioSender<(GetAddressBalance, ServiceId)>,
Expand All @@ -1022,6 +1023,7 @@ impl Synclet for MoneroSyncer {
Arc::clone(&state),
balance_get_tx,
event_tx.clone(),
proxy_address.clone(),
)
.await;
run_syncerd_bridge_event_sender(tx, event_rx, syncer_address).await;
Expand All @@ -1031,14 +1033,21 @@ impl Synclet for MoneroSyncer {
syncer_servers.clone(),
network,
Arc::clone(&wallet_mutex),
proxy_address.clone(),
);

// transaction polling is done in the same loop
let height_handle =
height_polling(Arc::clone(&state), syncer_servers.clone());
let height_handle = height_polling(
Arc::clone(&state),
syncer_servers.clone(),
proxy_address.clone(),
);

let unseen_transaction_handle =
unseen_transaction_polling(Arc::clone(&state), syncer_servers.clone());
let unseen_transaction_handle = unseen_transaction_polling(
Arc::clone(&state),
syncer_servers.clone(),
proxy_address.clone(),
);

let sweep_handle = sweep_polling(
Arc::clone(&state),
Expand Down