Skip to content

Commit

Permalink
Bitcoin syncer: Unblock event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
TheCharlatan committed Dec 17, 2021
1 parent ae1a682 commit df38ff0
Showing 1 changed file with 131 additions and 90 deletions.
221 changes: 131 additions & 90 deletions src/syncerd/bitcoin_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::syncerd::syncer_state::AddressTx;
use crate::syncerd::syncer_state::SyncerState;
use crate::syncerd::syncer_state::WatchedTransaction;
use crate::syncerd::types::{AddressAddendum, Boolean, Task};
use crate::syncerd::BroadcastTransaction;
use crate::syncerd::BtcAddressAddendum;
use crate::syncerd::Event;
use crate::syncerd::GetTx;
use crate::syncerd::TaskTarget;
use crate::syncerd::TransactionBroadcasted;
use crate::syncerd::TransactionRetrieved;
Expand Down Expand Up @@ -433,10 +435,10 @@ async fn run_syncerd_bridge_event_sender(
}

async fn run_syncerd_task_receiver(
electrum_server: String,
receive_task_channel: Receiver<SyncerdTask>,
state: Arc<Mutex<SyncerState>>,
tx_event: TokioSender<SyncerdBridgeEvent>,
transaction_broadcast_tx: TokioSender<(BroadcastTransaction, ServiceId)>,
transaction_get_tx: TokioSender<(GetTx, ServiceId)>,
) {
let task_receiver = Arc::new(Mutex::new(receive_task_channel));
tokio::spawn(async move {
Expand All @@ -449,45 +451,10 @@ async fn run_syncerd_task_receiver(
Ok(syncerd_task) => {
match syncerd_task.task {
Task::GetTx(task) => {
match Client::new(&electrum_server).and_then(|transaction_client| {
transaction_client.transaction_get(
&bitcoin::Txid::from_slice(&task.hash).unwrap(),
)
}) {
Ok(tx) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(
TransactionRetrieved {
id: task.id,
tx: Some(tx),
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction retrieved event");
info!(
"successfully retrieved tx: {:?}",
hex::encode(task.hash)
);
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(
TransactionRetrieved {
id: task.id,
tx: None,
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction retrieved event");
info!("failed to retrieved tx: {:?}", e);
}
}
transaction_get_tx
.send((task, syncerd_task.source))
.await
.expect("failed on transaction_get sender");
}
Task::SweepAddress(_) => {
error!("sweep address not implemented for bitcoin syncer");
Expand All @@ -504,50 +471,11 @@ async fn run_syncerd_task_receiver(
drop(state_guard);
}
Task::BroadcastTransaction(task) => {
// TODO: match error and emit event with fail code
debug!("trying to broadcast tx: {:?}", task.tx.to_hex());
match Client::new(&electrum_server).and_then(|broadcast_client| {
broadcast_client.transaction_broadcast_raw(&task.tx.clone())
}) {
Ok(txid) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(
TransactionBroadcasted {
id: task.id,
tx: task.tx,
error: None,
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction broadcast event");
info!(
"Successfully broadcasted: {}",
txid.bright_yellow_italic()
);
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(
TransactionBroadcasted {
id: task.id,
tx: task.tx,
error: Some(format!(
"failed to broadcast tx: {}",
e.err()
)),
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction broadcast event");
error!("failed to broadcast tx: {}", e.err());
}
}
transaction_broadcast_tx
.send((task, syncerd_task.source))
.await
.expect("failed on transaction_broadcast_tx sender");
}
Task::WatchAddress(task) => match task.addendum.clone() {
AddressAddendum::Bitcoin(_) => {
Expand Down Expand Up @@ -763,6 +691,94 @@ impl BitcoinSyncer {
}
}

fn transaction_broadcasting(
electrum_server: String,
mut transaction_broadcast_rx: TokioReceiver<(BroadcastTransaction, ServiceId)>,
tx_event: TokioSender<SyncerdBridgeEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
while let Some((broadcast_transaction, source)) = transaction_broadcast_rx.recv().await {
match Client::new(&electrum_server).and_then(|broadcast_client| {
broadcast_client.transaction_broadcast_raw(&broadcast_transaction.tx.clone())
}) {
Ok(txid) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(TransactionBroadcasted {
id: broadcast_transaction.id,
tx: broadcast_transaction.tx,
error: None,
}),
source,
})
.await
.expect("error sending transaction broadcast event");
info!("Successfully broadcasted: {}", txid.bright_yellow_italic());
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(TransactionBroadcasted {
id: broadcast_transaction.id,
tx: broadcast_transaction.tx,
error: Some(format!("failed to broadcast tx: {}", e.err())),
}),
source,
})
.await
.expect("error sending transaction broadcast event");
error!("failed to broadcast tx: {}", e.err());
}
}
}
})
}

fn transaction_fetcher(
electrum_server: String,
mut transaction_get_rx: TokioReceiver<(GetTx, ServiceId)>,
tx_event: TokioSender<SyncerdBridgeEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
while let Some((get_transaction, source)) = transaction_get_rx.recv().await {
match Client::new(&electrum_server).and_then(|transaction_client| {
transaction_client
.transaction_get(&bitcoin::Txid::from_slice(&get_transaction.hash).unwrap())
}) {
Ok(tx) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(TransactionRetrieved {
id: get_transaction.id,
tx: Some(tx),
}),
source,
})
.await
.expect("error sending transaction retrieved event");
info!(
"successfully retrieved tx: {:?}",
hex::encode(get_transaction.hash)
);
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(TransactionRetrieved {
id: get_transaction.id,
tx: None,
}),
source,
})
.await
.expect("error sending transaction retrieved event");
info!("failed to retrieved tx: {:?}", e);
}
}
}
})
}

