Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitcoin syncer: Unblock event loop #388

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 132 additions & 91 deletions src/syncerd/bitcoin_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::syncerd::syncer_state::AddressTx;
use crate::syncerd::syncer_state::SyncerState;
use crate::syncerd::syncer_state::WatchedTransaction;
use crate::syncerd::types::{AddressAddendum, Boolean, Task};
use crate::syncerd::BroadcastTransaction;
use crate::syncerd::BtcAddressAddendum;
use crate::syncerd::Event;
use crate::syncerd::GetTx;
use crate::syncerd::TaskTarget;
use crate::syncerd::TransactionBroadcasted;
use crate::syncerd::TransactionRetrieved;
Expand Down Expand Up @@ -433,10 +435,10 @@ async fn run_syncerd_bridge_event_sender(
}

async fn run_syncerd_task_receiver(
electrum_server: String,
receive_task_channel: Receiver<SyncerdTask>,
state: Arc<Mutex<SyncerState>>,
tx_event: TokioSender<SyncerdBridgeEvent>,
transaction_broadcast_tx: TokioSender<(BroadcastTransaction, ServiceId)>,
transaction_get_tx: TokioSender<(GetTx, ServiceId)>,
) {
let task_receiver = Arc::new(Mutex::new(receive_task_channel));
tokio::spawn(async move {
Expand All @@ -449,45 +451,10 @@ async fn run_syncerd_task_receiver(
Ok(syncerd_task) => {
match syncerd_task.task {
Task::GetTx(task) => {
match Client::new(&electrum_server).and_then(|transaction_client| {
transaction_client.transaction_get(
&bitcoin::Txid::from_slice(&task.hash).unwrap(),
)
}) {
Ok(tx) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(
TransactionRetrieved {
id: task.id,
tx: Some(tx),
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction retrieved event");
info!(
"successfully retrieved tx: {:?}",
hex::encode(task.hash)
);
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(
TransactionRetrieved {
id: task.id,
tx: None,
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction retrieved event");
info!("failed to retrieved tx: {:?}", e);
}
}
transaction_get_tx
.send((task, syncerd_task.source))
.await
.expect("failed on transaction_get sender");
}
Task::SweepAddress(_) => {
error!("sweep address not implemented for bitcoin syncer");
Expand All @@ -504,50 +471,11 @@ async fn run_syncerd_task_receiver(
drop(state_guard);
}
Task::BroadcastTransaction(task) => {
// TODO: match error and emit event with fail code
debug!("trying to broadcast tx: {:?}", task.tx.to_hex());
match Client::new(&electrum_server).and_then(|broadcast_client| {
broadcast_client.transaction_broadcast_raw(&task.tx.clone())
}) {
Ok(txid) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(
TransactionBroadcasted {
id: task.id,
tx: task.tx,
error: None,
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction broadcast event");
info!(
"Successfully broadcasted: {}",
txid.bright_yellow_italic()
);
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(
TransactionBroadcasted {
id: task.id,
tx: task.tx,
error: Some(format!(
"failed to broadcast tx: {}",
e.err()
)),
},
),
source: syncerd_task.source,
})
.await
.expect("error sending transaction broadcast event");
error!("failed to broadcast tx: {}", e.err());
}
}
transaction_broadcast_tx
.send((task, syncerd_task.source))
.await
.expect("failed on transaction_broadcast_tx sender");
}
Task::WatchAddress(task) => match task.addendum.clone() {
AddressAddendum::Bitcoin(_) => {
Expand Down Expand Up @@ -763,6 +691,94 @@ impl BitcoinSyncer {
}
}

fn transaction_broadcasting(
electrum_server: String,
mut transaction_broadcast_rx: TokioReceiver<(BroadcastTransaction, ServiceId)>,
tx_event: TokioSender<SyncerdBridgeEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
while let Some((broadcast_transaction, source)) = transaction_broadcast_rx.recv().await {
match Client::new(&electrum_server).and_then(|broadcast_client| {
broadcast_client.transaction_broadcast_raw(&broadcast_transaction.tx.clone())
}) {
Ok(txid) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(TransactionBroadcasted {
id: broadcast_transaction.id,
tx: broadcast_transaction.tx,
error: None,
}),
source,
})
.await
.expect("error sending transaction broadcast event");
info!("Successfully broadcasted: {}", txid.bright_yellow_italic());
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionBroadcasted(TransactionBroadcasted {
id: broadcast_transaction.id,
tx: broadcast_transaction.tx,
error: Some(format!("failed to broadcast tx: {}", e.err())),
}),
source,
})
.await
.expect("error sending transaction broadcast event");
error!("failed to broadcast tx: {}", e.err());
}
}
}
})
}

fn transaction_fetcher(
electrum_server: String,
mut transaction_get_rx: TokioReceiver<(GetTx, ServiceId)>,
tx_event: TokioSender<SyncerdBridgeEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
while let Some((get_transaction, source)) = transaction_get_rx.recv().await {
match Client::new(&electrum_server).and_then(|transaction_client| {
transaction_client
.transaction_get(&bitcoin::Txid::from_slice(&get_transaction.hash).unwrap())
}) {
Ok(tx) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(TransactionRetrieved {
id: get_transaction.id,
tx: Some(tx),
}),
source,
})
.await
.expect("error sending transaction retrieved event");
info!(
"successfully retrieved tx: {:?}",
hex::encode(get_transaction.hash)
);
}
Err(e) => {
tx_event
.send(SyncerdBridgeEvent {
event: Event::TransactionRetrieved(TransactionRetrieved {
id: get_transaction.id,
tx: None,
}),
source,
})
.await
.expect("error sending transaction retrieved event");
info!("failed to retrieved tx: {:?}", e);
}
}
}
})
}

