Skip to content

Commit

Permalink
Replace dependency backoff with backon.
Browse files Browse the repository at this point in the history
`backoff` is unmaintained and depends on `instant`, for which a security advisory got issued.
  • Loading branch information
mbfm committed Nov 13, 2024
1 parent 8028b39 commit 46881a8
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 101 deletions.
20 changes: 8 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async-trait = "0.1.77"
axum = "0.6.20"
axum-server = "0.5.1"
axum-server-dual-protocol = "0.5.2"
backoff = "0.4.0"
backon = { version = "1.2.0", default-features = false }
base64 = "0.22.1"
brotli = "6.0.0"
cargo_metadata = "0.18.1"
Expand Down
2 changes: 1 addition & 1 deletion opendut-carl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ anyhow = { workspace = true }
axum = { workspace = true }
axum-server = { workspace = true, features = ["tls-rustls"] }
axum-server-dual-protocol = { workspace = true }
backoff = { workspace = true, features = ["tokio"] }
backon = { workspace = true, features = ["tokio-sleep"] }
base64 = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
Expand Down
42 changes: 24 additions & 18 deletions opendut-carl/src/persistence/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use backon::Retryable;
use diesel::{Connection as _, ConnectionError, PgConnection};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use backoff::ExponentialBackoff;
use tracing::{debug, info, warn};
use crate::resources::storage::DatabaseConnectInfo;

Expand All @@ -18,24 +18,30 @@ pub async fn connect(database_connect_info: &DatabaseConnectInfo) -> Result<PgCo
url
};

let mut connection = backoff::future::retry(ExponentialBackoff::default(), || async {
let mut connection = (|| async {
PgConnection::establish(confidential_url.as_str())
.map_err(|cause| match &cause {
ConnectionError::BadConnection(_) => {
warn!("Connecting to database at {url} failed. Retrying.");
backoff::Error::transient(ConnectError::Diesel(cause))
}
ConnectionError::CouldntSetupConfiguration(_)
| ConnectionError::InvalidConnectionUrl(_)
| ConnectionError::InvalidCString(_) => {
backoff::Error::permanent(ConnectError::Diesel(cause))
}
other => {
warn!("Unhandled Diesel ConnectionError variant: {other:?}");
backoff::Error::permanent(ConnectError::Diesel(cause))
}
})
}).await?;
})
.retry(backon::ExponentialBuilder::default())
.when(|cause| match &cause {
ConnectionError::BadConnection(_) => {
true
}
ConnectionError::CouldntSetupConfiguration(_)
| ConnectionError::InvalidConnectionUrl(_)
| ConnectionError::InvalidCString(_) => {
false
}
other => {
warn!("Unhandled Diesel ConnectionError variant: {other:?}");
false
}
})
.notify(|cause, after| {
warn!("Connecting to database at {url} failed. Retrying in {after:?}.\n {cause}");
})
.await
.map_err(ConnectError::Diesel)?;

info!("Connection to database at {url} established!");

run_pending_migrations(&mut connection)
Expand Down
2 changes: 1 addition & 1 deletion opendut-edgar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ opendut-util = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
backoff = { workspace = true, features = ["tokio"] }
backon = { workspace = true, features = ["tokio-sleep"] }
cfg-if = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"] }
Expand Down
16 changes: 7 additions & 9 deletions opendut-edgar/src/service/network_metrics/rperf/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use backoff::ExponentialBackoffBuilder;
use backon::Retryable;
use opentelemetry::{global, KeyValue};
use opentelemetry::metrics::Gauge;
use regex::Regex;
Expand Down Expand Up @@ -46,17 +46,15 @@ pub async fn exponential_backoff_launch_rperf_client(
megabits_second_send_mutex: Arc<Mutex<Gauge<f64>>>,
megabits_second_receive_mutex: Arc<Mutex<Gauge<f64>>>
) -> Result<(), RperfRunError> {
let exponential_backoff = ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(rperf_backoff_max_elapsed_time_ms))
.build();
let exponential_backoff = backon::ExponentialBuilder::default()
.with_max_delay(rperf_backoff_max_elapsed_time_ms);

let backoff_result = backoff::future::retry(
exponential_backoff,
|| async {
let backoff_result = (|| async {
launch_rperf_client(peer, target_bandwidth_kbit_per_second, &megabits_second_send_mutex, &megabits_second_receive_mutex).await?;
Ok(())
}
).await;
})
.retry(exponential_backoff)
.await;

