Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: let sql in wal mode provide async db, not app level spawn blocking (transaction service) #4597

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Remove spawn blocking from wallet db
Removed spawn blocking calls for db operations from the wallet in the transaction
service. (This is the last  PR in a couple of PRs required to implement this
fully throughout the wallet code.)
hansieodendaal committed Sep 1, 2022

Verified

This commit was signed with the committer’s verified signature.
hansieodendaal Hansie Odendaal
commit a484b57c9eb6f79c350140999848665988c223cc
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ where
.await
.ok_or_else(|| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::Shutdown))?;

let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id).await {
let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id) {
Ok(tx) => tx,
Err(e) => {
error!(
@@ -275,7 +275,6 @@ where
self.resources
.db
.broadcast_completed_transaction(self.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::from(e)))?;
let _size = self
.resources
@@ -430,7 +429,7 @@ where
"Failed to Cancel outputs for TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
);
}
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason).await {
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason) {
warn!(
target: LOG_TARGET,
"Failed to Cancel TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
Original file line number Diff line number Diff line change
@@ -131,7 +131,6 @@ where
.resources
.db
.transaction_exists(data.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
trace!(
@@ -167,7 +166,6 @@ where
self.resources
.db
.add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let send_result = send_transaction_reply(
@@ -182,7 +180,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if send_result {
@@ -237,7 +234,7 @@ where
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?
.fuse();

let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id).await {
let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id) {
Ok(tx) => tx,
Err(_e) => {
debug!(
@@ -295,7 +292,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

@@ -339,7 +335,6 @@ where
Ok(_) => self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?,
Err(e) => warn!(
target: LOG_TARGET,
@@ -456,8 +451,7 @@ where

self.resources
.db
.complete_inbound_transaction(self.id, completed_transaction.clone())
.await
.complete_inbound_transaction(self.id, completed_transaction)
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

info!(
@@ -486,17 +480,13 @@ where
"Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id
);

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Original file line number Diff line number Diff line change
@@ -317,7 +317,6 @@ where
.resources
.db
.transaction_exists(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
let fee = sender_protocol
@@ -337,14 +336,12 @@ where
self.resources
.db
.add_pending_outbound_transaction(outbound_tx.tx_id, outbound_tx)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}
if transaction_status == TransactionStatus::Pending {
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

@@ -394,7 +391,6 @@ where
.resources
.db
.get_pending_outbound_transaction(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if !outbound_tx.sender_protocol.is_collecting_single_signature() {
@@ -452,7 +448,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))?
}
},
@@ -499,7 +494,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?;
@@ -521,7 +515,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?
@@ -594,7 +587,6 @@ where
self.resources
.db
.complete_outbound_transaction(tx_id, completed_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
info!(
target: LOG_TARGET,
@@ -615,7 +607,6 @@ where
self.resources
.db
.increment_send_count(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let _size = self
@@ -905,20 +896,15 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Original file line number Diff line number Diff line change
@@ -112,7 +112,6 @@ where
let unconfirmed_transactions = self
.db
.fetch_unconfirmed_transactions_info()
.await
.for_protocol(self.operation_id)
.unwrap();

@@ -216,7 +215,7 @@ where
self.operation_id
);
let op_id = self.operation_id;
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().await.for_protocol(op_id)? {
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().for_protocol(op_id)? {
let mined_height = last_mined_transaction
.mined_height
.ok_or_else(|| {
@@ -414,7 +413,6 @@ where
num_confirmations >= self.config.num_confirmations_required,
status.is_faux(),
)
.await
.for_protocol(self.operation_id)?;

if num_confirmations >= self.config.num_confirmations_required {
@@ -488,12 +486,10 @@ where
num_confirmations >= self.config.num_confirmations_required,
false,
)
.await
.for_protocol(self.operation_id)?;

self.db
.abandon_coinbase_transaction(tx_id)
.await
.for_protocol(self.operation_id)?;

self.publish_event(TransactionEvent::TransactionCancelled(
@@ -510,7 +506,6 @@ where
) -> Result<(), TransactionServiceProtocolError<OperationId>> {
self.db
.set_transaction_as_unmined(tx_id)
.await
.for_protocol(self.operation_id)?;

if *status == TransactionStatus::Coinbase {
Loading