Skip to content

Commit

Permalink
CARL Persistence -> Introduce proper transactions.
Browse files Browse the repository at this point in the history
Transactions now span the ResourcesManager-APIs used for modifying persistence:
- `.insert()`
- `.remove()`
- `.resources_mut()`

In particular `.resources_mut()` allows developers to group multiple
requests into a transaction. To achieve this, the API was changed to
require returning a `Result<T, E>` out of the closure. If an `Err` is
returned this way, then everything is rolled back.

Transactions are not supported for in-memory storage, as this would
require implementing a custom transaction mechanism.
This can be problematic when the database state is rolled back, but the
in-memory state is not, since some Resources are always kept
in-memory.
This change should still be an improvement, since when the database
state is not impacted, CARL can be restarted to clear out potentially
inconsistent in-memory state.
  • Loading branch information
mbfm committed Sep 4, 2024
1 parent 415a778 commit d5b6b9b
Show file tree
Hide file tree
Showing 23 changed files with 356 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::resources::manager::ResourcesManagerRef;
use opendut_carl_api::carl::cluster::CreateClusterConfigurationError;
use opendut_types::cluster::{ClusterConfiguration, ClusterId};
use tracing::{debug, error, info};
use crate::resources::storage::ResourcesStorageApi;

pub struct CreateClusterConfigurationParams {
pub resources_manager: ResourcesManagerRef,
Expand All @@ -22,7 +23,8 @@ pub async fn create_cluster_configuration(params: CreateClusterConfigurationPara
resources_manager.resources_mut(|resources| {
resources.insert(cluster_id, params.cluster_configuration)
.map_err(|cause| CreateClusterConfigurationError::Internal { cluster_id, cluster_name: cluster_name.clone(), cause: cause.to_string() })
}).await?;
}).await
.map_err(|cause| CreateClusterConfigurationError::Internal { cluster_id, cluster_name: cluster_name.clone(), cause: cause.to_string() })??;

info!("Successfully created cluster configuration '{cluster_name}' <{cluster_id}>.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ pub async fn delete_cluster_configuration(params: DeleteClusterConfigurationPara

debug!("Deleting cluster configuration <{cluster_id}>.");

let cluster_configuration = resources_manager.resources_mut(|resources| {
resources.remove::<ClusterConfiguration>(cluster_id)
.map_err(|cause| DeleteClusterConfigurationError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })?
.ok_or_else(|| DeleteClusterConfigurationError::ClusterConfigurationNotFound { cluster_id })
}).await?;
let cluster_configuration = resources_manager.remove::<ClusterConfiguration>(cluster_id).await
.map_err(|cause| DeleteClusterConfigurationError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })?
.ok_or_else(|| DeleteClusterConfigurationError::ClusterConfigurationNotFound { cluster_id })?;

let cluster_name = Clone::clone(&cluster_configuration.name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use opendut_types::peer::{PeerDescriptor, PeerId};
use std::ops::Not;
use std::sync::Arc;
use tracing::error;
use crate::resources::storage::ResourcesStorageApi;

pub struct DeleteClusterDeploymentParams {
pub resources_manager: ResourcesManagerRef,
Expand All @@ -30,7 +31,8 @@ pub async fn delete_cluster_deployment(params: DeleteClusterDeploymentParams) ->
.map_err(|cause| DeleteClusterDeploymentError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })?;
Ok((deployment, configuration))
}).transpose()
}).await?
}).await
.map_err(|cause| DeleteClusterDeploymentError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })??
.ok_or(DeleteClusterDeploymentError::ClusterDeploymentNotFound { cluster_id })?;

