Datastore: implement barrier if we see "in failed tx" error. #1418

Merged · 1 commit · May 27, 2023
aggregator_core/src/datastore.rs: 126 additions & 15 deletions (141 changed lines)
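For context (this illustration is not part of the PR): a minimal, self-contained sketch of the race the change guards against, using plain tokio futures in place of real database statements. The `DbError` enum, the sleep durations, and the `retry` flag handling are hypothetical stand-ins, and the sketch assumes tokio with the `macros`, `rt-multi-thread`, and `time` features. The point is that `try_join!` returns the first error it observes and cancels the remaining futures, so the follow-on "in failed SQL transaction" error can win the race and the root-cause serialization failure never marks the transaction as retryable.

```rust
use std::{
    sync::atomic::{AtomicBool, Ordering},
    time::Duration,
};

#[derive(Debug)]
enum DbError {
    SerializationFailure,   // "root cause" error; should trigger a retry
    InFailedSqlTransaction, // follow-on error returned by later statements
}

#[tokio::main]
async fn main() {
    let retry = AtomicBool::new(false);

    // Statement A hits the root cause, but its result arrives "late".
    let stmt_a = async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        retry.store(true, Ordering::Relaxed); // what check_error would do
        Err::<(), _>(DbError::SerializationFailure)
    };

    // Statement B sees only the follow-on error, and it arrives "early".
    let stmt_b = async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Err::<(), _>(DbError::InFailedSqlTransaction)
    };

    // try_join! returns the first error it observes and drops the other future,
    // so stmt_a can be cancelled before it marks the transaction retryable.
    let rslt = tokio::try_join!(stmt_a, stmt_b);
    println!("result = {rslt:?}, retry = {}", retry.load(Ordering::Relaxed));
    // Typically prints: result = Err(InFailedSqlTransaction), retry = false
}
```

The PR closes this window by making every statement wrapper wait for its whole operation group to finish before an "in failed SQL transaction" error is allowed to propagate, as shown in the diff below.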
@@ -48,10 +48,13 @@ use std::{
mem::size_of,
ops::RangeInclusive,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::{Duration as StdDuration, Instant},
};
use tokio::try_join;
use tokio::{sync::Barrier, try_join};
use tokio_postgres::{error::SqlState, row::RowIndex, IsolationLevel, Row, Statement, ToStatement};
use tracing::error;
use url::Url;
@@ -246,9 +249,10 @@ impl<C: Clock> Datastore<C> {
};
let tx = Transaction {
raw_tx,
retry: AtomicBool::new(false),
crypter: &self.crypter,
clock: &self.clock,
retry: AtomicBool::new(false),
op_group: Mutex::new(Arc::new(Mutex::new(OperationGroup::Running(0)))),
};

// Run user-provided function with the transaction, then commit/rollback based on result.
@@ -304,26 +308,133 @@ fn check_error<T>(
rslt: Result<T, tokio_postgres::Error>,
) -> Result<T, tokio_postgres::Error> {
if let Err(err) = &rslt {
if let Some(code) = err.code() {
if code == &SqlState::T_R_SERIALIZATION_FAILURE
|| code == &SqlState::T_R_DEADLOCK_DETECTED
{
retry.store(true, Ordering::Relaxed);
}
if is_retryable_error(err) {
retry.store(true, Ordering::Relaxed);
}
}
rslt
}

fn is_retryable_error(err: &tokio_postgres::Error) -> bool {
err.code().map_or(false, |code| {
code == &SqlState::T_R_SERIALIZATION_FAILURE || code == &SqlState::T_R_DEADLOCK_DETECTED
})
}

fn is_transaction_abort_error(err: &tokio_postgres::Error) -> bool {
err.code()
.map_or(false, |code| code == &SqlState::IN_FAILED_SQL_TRANSACTION)
}

/// Transaction represents an ongoing datastore transaction.
pub struct Transaction<'a, C: Clock> {
raw_tx: deadpool_postgres::Transaction<'a>,
retry: AtomicBool,
crypter: &'a Crypter,
clock: &'a C,

retry: AtomicBool,
op_group: Mutex<Arc<Mutex<OperationGroup>>>, // locking discipline: outer lock before inner lock
}

enum OperationGroup {
Running(usize), // current operation count
Draining(Arc<Barrier>), // barrier to wait upon to complete drain
}

impl<C: Clock> Transaction<'_, C> {
// For some error modes, Postgres will return an error to the caller & then fail all future
// statements within the same transaction with an "in failed SQL transaction" error. This
// effectively means one statement will receive a "root cause" error and then all later
// statements will receive an "in failed SQL transaction" error. In a pipelined scenario, if our
// code is processing the results of these statements concurrently--e.g. because they are part
// of a `try_join!`/`try_join_all` group--we might receive & handle one of the "in failed SQL
// transaction" errors before we handle the "root cause" error, which might cause the "root
// cause" error's future to be cancelled before we evaluate it. If the "root cause" error would
// trigger a retry, this would mean we would skip a DB-based retry when one was warranted.
//
// To fix this problem, we (internally) wrap all direct DB operations in `run_op`. This function
// groups concurrent database operations into "operation groups", which allow us to wait for all
// operations in the group to complete (this waiting operation is called "draining"). If we ever
// observe an "in failed SQL transaction" error, we drain the operation group before returning.
// Under the assumption that the "root cause" error is concurrent with the "in failed SQL
// transaction" errors, this guarantees we will evaluate the "root cause" error for retry
// before any errors make their way out of the transaction code.
async fn run_op<T>(
&self,
op: impl Future<Output = Result<T, tokio_postgres::Error>>,
) -> Result<T, tokio_postgres::Error> {
// Enter.
//
// Before we can run the operation, we need to join this operation into an operation group.
// Retrieve the current operation group & join it.
let op_group = {
let mut tx_op_group = self.op_group.lock().unwrap();
let new_tx_op_group = {
let mut op_group = tx_op_group.lock().unwrap();
match &*op_group {
OperationGroup::Running(op_count) => {
// If the current op group is running, join it by incrementing the operation
// count.
*op_group = OperationGroup::Running(*op_count + 1);
None
}

OperationGroup::Draining { .. } => {
// If the current op group is draining, we can't join it; instead, create a
// new op group to join, and store it as the transaction's current operation
// group.
Some(Arc::new(Mutex::new(OperationGroup::Running(1))))
}
}
};
if let Some(new_tx_op_group) = new_tx_op_group {
*tx_op_group = new_tx_op_group;
}
Arc::clone(&tx_op_group)
};

// Run operation, and check if error triggers a retry or requires a drain.
let rslt = check_error(&self.retry, op.await);
let needs_drain = rslt
.as_ref()
.err()
.map_or(false, is_transaction_abort_error);

// Exit.
//
// Before we are done running the operation, we have to leave the operation group. If the
// operation group is running, we just need to decrement the count. If the operation group
// is draining (because this or another operation encountered an error which requires a
// drain), we have to wait until all operations in the group are ready to finish.
let barrier = {
let mut op_group = op_group.lock().unwrap();
match &*op_group {
OperationGroup::Running(op_count) => {
if needs_drain {
// If the operation group is running & we have determined we need to drain
// the operation group, change the operation group to Draining & wait on the
// barrier.
let barrier = Arc::new(Barrier::new(*op_count));
*op_group = OperationGroup::Draining(Arc::clone(&barrier));
Some(barrier)
} else {
// If the operation group is running & we don't need a drain, just decrement
// the operation count.
*op_group = OperationGroup::Running(op_count - 1);
None
}
}

// If the operation group is already draining, wait on the barrier.
OperationGroup::Draining(barrier) => Some(Arc::clone(barrier)),
}
};
if let Some(barrier) = barrier {
barrier.wait().await;
}
rslt
}

async fn execute<T>(
&self,
statement: &T,
@@ -332,11 +443,11 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.execute(statement, params).await)
self.run_op(self.raw_tx.execute(statement, params)).await
}

async fn prepare_cached(&self, query: &str) -> Result<Statement, tokio_postgres::Error> {
check_error(&self.retry, self.raw_tx.prepare_cached(query).await)
self.run_op(self.raw_tx.prepare_cached(query)).await
}

async fn query<T>(
@@ -347,7 +458,7 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.query(statement, params).await)
self.run_op(self.raw_tx.query(statement, params)).await
}

async fn query_one<T>(
@@ -358,7 +469,7 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.query_one(statement, params).await)
self.run_op(self.raw_tx.query_one(statement, params)).await
}

async fn query_opt<T>(
@@ -369,7 +480,7 @@ impl<C: Clock> Transaction<'_, C> {
where
T: ?Sized + ToStatement,
{
check_error(&self.retry, self.raw_tx.query_opt(statement, params).await)
self.run_op(self.raw_tx.query_opt(statement, params)).await
}

/// Calling this method will force the transaction to eventually be rolled back and retried; all
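To see the drain idea in isolation (again, an illustrative sketch rather than the PR's code; the real implementation is the `run_op`/`OperationGroup` machinery in the diff above): here a fixed group of two concurrent operations shares a `tokio::sync::Barrier`, and every operation waits at the barrier before returning. The real code sizes the barrier dynamically from the operation count and only drains once an "in failed SQL transaction" error is observed, but the effect is the same: no error escapes the group until the root-cause error has been evaluated and the retry flag set.

```rust
use std::{
    future::Future,
    sync::atomic::{AtomicBool, Ordering},
    time::Duration,
};
use tokio::sync::Barrier;

#[derive(Debug)]
enum DbError {
    SerializationFailure,   // "root cause" error; should trigger a retry
    InFailedSqlTransaction, // follow-on error returned by later statements
}

// Simplified stand-in for the PR's run_op: record retryable errors, then wait
// for every operation in the (fixed-size) group before returning.
async fn run_op<T>(
    retry: &AtomicBool,
    barrier: &Barrier,
    op: impl Future<Output = Result<T, DbError>>,
) -> Result<T, DbError> {
    let rslt = op.await;
    if let Err(DbError::SerializationFailure) = &rslt {
        retry.store(true, Ordering::Relaxed);
    }
    // Rendezvous: an operation that saw only the follow-on error cannot return
    // (and cause its siblings to be cancelled) until the root-cause operation
    // has also finished and recorded the retry.
    barrier.wait().await;
    rslt
}

#[tokio::main]
async fn main() {
    let retry = AtomicBool::new(false);
    let barrier = Barrier::new(2); // exactly two operations in this group

    let stmt_a = run_op(&retry, &barrier, async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Err::<(), _>(DbError::SerializationFailure)
    });
    let stmt_b = run_op(&retry, &barrier, async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Err::<(), _>(DbError::InFailedSqlTransaction)
    });

    let rslt = tokio::try_join!(stmt_a, stmt_b);
    println!("result = {rslt:?}, retry = {}", retry.load(Ordering::Relaxed));
    // Prints retry = true: the root-cause error was evaluated before either
    // error escaped the group, so the transaction would be retried.
}
```

Compared with the broken ordering in the earlier sketch, `try_join!` now always observes `retry = true`, so the datastore layer can roll back and retry the transaction as intended.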