backoff_result
.map_err(|cause| RperfClientError { message: "Could not run rperf client".to_string(), cause })
Expand Down
16 changes: 7 additions & 9 deletions opendut-edgar/src/service/network_metrics/rperf/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::process::Stdio;
use std::time::Duration;
use backoff::ExponentialBackoffBuilder;
use backon::Retryable;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tracing::{error, trace};
Expand All @@ -10,17 +10,15 @@ use crate::service::network_metrics::rperf::RperfRunError::RperfServerError;
pub async fn exponential_backoff_launch_rperf_server(
rperf_backoff_max_elapsed_time_ms: Duration,
) -> Result<(), RperfRunError> {
let exponential_backoff = ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(rperf_backoff_max_elapsed_time_ms))
.build();
let exponential_backoff = backon::ExponentialBuilder::default()
.with_max_delay(rperf_backoff_max_elapsed_time_ms);

let backoff_result = backoff::future::retry(
exponential_backoff,
|| async {
let backoff_result = (|| async {
launch_rperf_server().await?;
Ok(())
}
).await;
})
.retry(exponential_backoff)
.await;

backoff_result
.map_err(|cause| RperfServerError { message: "Could not run rperf server".to_string(), cause })
Expand Down
2 changes: 1 addition & 1 deletion opendut-util/opendut-auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ opendut-util-core = { workspace = true }
opendut-types = { workspace = true, optional = true }

anyhow = { workspace = true, optional = true }
backoff = { workspace = true }
backon = { workspace = true, features = ["std-blocking-sleep"] }
cfg-if = { workspace = true }
chrono = { workspace = true, optional = true, default-features = false, features = ["clock", "serde", "wasmbind"] }
config = { workspace = true, optional = true }
Expand Down
61 changes: 31 additions & 30 deletions opendut-util/opendut-auth/src/confidential/blocking/client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use std::fmt::Formatter;
use std::ops::{Sub};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use crate::confidential::blocking::reqwest_client::OidcBlockingReqwestClient;
use crate::confidential::config::{ConfidentialClientConfig, ConfidentialClientConfigData};
use crate::confidential::error::{ConfidentialClientError, WrappedRequestTokenError};
use crate::TOKEN_GRACE_PERIOD;
use backon::BlockingRetryable;
use chrono::{NaiveDateTime, Utc};
use config::Config;
use oauth2::{AccessToken, TokenResponse};
use oauth2::basic::{BasicClient, BasicTokenResponse};
use backoff::ExponentialBackoffBuilder;
use oauth2::{AccessToken, TokenResponse};
use std::fmt::Formatter;
use std::ops::Sub;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{RwLock, RwLockWriteGuard};
use std::time::Duration;
use tokio::sync::{Mutex, TryLockError};
use tonic::{Request, Status};
use tonic::metadata::MetadataValue;
use tonic::service::Interceptor;
use tonic::{Request, Status};
use tracing::debug;
use crate::confidential::config::{ConfidentialClientConfig, ConfidentialClientConfigData};
use crate::confidential::blocking::reqwest_client::OidcBlockingReqwestClient;
use crate::confidential::error::{ConfidentialClientError, WrappedRequestTokenError};
use crate::TOKEN_GRACE_PERIOD;

