Skip to content

Commit

Permalink
Remove spawn blocking from wallet db
Browse files Browse the repository at this point in the history
Removed spawn blocking calls for db operations from the wallet in the transaction
service. (This is the last  PR in a couple of PRs required to implement this
fully throughout the wallet code.)
  • Loading branch information
hansieodendaal committed Sep 1, 2022
1 parent 66c8032 commit 393a526
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 771 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
.await
.ok_or_else(|| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::Shutdown))?;

let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id).await {
let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id) {
Ok(tx) => tx,
Err(e) => {
error!(
Expand Down Expand Up @@ -275,7 +275,6 @@ where
self.resources
.db
.broadcast_completed_transaction(self.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::from(e)))?;
let _size = self
.resources
Expand Down Expand Up @@ -430,7 +429,7 @@ where
"Failed to Cancel outputs for TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
);
}
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason).await {
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason) {
warn!(
target: LOG_TARGET,
"Failed to Cancel TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ where
.resources
.db
.transaction_exists(data.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
trace!(
Expand Down Expand Up @@ -167,7 +166,6 @@ where
self.resources
.db
.add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let send_result = send_transaction_reply(
Expand All @@ -182,7 +180,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if send_result {
Expand Down Expand Up @@ -237,7 +234,7 @@ where
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?
.fuse();

let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id).await {
let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id) {
Ok(tx) => tx,
Err(_e) => {
debug!(
Expand Down Expand Up @@ -295,7 +292,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

Expand Down Expand Up @@ -339,7 +335,6 @@ where
Ok(_) => self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?,
Err(e) => warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -456,8 +451,7 @@ where

self.resources
.db
.complete_inbound_transaction(self.id, completed_transaction.clone())
.await
.complete_inbound_transaction(self.id, completed_transaction)
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

info!(
Expand Down Expand Up @@ -486,17 +480,13 @@ where
"Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id
);

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ where
.resources
.db
.transaction_exists(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
let fee = sender_protocol
Expand All @@ -337,14 +336,12 @@ where
self.resources
.db
.add_pending_outbound_transaction(outbound_tx.tx_id, outbound_tx)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}
if transaction_status == TransactionStatus::Pending {
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

Expand Down Expand Up @@ -394,7 +391,6 @@ where
.resources
.db
.get_pending_outbound_transaction(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if !outbound_tx.sender_protocol.is_collecting_single_signature() {
Expand Down Expand Up @@ -452,7 +448,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))?
}
},
Expand Down Expand Up @@ -499,7 +494,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?;
Expand All @@ -521,7 +515,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?
Expand Down Expand Up @@ -594,7 +587,6 @@ where
self.resources
.db
.complete_outbound_transaction(tx_id, completed_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
info!(
target: LOG_TARGET,
Expand All @@ -615,7 +607,6 @@ where
self.resources
.db
.increment_send_count(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let _size = self
Expand Down Expand Up @@ -905,20 +896,15 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ where
let unconfirmed_transactions = self
.db
.fetch_unconfirmed_transactions_info()
.await
.for_protocol(self.operation_id)
.unwrap();

Expand Down Expand Up @@ -216,7 +215,7 @@ where
self.operation_id
);
let op_id = self.operation_id;
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().await.for_protocol(op_id)? {
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().for_protocol(op_id)? {
let mined_height = last_mined_transaction
.mined_height
.ok_or_else(|| {
Expand Down Expand Up @@ -414,7 +413,6 @@ where
num_confirmations >= self.config.num_confirmations_required,
status.is_faux(),
)
.await
.for_protocol(self.operation_id)?;

if num_confirmations >= self.config.num_confirmations_required {
Expand Down Expand Up @@ -488,12 +486,10 @@ where
num_confirmations >= self.config.num_confirmations_required,
false,
)
.await
.for_protocol(self.operation_id)?;

self.db
.abandon_coinbase_transaction(tx_id)
.await
.for_protocol(self.operation_id)?;

self.publish_event(TransactionEvent::TransactionCancelled(
Expand All @@ -510,7 +506,6 @@ where
) -> Result<(), TransactionServiceProtocolError<OperationId>> {
self.db
.set_transaction_as_unmined(tx_id)
.await
.for_protocol(self.operation_id)?;

if *status == TransactionStatus::Coinbase {
Expand Down
Loading

0 comments on commit 393a526

Please sign in to comment.