Skip to content

Commit 9cab16d

Browse files
authored
Compute eoa worker ID independent of twmq lease token (#38)
* watch more keys when resetting + log reset result
* add worker_id as span field
* instrument update cached method
* log conflicting worker id
* log cached transaction count init
* handle inherently bad transactions at send time
* use worker_id independent of twmq lease token
* log update to optimistic nonce
* fix potential race condition
* don't peek recycled nonces
1 parent 172491e commit 9cab16d

File tree

7 files changed

+171
-71
lines changed

7 files changed

+171
-71
lines changed

executors/src/eoa/store/atomic.rs

Lines changed: 118 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use crate::{
1111
EoaExecutorStore,
1212
events::EoaExecutorEvent,
1313
store::{
14-
BorrowedTransactionData, ConfirmedTransaction, EoaHealth, PendingTransaction,
15-
SubmittedTransactionDehydrated, TransactionAttempt, TransactionStoreError,
14+
BorrowedTransactionData, ConfirmedTransaction, EoaExecutorStoreKeys, EoaHealth,
15+
PendingTransaction, SubmittedTransactionDehydrated, TransactionAttempt,
16+
TransactionStoreError,
1617
borrowed::{BorrowedProcessingReport, ProcessBorrowedTransactions, SubmissionResult},
1718
pending::{
1819
MovePendingToBorrowedWithIncrementedNonces, MovePendingToBorrowedWithRecycledNonces,
@@ -140,17 +141,28 @@ impl AtomicEoaExecutorStore {
140141
///
141142
/// The transactions must have sequential nonces starting from the current optimistic count.
142143
/// This operation validates nonce ordering and atomically moves all transactions.
144+
#[tracing::instrument(skip_all, fields(transactions = ?transactions))]
143145
pub async fn atomic_move_pending_to_borrowed_with_incremented_nonces(
144146
&self,
145147
transactions: &[BorrowedTransactionData],
146148
) -> Result<usize, TransactionStoreError> {
147-
self.execute_with_watch_and_retry(&MovePendingToBorrowedWithIncrementedNonces {
148-
transactions,
149-
keys: &self.keys,
150-
eoa: self.eoa,
151-
chain_id: self.chain_id,
152-
})
153-
.await
149+
let (moved_count, new_optimistic_tx_count) = self
150+
.execute_with_watch_and_retry(&MovePendingToBorrowedWithIncrementedNonces {
151+
transactions,
152+
keys: &self.keys,
153+
eoa: self.eoa,
154+
chain_id: self.chain_id,
155+
})
156+
.await?;
157+
158+
if let Some(new_optimistic_tx_count) = new_optimistic_tx_count {
159+
tracing::info!(
160+
new_optimistic_tx_count = new_optimistic_tx_count,
161+
"Updated optimistic transaction count to {new_optimistic_tx_count}"
162+
);
163+
}
164+
165+
Ok(moved_count)
154166
}
155167

156168
/// Atomically move multiple pending transactions to borrowed state using recycled nonces
@@ -393,6 +405,7 @@ impl AtomicEoaExecutorStore {
393405
/// Synchronize nonces with the chain
394406
///
395407
/// Part of standard nonce management flow, called in the confirm stage when chain nonce advances, and we need to update our cached nonce
408+
#[tracing::instrument(skip_all, fields(current_chain_tx_count = current_chain_tx_count))]
396409
pub async fn update_cached_transaction_count(
397410
&self,
398411
current_chain_tx_count: u64,
@@ -502,47 +515,35 @@ impl AtomicEoaExecutorStore {
502515
///
503516
/// This is called when we have too many recycled nonces and detect something wrong
504517
/// We want to start fresh, with the chain nonce as the new optimistic nonce
518+
#[tracing::instrument(skip_all)]
505519
pub async fn reset_nonces(
506520
&self,
507521
current_chain_tx_count: u64,
508522
) -> Result<(), TransactionStoreError> {
509-
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
510-
511-
let current_health = self.get_eoa_health().await?;
512-
513-
// Prepare health update if health data exists
514-
let health_update = if let Some(mut health) = current_health {
515-
health.nonce_resets.push(now);
516-
Some(serde_json::to_string(&health)?)
517-
} else {
518-
None
523+
let reset_tx = ResetNoncesTransaction {
524+
keys: &self.store.keys,
525+
current_chain_tx_count,
519526
};
520527

521-
self.with_lock_check(|pipeline| {
522-
let optimistic_key = self.optimistic_transaction_count_key_name();
523-
let cached_nonce_key = self.last_transaction_count_key_name();
524-
let recycled_key = self.recycled_nonces_zset_name();
525-
let manual_reset_key = self.manual_reset_key_name();
528+
let reset_result = self.execute_with_watch_and_retry(&reset_tx).await;
526529

527-
// Update health data only if it exists
528-
if let Some(ref health_json) = health_update {
529-
let health_key = self.eoa_health_key_name();
530-
pipeline.set(&health_key, health_json);
530+
match &reset_result {
531+
Ok(()) => {
532+
tracing::info!(
533+
current_chain_tx_count = current_chain_tx_count,
534+
"Reset nonces successfully"
535+
);
531536
}
537+
Err(e) => {
538+
tracing::error!(
539+
current_chain_tx_count = current_chain_tx_count,
540+
error = ?e,
541+
"Failed to reset nonces"
542+
);
543+
}
544+
}
532545

533-
// Reset the optimistic nonce
534-
pipeline.set(&optimistic_key, current_chain_tx_count);
535-
536-
// Reset the cached nonce
537-
pipeline.set(&cached_nonce_key, current_chain_tx_count);
538-
539-
// Reset the recycled nonces
540-
pipeline.del(recycled_key);
541-
542-
// Delete the manual reset key
543-
pipeline.del(&manual_reset_key);
544-
})
545-
.await
546+
reset_result
546547
}
547548

548549
/// Fail a transaction that's in the pending state (remove from pending and fail)
@@ -654,3 +655,79 @@ impl AtomicEoaExecutorStore {
654655
.await
655656
}
656657
}
658+
659+
/// SafeRedisTransaction implementation for resetting nonces
660+
pub struct ResetNoncesTransaction<'a> {
661+
pub keys: &'a EoaExecutorStoreKeys,
662+
pub current_chain_tx_count: u64,
663+
}
664+
665+
impl SafeRedisTransaction for ResetNoncesTransaction<'_> {
666+
type ValidationData = Option<String>;
667+
type OperationResult = ();
668+
669+
fn name(&self) -> &str {
670+
"reset nonces"
671+
}
672+
673+
fn watch_keys(&self) -> Vec<String> {
674+
vec![
675+
self.keys.optimistic_transaction_count_key_name(),
676+
self.keys.last_transaction_count_key_name(),
677+
self.keys.recycled_nonces_zset_name(),
678+
self.keys.manual_reset_key_name(),
679+
]
680+
}
681+
682+
async fn validation(
683+
&self,
684+
_conn: &mut ConnectionManager,
685+
store: &EoaExecutorStore,
686+
) -> Result<Self::ValidationData, TransactionStoreError> {
687+
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
688+
689+
// Get current health data to prepare update
690+
let current_health = store.get_eoa_health().await?;
691+
let health_update = if let Some(mut health) = current_health {
692+
health.nonce_resets.push(now);
693+
// Keep only the last 5 nonce reset timestamps
694+
if health.nonce_resets.len() > 5 {
695+
health.nonce_resets.drain(0..health.nonce_resets.len() - 5);
696+
}
697+
Some(serde_json::to_string(&health)?)
698+
} else {
699+
None
700+
};
701+
702+
Ok(health_update)
703+
}
704+
705+
fn operation(
706+
&self,
707+
pipeline: &mut twmq::redis::Pipeline,
708+
health_update: Self::ValidationData,
709+
) -> Self::OperationResult {
710+
let optimistic_key = self.keys.optimistic_transaction_count_key_name();
711+
let cached_nonce_key = self.keys.last_transaction_count_key_name();
712+
let recycled_key = self.keys.recycled_nonces_zset_name();
713+
let manual_reset_key = self.keys.manual_reset_key_name();
714+
715+
// Update health data only if it exists
716+
if let Some(ref health_json) = health_update {
717+
let health_key = self.keys.eoa_health_key_name();
718+
pipeline.set(&health_key, health_json);
719+
}
720+
721+
// Reset the optimistic nonce
722+
pipeline.set(&optimistic_key, self.current_chain_tx_count);
723+
724+
// Reset the cached nonce
725+
pipeline.set(&cached_nonce_key, self.current_chain_tx_count);
726+
727+
// Reset the recycled nonces
728+
pipeline.del(recycled_key);
729+
730+
// Delete the manual reset key
731+
pipeline.del(&manual_reset_key);
732+
}
733+
}

executors/src/eoa/store/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,10 @@ impl EoaExecutorStoreKeys {
284284
"{ns}:eoa_executor:pending_manual_reset:{}:{}",
285285
self.chain_id, self.eoa
286286
),
287-
None => format!("eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa),
287+
None => format!(
288+
"eoa_executor:pending_manual_reset:{}:{}",
289+
self.chain_id, self.eoa
290+
),
288291
}
289292
}
290293
}
@@ -416,12 +419,15 @@ impl EoaExecutorStore {
416419
worker_id: worker_id.to_string(),
417420
});
418421
}
422+
let conflict_worker_id = conn.get::<_, Option<String>>(&lock_key).await?;
423+
419424
// Lock exists, forcefully take it over
420425
tracing::warn!(
421426
eoa = ?self.eoa,
422427
chain_id = self.chain_id,
423428
worker_id = worker_id,
424-
"Forcefully taking over EOA lock from stalled worker"
429+
conflict_worker_id = ?conflict_worker_id,
430+
"Forcefully taking over EOA lock from stalled worker."
425431
);
426432
// Force set - no expiry, only released by explicit takeover
427433
let _: () = conn.set(&lock_key, worker_id).await?;
@@ -504,15 +510,6 @@ impl EoaExecutorStore {
504510
}
505511
}
506512

507-
/// Peek recycled nonces without removing them
508-
pub async fn peek_recycled_nonces(&self) -> Result<Vec<u64>, TransactionStoreError> {
509-
let recycled_key = self.recycled_nonces_zset_name();
510-
let mut conn = self.redis.clone();
511-
512-
let nonces: Vec<u64> = conn.zrange(&recycled_key, 0, -1).await?;
513-
Ok(nonces)
514-
}
515-
516513
/// Peek at pending transactions without removing them (safe for planning)
517514
pub async fn peek_pending_transactions(
518515
&self,

executors/src/eoa/store/pending.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub struct MovePendingToBorrowedWithIncrementedNonces<'a> {
3131

3232
impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {
3333
type ValidationData = Vec<String>; // serialized borrowed transactions
34-
type OperationResult = usize; // number of transactions processed
34+
type OperationResult = (usize, Option<u64>); // number of transactions processed, new optimistic nonce
3535

3636
fn name(&self) -> &str {
3737
"pending->borrowed with incremented nonces"
@@ -59,10 +59,11 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {
5959
let current_optimistic: Option<u64> = conn
6060
.get(self.keys.optimistic_transaction_count_key_name())
6161
.await?;
62-
let current_optimistic_nonce = current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired {
63-
eoa: self.eoa,
64-
chain_id: self.chain_id,
65-
})?;
62+
let current_optimistic_nonce =
63+
current_optimistic.ok_or(TransactionStoreError::NonceSyncRequired {
64+
eoa: self.eoa,
65+
chain_id: self.chain_id,
66+
})?;
6667

6768
// Extract and validate nonces
6869
let mut nonces: Vec<u64> = self
@@ -134,13 +135,17 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> {
134135
pipeline.hset(&borrowed_key, &tx.transaction_id, borrowed_json);
135136
}
136137

137-
// Update optimistic tx count to highest nonce + 1
138-
if let Some(last_tx) = self.transactions.last() {
139-
let new_optimistic_tx_count = last_tx.signed_transaction.nonce() + 1;
138+
let new_optimistic_tx_count = self
139+
.transactions
140+
.last()
141+
.map(|tx| tx.signed_transaction.nonce() + 1);
142+
143+
// Update optimistic tx count to highest nonce + 1, if we have a new optimistic nonce
144+
if let Some(new_optimistic_tx_count) = new_optimistic_tx_count {
140145
pipeline.set(&optimistic_key, new_optimistic_tx_count);
141146
}
142147

143-
self.transactions.len()
148+
(self.transactions.len(), new_optimistic_tx_count)
144149
}
145150
}
146151

executors/src/eoa/worker/confirm.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub struct ConfirmedTransactionWithRichReceipt {
2727

2828
impl<C: Chain> EoaExecutorWorker<C> {
2929
// ========== CONFIRM FLOW ==========
30-
#[tracing::instrument(skip_all)]
30+
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
3131
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
3232
// Get fresh on-chain transaction count
3333
let current_chain_transaction_count = self
@@ -53,6 +53,10 @@ impl<C: Chain> EoaExecutorWorker<C> {
5353
let cached_transaction_count = match self.store.get_cached_transaction_count().await {
5454
Err(e) => match e {
5555
TransactionStoreError::NonceSyncRequired { .. } => {
56+
tracing::warn!(
57+
cached_transaction_count = current_chain_transaction_count,
58+
"Nonce sync required, store was uninitialized, updating cached transaction count with current chain transaction count"
59+
);
5660
self.store
5761
.update_cached_transaction_count(current_chain_transaction_count)
5862
.await?;

executors/src/eoa/worker/error.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,9 @@ impl UserCancellable for EoaExecutorWorkerError {
140140
// ========== SIMPLE ERROR CLASSIFICATION ==========
141141
#[derive(Debug)]
142142
pub enum SendErrorClassification {
143-
PossiblySent, // "nonce too low", "already known" etc
144-
DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc
143+
PossiblySent, // "nonce too low", "already known" etc
144+
DeterministicFailure, // Invalid signature, malformed tx, insufficient funds etc
145+
DeterministicFailureNonRetryable, // Non-retryable deterministic failure
145146
}
146147

147148
#[derive(PartialEq, Eq, Debug)]
@@ -185,11 +186,14 @@ pub fn classify_send_error(
185186
if error_str.contains("malformed")
186187
|| error_str.contains("gas limit")
187188
|| error_str.contains("intrinsic gas too low")
188-
|| error_str.contains("oversized")
189189
{
190190
return SendErrorClassification::DeterministicFailure;
191191
}
192192

193+
if error_str.contains("oversized") {
194+
return SendErrorClassification::DeterministicFailureNonRetryable;
195+
}
196+
193197
tracing::warn!(
194198
"Unknown send error: {}. PLEASE REPORT FOR ADDING CORRECT CLASSIFICATION [NOTIFY]",
195199
error_str
@@ -305,6 +309,15 @@ impl SubmissionResult {
305309
transaction: borrowed_transaction.clone().into(),
306310
}
307311
}
312+
SendErrorClassification::DeterministicFailureNonRetryable => SubmissionResult {
313+
result: SubmissionResultType::Fail(
314+
EoaExecutorWorkerError::TransactionSendError {
315+
message: format!("Transaction send failed: {rpc_error}"),
316+
inner_error: rpc_error.to_engine_error(chain),
317+
},
318+
),
319+
transaction: borrowed_transaction.clone().into(),
320+
},
308321
}
309322
}
310323
}

0 commit comments

Comments
 (0)