#[derive(Debug)]
pub struct ConfidentialClient {
Expand All @@ -41,8 +41,7 @@ pub enum AuthError {
#[error("ExpirationFieldMissing: {message}.")]
ExpirationFieldMissing { message: String },
#[error("FailedToUpdateToken: {message} cause: {cause}.")]
FailedToLockConfidentialClient { message: String, cause: backoff::Error<TryLockError> },

FailedToLockConfidentialClient { message: String, cause: TryLockError },
}

#[derive(Clone)]
Expand Down Expand Up @@ -115,16 +114,17 @@ impl ConfidentialClient {
let token_endpoint = idp_config.issuer_url.join("protocol/openid-connect/token")
.map_err(|error| ConfidentialClientError::UrlParse { message: String::from("Failed to derive token url from issuer url: "), cause: error })?;

let exponential_backoff = ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(Duration::from_secs(120)))
.build();

let operation = || {
self.reqwest_client.client.get(token_endpoint.clone()).send()?;
Ok(())
};

let backoff_result = backoff::retry(exponential_backoff, operation);
let backoff_result = operation
.retry(
backon::ExponentialBuilder::default()
.with_max_delay(Duration::from_secs(120))
)
.call();

match backoff_result {
Ok(_) => { Ok(()) }
Expand Down Expand Up @@ -173,7 +173,7 @@ impl ConfidentialClient {
self.fetch_token()?
}
Some(token) => {
if Utc::now().naive_utc().lt(&token.expires_in.sub(TOKEN_GRACE_PERIOD)) {
if Utc::now().naive_utc() < token.expires_in.sub(TOKEN_GRACE_PERIOD) {
Token { value: token.access_token.secret().to_string() }
} else {
self.fetch_token()?
Expand All @@ -189,16 +189,17 @@ impl Interceptor for ConfClientArcMutex<Option<ConfidentialClientRef>> {

let cloned_arc_mutex = Arc::clone(&self.0);

let exponential_backoff = ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(Duration::from_secs(120)))
.build();

let operation = || {
let mutex_guard = cloned_arc_mutex.try_lock()?;
Ok(mutex_guard)
};

let backoff_result = backoff::retry(exponential_backoff, operation);
let backoff_result = operation
.retry(
backon::ExponentialBuilder::default()
.with_max_delay(Duration::from_secs(120))
)
.call();

let token = match backoff_result {
Ok(mutex_guard) => {
Expand All @@ -208,7 +209,7 @@ impl Interceptor for ConfClientArcMutex<Option<ConfidentialClientRef>> {
Err(error) => {
eprintln!("Failed to acquire lock on the Confidential Client definitively. The following telemetry data will not be transmitted.");
eprintln!("Failed request: {:?}", request);
Some(Err(AuthError::FailedToLockConfidentialClient {message: "Unable to acquire lock on the Confidential Client".to_owned(), cause: error}))
Some(Err(AuthError::FailedToLockConfidentialClient { message: "Unable to acquire lock on the Confidential Client".to_owned(), cause: error }))
}
};

Expand All @@ -231,15 +232,15 @@ impl Interceptor for ConfClientArcMutex<Option<ConfidentialClientRef>> {

#[cfg(test)]
mod auth_tests {
use crate::confidential::blocking;
use crate::confidential::config::ConfidentialClientConfigData;
use crate::confidential::pem::read_pem_from_file_path;
use chrono::Utc;
use googletest::assert_that;
use googletest::matchers::gt;
use oauth2::{ClientId, ClientSecret, TokenResponse};
use url::Url;
use opendut_util_core::project;
use crate::confidential::config::ConfidentialClientConfigData;
use crate::confidential::pem::read_pem_from_file_path;
use crate::confidential::blocking;
use url::Url;

#[test_with::env(RUN_KEYCLOAK_INTEGRATION_TESTS)]
#[test]
Expand Down
2 changes: 1 addition & 1 deletion opendut-util/opendut-auth/src/confidential/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub enum ConfidentialClientError {
#[error("Failed to load OIDC configuration: '{message}'. Cause: '{cause}'")]
Configuration { message: String, cause: Box<dyn std::error::Error + Send + Sync> },
#[error("{message}\n {cause}")]
KeycloakConnection { message: String, cause: backoff::Error<reqwest::Error> },
KeycloakConnection { message: String, cause: reqwest::Error },
#[error("{message}\n {cause}")]
UrlParse { message: String, cause: url::ParseError },
#[error("OIDC configuration error: '{message}'.")]
Expand Down
2 changes: 1 addition & 1 deletion tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ opendut-types = { workspace = true }
opendut-util = { workspace = true }

anyhow = { workspace = true }
backoff = { workspace = true, features = ["tokio"] }
backon = { workspace = true, features = ["tokio-sleep"] }
config = { workspace = true }
googletest = { workspace = true }
test-log = { workspace = true }
Expand Down
19 changes: 14 additions & 5 deletions tests/src/testing/carl_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::time::Duration;
use crate::testing::util;
use anyhow::anyhow;
use backon::Retryable;
use opendut_carl_api::carl::CarlClient;
use opendut_types::peer::state::PeerState;
use opendut_types::peer::PeerId;
Expand All @@ -24,15 +26,22 @@ impl TestCarlClient {
}

pub async fn await_peer_up(&self, peer_id: PeerId) -> anyhow::Result<()> {
util::retry(|| async {
let edgar_state = self.inner().await.peers.get_peer_state(peer_id).await
.map_err(|cause| backoff::Error::transient(cause.into()))?;

(|| async {
let edgar_state = self.inner().await
.peers.get_peer_state(peer_id).await?;

match edgar_state {
PeerState::Up { .. } => Ok(()),
PeerState::Down => Err(backoff::Error::transient(anyhow!("No peers registered in time!")))
PeerState::Down => Err(anyhow!("No peers registered in time!"))
}
}).await?;
})
.retry(
backon::ExponentialBuilder::default()
.with_max_delay(Duration::from_secs(15))
)
.await?;

Ok(())
}

Expand Down
Loading

0 comments on commit 46881a8

Please sign in to comment.