if let Some(cluster) = cluster {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::resources::manager::ResourcesManagerRef;
use opendut_carl_api::carl::cluster::StoreClusterDeploymentError;
use opendut_types::cluster::{ClusterConfiguration, ClusterDeployment, ClusterId, ClusterName};
use tracing::error;
use crate::resources::storage::ResourcesStorageApi;

pub struct StoreClusterConfigurationParams {
pub resources_manager: ResourcesManagerRef,
Expand All @@ -22,7 +23,8 @@ pub async fn store_cluster_deployment(params: StoreClusterConfigurationParams) -
.unwrap_or_else(|| ClusterName::try_from("unknown_cluster").unwrap());
resources.insert(cluster_id, deployment)
.map_err(|cause| StoreClusterDeploymentError::Internal { cluster_id, cluster_name: Some(cluster_name.clone()), cause: cause.to_string() })
}).await?;
}).await
.map_err(|cause| StoreClusterDeploymentError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })??;

Ok(cluster_id)
}
Expand Down
12 changes: 6 additions & 6 deletions opendut-carl/src/actions/peers/assign_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::peer::broker::PeerMessagingBrokerRef;
use crate::persistence::error::PersistenceError;
use crate::resources::manager::ResourcesManagerRef;
use crate::resources::storage::ResourcesStorageApi;
use opendut_carl_api::proto::services::peer_messaging_broker::{downstream, ApplyPeerConfiguration};
use opendut_types::cluster::ClusterAssignment;
use opendut_types::peer::configuration::{OldPeerConfiguration, PeerConfiguration};
Expand Down Expand Up @@ -33,7 +34,7 @@ pub async fn assign_cluster(params: AssignClusterParams) -> Result<(), AssignClu
cluster_assignment: Some(params.cluster_assignment),
};
resources.insert(peer_id, Clone::clone(&old_peer_configuration))
.map_err(|source| AssignClusterError::Persistence { peer_id, source })?;
.map_err(|source| AssignClusterError::Persistence { peer_id, source })?;

let peer_configuration = resources.get::<PeerConfiguration>(peer_id)
.map_err(|source| AssignClusterError::Persistence { peer_id, source })?
Expand All @@ -55,7 +56,8 @@ pub async fn assign_cluster(params: AssignClusterParams) -> Result<(), AssignClu
}

Ok((old_peer_configuration, peer_configuration))
}).await?;
}).await
.map_err(|source| AssignClusterError::Persistence { peer_id, source })??;

params.peer_messaging_broker.send_to_peer(
peer_id,
Expand Down Expand Up @@ -101,13 +103,11 @@ mod tests {
let old_peer_configuration = OldPeerConfiguration {
cluster_assignment: None,
};
resources_manager.resources_mut(|resources| {
resources.insert(peer_id, Clone::clone(&old_peer_configuration))
}).await?;
let peer_configuration = PeerConfiguration::default();
resources_manager.resources_mut(|resources| {
resources.insert(peer_id, Clone::clone(&old_peer_configuration))?;
resources.insert(peer_id, Clone::clone(&peer_configuration))
}).await?;
}).await??;

let (_, mut receiver) = peer_messaging_broker.open(peer_id, IpAddr::from_str("1.2.3.4")?).await?;
let received = receiver.recv().await.unwrap()
Expand Down
4 changes: 3 additions & 1 deletion opendut-carl/src/actions/peers/delete_peer_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::resources::manager::ResourcesManagerRef;
use crate::resources::storage::ResourcesStorageApi;
use crate::vpn::Vpn;
use opendut_auth::registration::client::RegistrationClientRef;
use opendut_carl_api::carl::peer::DeletePeerDescriptorError;
Expand Down Expand Up @@ -29,7 +30,8 @@ pub async fn delete_peer_descriptor(params: DeletePeerDescriptorParams) -> Resul
.ok_or_else(|| DeletePeerDescriptorError::PeerNotFound { peer_id })?;

Ok(peer_descriptor)
}).await?;
}).await
.map_err(|cause| DeletePeerDescriptorError::Internal { peer_id, peer_name: None, cause: cause.to_string() })??;

let peer_name = &peer_descriptor.name;

