Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: let sql in wal mode provide async db, not app level spawn blocking (transaction service) #4597

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
.await
.ok_or_else(|| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::Shutdown))?;

let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id).await {
let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id) {
Ok(tx) => tx,
Err(e) => {
error!(
Expand Down Expand Up @@ -275,7 +275,6 @@ where
self.resources
.db
.broadcast_completed_transaction(self.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::from(e)))?;
let _size = self
.resources
Expand Down Expand Up @@ -430,7 +429,7 @@ where
"Failed to Cancel outputs for TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
);
}
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason).await {
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason) {
warn!(
target: LOG_TARGET,
"Failed to Cancel TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ where
.resources
.db
.transaction_exists(data.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
trace!(
Expand Down Expand Up @@ -167,7 +166,6 @@ where
self.resources
.db
.add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let send_result = send_transaction_reply(
Expand All @@ -182,7 +180,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if send_result {
Expand Down Expand Up @@ -237,7 +234,7 @@ where
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?
.fuse();

let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id).await {
let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id) {
Ok(tx) => tx,
Err(_e) => {
debug!(
Expand Down Expand Up @@ -295,7 +292,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

Expand Down Expand Up @@ -339,7 +335,6 @@ where
Ok(_) => self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?,
Err(e) => warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -456,8 +451,7 @@ where

self.resources
.db
.complete_inbound_transaction(self.id, completed_transaction.clone())
.await
.complete_inbound_transaction(self.id, completed_transaction)
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

info!(
Expand Down Expand Up @@ -486,17 +480,13 @@ where
"Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id
);

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ where
.resources
.db
.transaction_exists(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
let fee = sender_protocol
Expand All @@ -337,14 +336,12 @@ where
self.resources
.db
.add_pending_outbound_transaction(outbound_tx.tx_id, outbound_tx)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}
if transaction_status == TransactionStatus::Pending {
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

Expand Down Expand Up @@ -394,7 +391,6 @@ where
.resources
.db
.get_pending_outbound_transaction(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if !outbound_tx.sender_protocol.is_collecting_single_signature() {
Expand Down Expand Up @@ -452,7 +448,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))?
}
},
Expand Down Expand Up @@ -499,7 +494,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?;
Expand All @@ -521,7 +515,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?
Expand Down Expand Up @@ -594,7 +587,6 @@ where
self.resources
.db
.complete_outbound_transaction(tx_id, completed_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
info!(
target: LOG_TARGET,
Expand All @@ -615,7 +607,6 @@ where
self.resources
.db
.increment_send_count(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let _size = self
Expand Down Expand Up @@ -905,20 +896,15 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ where
let unconfirmed_transactions = self
.db
.fetch_unconfirmed_transactions_info()
.await
.for_protocol(self.operation_id)
.unwrap();

Expand Down Expand Up @@ -216,7 +215,7 @@ where
self.operation_id
);
let op_id = self.operation_id;
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().await.for_protocol(op_id)? {
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().for_protocol(op_id)? {
let mined_height = last_mined_transaction
.mined_height
.ok_or_else(|| {
Expand Down Expand Up @@ -414,7 +413,6 @@ where
num_confirmations >= self.config.num_confirmations_required,
status.is_faux(),
)
.await
.for_protocol(self.operation_id)?;

if num_confirmations >= self.config.num_confirmations_required {
Expand Down Expand Up @@ -488,12 +486,10 @@ where
num_confirmations >= self.config.num_confirmations_required,
false,
)
.await
.for_protocol(self.operation_id)?;

self.db
.abandon_coinbase_transaction(tx_id)
.await
.for_protocol(self.operation_id)?;

self.publish_event(TransactionEvent::TransactionCancelled(
Expand All @@ -510,7 +506,6 @@ where
) -> Result<(), TransactionServiceProtocolError<OperationId>> {
self.db
.set_transaction_as_unmined(tx_id)
.await
.for_protocol(self.operation_id)?;

if *status == TransactionStatus::Coinbase {
Expand Down
Loading