impl Synclet for BitcoinSyncer {
fn run(
&mut self,
Expand All @@ -787,16 +803,24 @@ impl Synclet for BitcoinSyncer {
TokioSender<SyncerdBridgeEvent>,
TokioReceiver<SyncerdBridgeEvent>,
) = tokio::sync::mpsc::channel(120);
let (transaction_broadcast_tx, transaction_broadcast_rx): (
TokioSender<(BroadcastTransaction, ServiceId)>,
TokioReceiver<(BroadcastTransaction, ServiceId)>,
) = tokio::sync::mpsc::channel(120);
let (transaction_get_tx, transaction_get_rx): (
TokioSender<(GetTx, ServiceId)>,
TokioReceiver<(GetTx, ServiceId)>,
) = tokio::sync::mpsc::channel(120);
let state = Arc::new(Mutex::new(SyncerState::new(
event_tx.clone(),
Coin::Bitcoin,
)));

run_syncerd_task_receiver(
electrum_server.clone(),
receive_task_channel,
Arc::clone(&state),
event_tx.clone(),
transaction_broadcast_tx,
transaction_get_tx,
)
.await;
run_syncerd_bridge_event_sender(tx, event_rx, syncer_address).await;
Expand All @@ -807,11 +831,28 @@ impl Synclet for BitcoinSyncer {
let height_handle =
height_polling(Arc::clone(&state), electrum_server.clone(), polling);

let unseen_transaction_handle =
unseen_transaction_polling(Arc::clone(&state), electrum_server, polling);
let unseen_transaction_handle = unseen_transaction_polling(
Arc::clone(&state),
electrum_server.clone(),
polling,
);

let transaction_broadcast_handle = transaction_broadcasting(
electrum_server.clone(),
transaction_broadcast_rx,
event_tx.clone(),
);

let transaction_get_handle =
transaction_fetcher(electrum_server, transaction_get_rx, event_tx.clone());

let res =
tokio::try_join!(address_handle, height_handle, unseen_transaction_handle);
let res = tokio::try_join!(
address_handle,
height_handle,
unseen_transaction_handle,
transaction_broadcast_handle,
transaction_get_handle,
);
debug!("exiting bitcoin synclet run routine with: {:?}", res);
});
});
Expand Down

0 comments on commit df38ff0

Please sign in to comment.