Expand Down
4 changes: 3 additions & 1 deletion opendut-carl/src/actions/peers/get_peer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use opendut_carl_api::carl::peer::GetPeerStateError;
use opendut_types::peer::state::PeerState;
use opendut_types::peer::{PeerDescriptor, PeerId};
use tracing::{debug, error, info};
use crate::resources::storage::ResourcesStorageApi;

pub struct GetPeerStateParams {
pub peer: PeerId,
Expand Down Expand Up @@ -33,7 +34,8 @@ pub async fn get_peer_state(params: GetPeerStateParams) -> Result<PeerState, Get
}
}
}
}).await?;
}).await
.map_err(|cause| GetPeerStateError::Internal { peer_id, cause: cause.to_string() })??;

info!("Successfully queried state of peer with peer_id <{}>.", peer_id);

Expand Down
1 change: 1 addition & 0 deletions opendut-carl/src/actions/peers/list_peer_descriptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::resources::manager::ResourcesManagerRef;
use opendut_carl_api::carl::peer::ListPeerDescriptorsError;
use opendut_types::peer::PeerDescriptor;
use tracing::{debug, error, info};
use crate::resources::storage::ResourcesStorageApi;

pub struct ListPeerDescriptorsParams {
pub resources_manager: ResourcesManagerRef,
Expand Down
4 changes: 3 additions & 1 deletion opendut-carl/src/actions/peers/store_peer_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::persistence::error::PersistenceError;
use crate::persistence::error::{FlattenPersistenceResult, PersistenceError};
use crate::resources::manager::ResourcesManagerRef;
use crate::vpn::Vpn;
use opendut_carl_api::carl::peer::StorePeerDescriptorError;
Expand All @@ -8,6 +8,7 @@ use opendut_types::peer::ethernet::EthernetBridge;
use opendut_types::peer::{PeerDescriptor, PeerId};
use opendut_types::util::net::NetworkInterfaceName;
use tracing::{debug, error, info, warn};
use crate::resources::storage::ResourcesStorageApi;

pub struct StorePeerDescriptorParams {
pub resources_manager: ResourcesManagerRef,
Expand Down Expand Up @@ -63,6 +64,7 @@ pub async fn store_peer_descriptor(params: StorePeerDescriptorParams) -> Result<

Ok(is_new_peer)
}).await
.flatten_persistence_result()
.map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal {
peer_id,
peer_name: Clone::clone(&peer_name),
Expand Down
4 changes: 3 additions & 1 deletion opendut-carl/src/actions/peers/unassign_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::persistence::error::PersistenceError;
use crate::resources::manager::ResourcesManagerRef;
use opendut_types::peer::state::{PeerState, PeerUpState};
use opendut_types::peer::PeerId;
use crate::resources::storage::ResourcesStorageApi;

pub struct UnassignClusterParams {
pub resources_manager: ResourcesManagerRef,
Expand Down Expand Up @@ -38,7 +39,8 @@ pub async fn unassign_cluster(params: UnassignClusterParams) -> Result<(), Unass
}

Ok(())
}).await?;
}).await
.map_err(|source| UnassignClusterError::Persistence { peer_id, source })??;