impl Synclet for BitcoinSyncer {
fn run(
&mut self,
Expand All @@ -786,17 +802,25 @@ impl Synclet for BitcoinSyncer {
let (event_tx, event_rx): (
TokioSender<SyncerdBridgeEvent>,
TokioReceiver<SyncerdBridgeEvent>,
) = tokio::sync::mpsc::channel(120);
) = tokio::sync::mpsc::channel(200);
let (transaction_broadcast_tx, transaction_broadcast_rx): (
TokioSender<(BroadcastTransaction, ServiceId)>,
TokioReceiver<(BroadcastTransaction, ServiceId)>,
) = tokio::sync::mpsc::channel(200);
let (transaction_get_tx, transaction_get_rx): (
TokioSender<(GetTx, ServiceId)>,
TokioReceiver<(GetTx, ServiceId)>,
) = tokio::sync::mpsc::channel(200);
let state = Arc::new(Mutex::new(SyncerState::new(
event_tx.clone(),
Coin::Bitcoin,
)));

run_syncerd_task_receiver(
electrum_server.clone(),
receive_task_channel,
Arc::clone(&state),
event_tx.clone(),
transaction_broadcast_tx,
transaction_get_tx,
)
.await;
run_syncerd_bridge_event_sender(tx, event_rx, syncer_address).await;
Expand All @@ -807,11 +831,28 @@ impl Synclet for BitcoinSyncer {
let height_handle =
height_polling(Arc::clone(&state), electrum_server.clone(), polling);

let unseen_transaction_handle =
unseen_transaction_polling(Arc::clone(&state), electrum_server, polling);
let unseen_transaction_handle = unseen_transaction_polling(
Arc::clone(&state),
electrum_server.clone(),
polling,
);

let res =
tokio::try_join!(address_handle, height_handle, unseen_transaction_handle);
let transaction_broadcast_handle = transaction_broadcasting(
electrum_server.clone(),
transaction_broadcast_rx,
event_tx.clone(),
);

let transaction_get_handle =
transaction_fetcher(electrum_server, transaction_get_rx, event_tx.clone());

let res = tokio::try_join!(
address_handle,
height_handle,
unseen_transaction_handle,
transaction_broadcast_handle,
transaction_get_handle,
);
debug!("exiting bitcoin synclet run routine with: {:?}", res);
});
});
Expand Down