Datastore: implement barrier if we see "in failed tx" error. (divviup…#1418)

For some error modes, Postgres will return an error to the caller & then
fail all future statements within the same transaction with an "in
failed SQL transaction" error. This effectively means one statement will
receive a "root cause" error and then all later statements will receive
an "in failed SQL transaction" error. In a pipelined scenario, if our
code is processing the results of these statements concurrently--e.g.
because they are part of a `try_join!`/`try_join_all` group--we might
receive & handle one of the "in failed SQL transaction" errors before we
handle the "root cause" error, which might cause the "root cause"
error's future to be cancelled before we evaluate it. If the "root
cause" error would trigger a retry, this would mean we would skip a
DB-based retry when one was warranted.
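
To make the hazard concrete, here is a self-contained sketch using plain tokio (no database; string errors and sleeps stand in for the Postgres serialization-failure and "in failed SQL transaction" responses):

```rust
// Hypothetical stand-in for the pipelined-statement scenario described above: the
// "root cause" future finishes later than the "in failed SQL transaction" future,
// so try_join! drops it before its error handling (setting the retry flag) ever runs.
use std::{
    sync::atomic::{AtomicBool, Ordering},
    time::Duration,
};
use tokio::{time::sleep, try_join};

#[tokio::main]
async fn main() {
    let retry = AtomicBool::new(false);

    let root_cause = async {
        // Stands in for a statement failing with a serialization failure (SQLSTATE 40001).
        sleep(Duration::from_millis(50)).await;
        retry.store(true, Ordering::Relaxed); // the retry evaluation we want to reach
        Err::<(), &str>("serialization failure")
    };
    let in_failed_tx = async {
        // Stands in for a later pipelined statement failing with SQLSTATE 25P02
        // ("current transaction is aborted" / in failed SQL transaction).
        sleep(Duration::from_millis(10)).await;
        Err::<(), &str>("in failed SQL transaction")
    };

    // try_join! returns as soon as in_failed_tx errors, dropping root_cause mid-flight.
    let _ = try_join!(root_cause, in_failed_tx);

    assert!(!retry.load(Ordering::Relaxed)); // the retry flag was never set
}
```

Because `try_join!` drops its remaining futures as soon as any one of them fails, the handling attached to the slower "root cause" future never runs.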

To fix this problem, we (internally) wrap all direct DB operations in
`run_op`. This function groups concurrent database operations into
"operation groups", which allow us to wait for all operations in the
group to complete (this waiting operation is called "draining"). If we
ever observe an "in failed SQL transaction" error, we drain the
operation group before returning. Under the assumption that the "root
cause" error is concurrent with the "in failed SQL transaction" errors,
this guarantees we will evaluate the "root cause" error for retry before
any errors make their way out of the transaction code.
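
The same toy setup, extended with barrier-based draining (again plain tokio; a fixed group size of two stands in for the dynamically sized operation groups that `run_op` maintains), shows why the root-cause error now gets evaluated before any error escapes:

```rust
// Sketch of the draining idea under the same assumptions as the previous example:
// the future that hits the "in failed SQL transaction" error waits on a barrier
// shared by every in-flight operation, so it cannot return (and cause try_join!
// to cancel its sibling) until the root-cause error has been evaluated for retry.
use std::{
    sync::atomic::{AtomicBool, Ordering},
    time::Duration,
};
use tokio::{sync::Barrier, time::sleep, try_join};

#[tokio::main]
async fn main() {
    let retry = AtomicBool::new(false);
    let barrier = Barrier::new(2); // two operations in this "operation group"

    let root_cause = async {
        sleep(Duration::from_millis(50)).await;
        retry.store(true, Ordering::Relaxed); // root-cause error evaluated for retry
        barrier.wait().await; // leave the operation group
        Err::<(), &str>("serialization failure")
    };
    let in_failed_tx = async {
        sleep(Duration::from_millis(10)).await;
        barrier.wait().await; // drain: wait for the sibling before surfacing the error
        Err::<(), &str>("in failed SQL transaction")
    };

    let _ = try_join!(root_cause, in_failed_tx);

    assert!(retry.load(Ordering::Relaxed)); // the retry flag is set before any error escapes
}
```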
branlwyd authored May 27, 2023
1 parent 8c8a741 commit 7ad8faa
Showing 1 changed file with 126 additions and 15 deletions.
141 changes: 126 additions & 15 deletions aggregator_core/src/datastore.rs
@@ -48,10 +48,13 @@ use std::{
mem::size_of,
ops::RangeInclusive,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::{Duration as StdDuration, Instant},
};
use tokio::try_join;
use tokio::{sync::Barrier, try_join};
use tokio_postgres::{error::SqlState, row::RowIndex, IsolationLevel, Row, Statement, ToStatement};
use tracing::error;
use url::Url;
@@ -246,9 +249,10 @@ impl<C: Clock> Datastore<C> {
};
let tx = Transaction {
raw_tx,
retry: AtomicBool::new(false),
crypter: &self.crypter,
clock: &self.clock,
retry: AtomicBool::new(false),
op_group: Mutex::new(Arc::new(Mutex::new(OperationGroup::Running(0)))),
};

// Run user-provided function with the transaction, then commit/rollback based on result.
@@ -304,26 +308,133 @@ fn check_error<T>(
rslt: Result<T, tokio_postgres::Error>,
) -> Result<T, tokio_postgres::Error> {
if let Err(err) = &rslt {
if let Some(code) = err.code() {
if code == &SqlState::T_R_SERIALIZATION_FAILURE
|| code == &SqlState::T_R_DEADLOCK_DETECTED
{
retry.store(true, Ordering::Relaxed);
}
if is_retryable_error(err) {
retry.store(true, Ordering::Relaxed);
}
}
rslt
}

fn is_retryable_error(err: &tokio_postgres::Error) -> bool {
err.code().map_or(false, |code| {
code == &SqlState::T_R_SERIALIZATION_FAILURE || code == &SqlState::T_R_DEADLOCK_DETECTED
})
}

fn is_transaction_abort_error(err: &tokio_postgres::Error) -> bool {
err.code()
.map_or(false, |code| code == &SqlState::IN_FAILED_SQL_TRANSACTION)
}

/// Transaction represents an ongoing datastore transaction.
pub struct Transaction<'a, C: Clock> {
raw_tx: deadpool_postgres::Transaction<'a>,
retry: AtomicBool,
crypter: &'a Crypter,
clock: &'a C,

retry: AtomicBool,
op_group: Mutex<Arc<Mutex<OperationGroup>>>, // locking discipline: outer lock before inner lock
}

enum OperationGroup {
Running(usize), // current operation count
Draining(Arc<Barrier>), // barrier to wait upon to complete drain
}

impl<C: Clock> Transaction<'_, C> {
// For some error modes, Postgres will return an error to the caller & then fail all future
// statements within the same transaction with an "in failed SQL transaction" error. This
// effectively means one statement will receive a "root cause" error and then all later
// statements will receive an "in failed SQL transaction" error. In a pipelined scenario, if our
// code is processing the results of these statements concurrently--e.g. because they are part
// of a `try_join!`/`try_join_all` group--we might receive & handle one of the "in failed SQL
// transaction" errors before we handle the "root cause" error, which might cause the "root
// cause" error's future to be cancelled before we evaluate it. If the "root cause" error would
// trigger a retry, this would mean we would skip a DB-based retry when one was warranted.
//
// To fix this problem, we (internally) wrap all direct DB operations in `run_op`. This function
// groups concurrent database operations into "operation groups", which allow us to wait for all
// operations in the group to complete (this waiting operation is called "draining"). If we ever
// observe an "in failed SQL transaction" error, we drain the operation group before returning.
// Under the assumption that the "root cause" error is concurrent with the "in failed SQL
// transactions" errors, this guarantees we will evaluate the "root cause" error for retry
// before any errors make their way out of the transaction code.
async fn run_op<T>(
&self,
op: impl Future<Output = Result<T, tokio_postgres::Error>>,
) -> Result<T, tokio_postgres::Error> {
// Enter.
//
// Before we can run the operation, we need to join this operation into an operation group.
// Retrieve the current operation group & join it.
let op_group = {
let mut tx_op_group = self.op_group.lock().unwrap();
let new_tx_op_group = {
let mut op_group = tx_op_group.lock().unwrap();
match &*op_group {
OperationGroup::Running(op_count) => {
// If the current op group is running, join it by incrementing the operation
// count.
*op_group = OperationGroup::Running(*op_count + 1);
None
}

OperationGroup::Draining { .. } => {
// If the current op group is draining, we can't join it; instead, create a
// new op group to join, and store it as the transaction's current operation
// group.
Some(Arc::new(Mutex::new(OperationGroup::Running(1))))
}
}
};
if let Some(new_tx_op_group) = new_tx_op_group {
*tx_op_group = new_tx_op_group;
}
Arc::clone(&tx_op_group)
};

// Run operation, and check if error triggers a retry or requires a drain.
let rslt = check_error(&self.retry, op.await);
let needs_drain = rslt
.as_ref()
.err()
.map_or(false, is_transaction_abort_error);

// Exit.
//
// Before we are done running the operation, we have to leave the operation group. If the
// operation group is running, we just need to decrement the count. If the operation group
// is draining (because this or another operation encountered an error which requires a
// drain), we have to wait until all operations in the group are ready to finish.
let barrier = {
let mut op_group = op_group.lock().unwrap();
match &*op_group {
OperationGroup::Running(op_count) => {
if needs_drain {
// If the operation group is running & we have determined we need to drain
// the operation group, change the operation group to Draining & wait on the
// barrier.
let barrier = Arc::new(Barrier::new(*op_count));
*op_group = OperationGroup::Draining(Arc::clone(&barrier));
Some(barrier)
} else {
// If the operation group is running & we don't need a drain, just decrement
// the operation count.
*op_group = OperationGroup::Running(op_count - 1);
None
}
}

// If the operation group is already draining, wait on the barrier.
OperationGroup::Draining(barrier) => Some(Arc::clone(barrier)),
}
};
if let Some(barrier) = barrier {
barrier.wait().await;
}
rslt
}

async fn execute<T>(
&self,
statement: &T,
@@ -332,11 +443,11 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.execute(statement, params).await)
self.run_op(self.raw_tx.execute(statement, params)).await
}

async fn prepare_cached(&self, query: &str) -> Result<Statement, tokio_postgres::Error> {
check_error(&self.retry, self.raw_tx.prepare_cached(query).await)
self.run_op(self.raw_tx.prepare_cached(query)).await
}

async fn query<T>(
@@ -347,7 +458,7 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.query(statement, params).await)
self.run_op(self.raw_tx.query(statement, params)).await
}

async fn query_one<T>(
@@ -358,7 +469,7 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.query_one(statement, params).await)
self.run_op(self.raw_tx.query_one(statement, params)).await
}

async fn query_opt<T>(
@@ -369,7 +480,7 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.query_opt(statement, params).await)
self.run_op(self.raw_tx.query_opt(statement, params)).await
}

/// Calling this method will force the transaction to eventually be rolled back and retried; all