Ok(())
}
1 change: 1 addition & 0 deletions opendut-carl/src/cluster/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::actions::{AssignClusterParams, DeleteClusterDeploymentParams, GetPeer
use crate::peer::broker::PeerMessagingBrokerRef;
use crate::persistence::error::PersistenceResult;
use crate::resources::manager::ResourcesManagerRef;
use crate::resources::storage::ResourcesStorageApi;
use crate::vpn::Vpn;

pub type ClusterManagerRef = Arc<Mutex<ClusterManager>>;
Expand Down
2 changes: 1 addition & 1 deletion opendut-carl/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use opendut_types::peer::PeerDescriptor;
use opendut_types::peer::state::PeerState;
use crate::persistence::error::PersistenceError;
use crate::resources::manager::ResourcesManagerRef;

use crate::resources::storage::ResourcesStorageApi;

pub fn initialize_metrics_collection(
resources_manager: ResourcesManagerRef,
Expand Down
8 changes: 5 additions & 3 deletions opendut-carl/src/peer/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use opendut_types::peer::PeerId;

use crate::persistence::error::PersistenceError;
use crate::resources::manager::ResourcesManagerRef;
use crate::resources::storage::ResourcesStorageApi;

pub type PeerMessagingBrokerRef = Arc<PeerMessagingBroker>;

Expand Down Expand Up @@ -97,7 +98,7 @@ impl PeerMessagingBroker {
self.resources_manager.resources_mut(|resources| {
let maybe_peer_state = resources.get::<PeerState>(peer_id)
.map_err(|source| OpenError::Persistence { peer_id, source })?;

match maybe_peer_state {
None => {
info!("Peer <{}> opened stream which has not been seen before.", peer_id);
Expand All @@ -118,7 +119,8 @@ impl PeerMessagingBroker {
resources.insert(peer_id, new_peer_state)
.map_err(|source| OpenError::Persistence { peer_id, source })
})
}).await?;
}).await
.map_err(|source| OpenError::Persistence { peer_id, source })??;

let maybe_old_configuration = self.resources_manager.get::<OldPeerConfiguration>(peer_id).await
.map_err(|source| OpenError::Persistence { peer_id, source })?;
Expand Down Expand Up @@ -273,7 +275,7 @@ mod tests {
use opendut_carl_api::proto::services::peer_messaging_broker::Ping;

use crate::resources::manager::ResourcesManager;

use crate::resources::storage::ResourcesStorageApi;
use super::*;

#[tokio::test]
Expand Down
33 changes: 24 additions & 9 deletions opendut-carl/src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ pub mod database;
pub(crate) mod resources;
mod query;

pub struct Storage {
pub db: Db,
pub memory: Memory,
pub struct Storage<'a> {
pub db: Db<'a>,
pub memory: &'a mut Memory,
}
pub struct Db {
pub inner: Mutex<PgConnection>, //Mutex rather than RwLock, because we share this between threads (i.e. we need it to implement `Sync`)
pub struct Db<'a> {
pub inner: Mutex<&'a mut PgConnection>, //Mutex rather than RwLock, because we share this between threads (i.e. we need it to implement `Sync`)
}
impl Db {
pub fn connection(&self) -> MutexGuard<PgConnection> {
impl<'a> Db<'a> {
pub fn from_connection(connection: &'a mut PgConnection) -> Db {
Self { inner: Mutex::new(connection) }
}
pub fn connection(&self) -> MutexGuard<&'a mut PgConnection> {
self.inner.lock().expect("error while locking mutex for database connection")
}
}
Expand All @@ -36,7 +39,7 @@ pub(crate) mod error {
},
DieselInternal {
#[from] source: diesel::result::Error,
}
},
}
impl PersistenceError {
pub fn insert<R>(id: impl Into<Uuid>, cause: impl Into<Cause>) -> Self {
Expand Down Expand Up @@ -88,7 +91,7 @@ pub(crate) mod error {
writeln!(f, " Source: {source}")
).transpose()?;
}
PersistenceError::DieselInternal { source } => writeln!(f, "{source}")?,
PersistenceError::DieselInternal { source } => writeln!(f, "Error internal to Diesel, likely from transaction: {source}")?,
}
Ok(())
}
Expand All @@ -115,4 +118,16 @@ pub(crate) mod error {
}

pub type PersistenceResult<T> = Result<T, PersistenceError>;
pub trait FlattenPersistenceResult<T>: Sized {
fn flatten_persistence_result(self) -> PersistenceResult<T>;
}
impl<T> FlattenPersistenceResult<T> for PersistenceResult<PersistenceResult<T>> {
fn flatten_persistence_result(self) -> PersistenceResult<T> {
match self {
Ok(Ok(ok)) => Ok(ok),
Ok(Err(err)) => Err(err),
Err(err) => Err(err)
}
}
}
}
17 changes: 8 additions & 9 deletions opendut-carl/src/persistence/resources/cluster_configuration.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use super::Persistable;
use crate::persistence::error::{PersistenceError, PersistenceResult};
use crate::persistence::error::PersistenceResult;
use crate::persistence::query::Filter;
use crate::persistence::{query, Storage};
use diesel::Connection;
use opendut_types::cluster::{ClusterConfiguration, ClusterId};

impl Persistable for ClusterConfiguration {
fn insert(self, _id: ClusterId, storage: &mut Storage) -> PersistenceResult<()> {
storage.db.connection().transaction::<_, PersistenceError, _>(|connection| {
//Delete before inserting to ensure that when an update removes
//list elements we don't leave those elements behind in the database.
//TODO more efficient solution
query::cluster_configuration::remove(self.id, connection)?;
let mut connection = storage.db.connection();

query::cluster_configuration::insert(self, connection)
})
//Delete before inserting to ensure that when an update removes
//list elements we don't leave those elements behind in the database.
//TODO more efficient solution
query::cluster_configuration::remove(self.id, &mut connection)?;

query::cluster_configuration::insert(self, &mut connection)
}

fn remove(cluster_id: ClusterId, storage: &mut Storage) -> PersistenceResult<Option<Self>> {
Expand Down
19 changes: 9 additions & 10 deletions opendut-carl/src/persistence/resources/cluster_deployment.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
use diesel::Connection;
use opendut_types::cluster::{ClusterDeployment, ClusterId};

use crate::persistence::error::{PersistenceError, PersistenceResult};
use crate::persistence::error::PersistenceResult;
use crate::persistence::query::Filter;
use crate::persistence::{query, Storage};

use super::Persistable;

impl Persistable for ClusterDeployment {
fn insert(self, _id: ClusterId, storage: &mut Storage) -> PersistenceResult<()> {
storage.db.connection().transaction::<_, PersistenceError, _>(|connection| {
//Delete before inserting to ensure that when an update removes
//list elements we don't leave those elements behind in the database.
//TODO more efficient solution
query::cluster_deployment::remove(self.id, connection)?;

query::cluster_deployment::insert(self, connection)
})
let mut connection = storage.db.connection();

//Delete before inserting to ensure that when an update removes
//list elements we don't leave those elements behind in the database.
//TODO more efficient solution
query::cluster_deployment::remove(self.id, &mut connection)?;

query::cluster_deployment::insert(self, &mut connection)
}

fn remove(cluster_id: ClusterId, storage: &mut Storage) -> PersistenceResult<Option<Self>> {
Expand Down
18 changes: 8 additions & 10 deletions opendut-carl/src/persistence/resources/peer_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use diesel::Connection;

use opendut_types::peer::{PeerDescriptor, PeerId};

use super::Persistable;
use crate::persistence::error::{PersistenceError, PersistenceResult};
use crate::persistence::error::PersistenceResult;
use crate::persistence::query::Filter;
use crate::persistence::{query, Storage};

impl Persistable for PeerDescriptor {
fn insert(self, _peer_id: PeerId, storage: &mut Storage) -> PersistenceResult<()> {
storage.db.connection().transaction::<_, PersistenceError, _>(|connection| {
//Delete before inserting to ensure that when an update removes
//list elements we don't leave those elements behind in the database.
//TODO more efficient solution
query::peer_descriptor::remove(self.id, connection)?;
let mut connection = storage.db.connection();

//Delete before inserting to ensure that when an update removes
//list elements we don't leave those elements behind in the database.
//TODO more efficient solution
query::peer_descriptor::remove(self.id, &mut connection)?;

query::peer_descriptor::insert(self, connection)
})
query::peer_descriptor::insert(self, &mut connection)
}

fn remove(peer_id: PeerId, storage: &mut Storage) -> PersistenceResult<Option<Self>> {
Expand Down
Loading

0 comments on commit d5b6b9b

Please sign in to comment.