diff --git a/opendut-carl/src/actions/clusters/create_cluster_configuration.rs b/opendut-carl/src/actions/clusters/create_cluster_configuration.rs index 478b62d18..fad05e095 100644 --- a/opendut-carl/src/actions/clusters/create_cluster_configuration.rs +++ b/opendut-carl/src/actions/clusters/create_cluster_configuration.rs @@ -2,6 +2,7 @@ use crate::resources::manager::ResourcesManagerRef; use opendut_carl_api::carl::cluster::CreateClusterConfigurationError; use opendut_types::cluster::{ClusterConfiguration, ClusterId}; use tracing::{debug, error, info}; +use crate::resources::storage::ResourcesStorageApi; pub struct CreateClusterConfigurationParams { pub resources_manager: ResourcesManagerRef, @@ -22,7 +23,8 @@ pub async fn create_cluster_configuration(params: CreateClusterConfigurationPara resources_manager.resources_mut(|resources| { resources.insert(cluster_id, params.cluster_configuration) .map_err(|cause| CreateClusterConfigurationError::Internal { cluster_id, cluster_name: cluster_name.clone(), cause: cause.to_string() }) - }).await?; + }).await + .map_err(|cause| CreateClusterConfigurationError::Internal { cluster_id, cluster_name: cluster_name.clone(), cause: cause.to_string() })??; info!("Successfully created cluster configuration '{cluster_name}' <{cluster_id}>."); diff --git a/opendut-carl/src/actions/clusters/delete_cluster_configuration.rs b/opendut-carl/src/actions/clusters/delete_cluster_configuration.rs index 76231c852..ea4a67953 100644 --- a/opendut-carl/src/actions/clusters/delete_cluster_configuration.rs +++ b/opendut-carl/src/actions/clusters/delete_cluster_configuration.rs @@ -18,11 +18,9 @@ pub async fn delete_cluster_configuration(params: DeleteClusterConfigurationPara debug!("Deleting cluster configuration <{cluster_id}>."); - let cluster_configuration = resources_manager.resources_mut(|resources| { - resources.remove::(cluster_id) - .map_err(|cause| DeleteClusterConfigurationError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })? - .ok_or_else(|| DeleteClusterConfigurationError::ClusterConfigurationNotFound { cluster_id }) - }).await?; + let cluster_configuration = resources_manager.remove::(cluster_id).await + .map_err(|cause| DeleteClusterConfigurationError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })? + .ok_or_else(|| DeleteClusterConfigurationError::ClusterConfigurationNotFound { cluster_id })?; let cluster_name = Clone::clone(&cluster_configuration.name); diff --git a/opendut-carl/src/actions/clusters/delete_cluster_deployment.rs b/opendut-carl/src/actions/clusters/delete_cluster_deployment.rs index b741e48fb..93ad37043 100644 --- a/opendut-carl/src/actions/clusters/delete_cluster_deployment.rs +++ b/opendut-carl/src/actions/clusters/delete_cluster_deployment.rs @@ -8,6 +8,7 @@ use opendut_types::peer::{PeerDescriptor, PeerId}; use std::ops::Not; use std::sync::Arc; use tracing::error; +use crate::resources::storage::ResourcesStorageApi; pub struct DeleteClusterDeploymentParams { pub resources_manager: ResourcesManagerRef, @@ -30,7 +31,8 @@ pub async fn delete_cluster_deployment(params: DeleteClusterDeploymentParams) -> .map_err(|cause| DeleteClusterDeploymentError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })?; Ok((deployment, configuration)) }).transpose() - }).await? + }).await + .map_err(|cause| DeleteClusterDeploymentError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })?? .ok_or(DeleteClusterDeploymentError::ClusterDeploymentNotFound { cluster_id })?; if let Some(cluster) = cluster { diff --git a/opendut-carl/src/actions/clusters/store_cluster_deployment.rs b/opendut-carl/src/actions/clusters/store_cluster_deployment.rs index bb384d75a..c30566a07 100644 --- a/opendut-carl/src/actions/clusters/store_cluster_deployment.rs +++ b/opendut-carl/src/actions/clusters/store_cluster_deployment.rs @@ -2,6 +2,7 @@ use crate::resources::manager::ResourcesManagerRef; use opendut_carl_api::carl::cluster::StoreClusterDeploymentError; use opendut_types::cluster::{ClusterConfiguration, ClusterDeployment, ClusterId, ClusterName}; use tracing::error; +use crate::resources::storage::ResourcesStorageApi; pub struct StoreClusterConfigurationParams { pub resources_manager: ResourcesManagerRef, @@ -22,7 +23,8 @@ pub async fn store_cluster_deployment(params: StoreClusterConfigurationParams) - .unwrap_or_else(|| ClusterName::try_from("unknown_cluster").unwrap()); resources.insert(cluster_id, deployment) .map_err(|cause| StoreClusterDeploymentError::Internal { cluster_id, cluster_name: Some(cluster_name.clone()), cause: cause.to_string() }) - }).await?; + }).await + .map_err(|cause| StoreClusterDeploymentError::Internal { cluster_id, cluster_name: None, cause: cause.to_string() })??; Ok(cluster_id) } diff --git a/opendut-carl/src/actions/peers/assign_cluster.rs b/opendut-carl/src/actions/peers/assign_cluster.rs index 775e6f26d..73dd5c4fe 100644 --- a/opendut-carl/src/actions/peers/assign_cluster.rs +++ b/opendut-carl/src/actions/peers/assign_cluster.rs @@ -1,6 +1,7 @@ use crate::peer::broker::PeerMessagingBrokerRef; use crate::persistence::error::PersistenceError; use crate::resources::manager::ResourcesManagerRef; +use crate::resources::storage::ResourcesStorageApi; use opendut_carl_api::proto::services::peer_messaging_broker::{downstream, ApplyPeerConfiguration}; use opendut_types::cluster::ClusterAssignment; use opendut_types::peer::configuration::{OldPeerConfiguration, PeerConfiguration}; @@ -33,7 +34,7 @@ pub async fn assign_cluster(params: AssignClusterParams) -> Result<(), AssignClu cluster_assignment: Some(params.cluster_assignment), }; resources.insert(peer_id, Clone::clone(&old_peer_configuration)) - .map_err(|source| AssignClusterError::Persistence { peer_id, source })?; + .map_err(|source| AssignClusterError::Persistence { peer_id, source })?; let peer_configuration = resources.get::(peer_id) .map_err(|source| AssignClusterError::Persistence { peer_id, source })? @@ -55,7 +56,8 @@ pub async fn assign_cluster(params: AssignClusterParams) -> Result<(), AssignClu } Ok((old_peer_configuration, peer_configuration)) - }).await?; + }).await + .map_err(|source| AssignClusterError::Persistence { peer_id, source })??; params.peer_messaging_broker.send_to_peer( peer_id, @@ -101,13 +103,11 @@ mod tests { let old_peer_configuration = OldPeerConfiguration { cluster_assignment: None, }; - resources_manager.resources_mut(|resources| { - resources.insert(peer_id, Clone::clone(&old_peer_configuration)) - }).await?; let peer_configuration = PeerConfiguration::default(); resources_manager.resources_mut(|resources| { + resources.insert(peer_id, Clone::clone(&old_peer_configuration))?; resources.insert(peer_id, Clone::clone(&peer_configuration)) - }).await?; + }).await??; let (_, mut receiver) = peer_messaging_broker.open(peer_id, IpAddr::from_str("1.2.3.4")?).await?; let received = receiver.recv().await.unwrap() diff --git a/opendut-carl/src/actions/peers/delete_peer_descriptor.rs b/opendut-carl/src/actions/peers/delete_peer_descriptor.rs index 0983713e2..25c7ce72e 100644 --- a/opendut-carl/src/actions/peers/delete_peer_descriptor.rs +++ b/opendut-carl/src/actions/peers/delete_peer_descriptor.rs @@ -1,4 +1,5 @@ use crate::resources::manager::ResourcesManagerRef; +use crate::resources::storage::ResourcesStorageApi; use crate::vpn::Vpn; use opendut_auth::registration::client::RegistrationClientRef; use opendut_carl_api::carl::peer::DeletePeerDescriptorError; @@ -29,7 +30,8 @@ pub async fn delete_peer_descriptor(params: DeletePeerDescriptorParams) -> Resul .ok_or_else(|| DeletePeerDescriptorError::PeerNotFound { peer_id })?; Ok(peer_descriptor) - }).await?; + }).await + .map_err(|cause| DeletePeerDescriptorError::Internal { peer_id, peer_name: None, cause: cause.to_string() })??; let peer_name = &peer_descriptor.name; diff --git a/opendut-carl/src/actions/peers/get_peer_state.rs b/opendut-carl/src/actions/peers/get_peer_state.rs index 0083de587..1b62c830c 100644 --- a/opendut-carl/src/actions/peers/get_peer_state.rs +++ b/opendut-carl/src/actions/peers/get_peer_state.rs @@ -3,6 +3,7 @@ use opendut_carl_api::carl::peer::GetPeerStateError; use opendut_types::peer::state::PeerState; use opendut_types::peer::{PeerDescriptor, PeerId}; use tracing::{debug, error, info}; +use crate::resources::storage::ResourcesStorageApi; pub struct GetPeerStateParams { pub peer: PeerId, @@ -33,7 +34,8 @@ pub async fn get_peer_state(params: GetPeerStateParams) -> Result.", peer_id); diff --git a/opendut-carl/src/actions/peers/list_peer_descriptors.rs b/opendut-carl/src/actions/peers/list_peer_descriptors.rs index 1921a400c..c8a7883f4 100644 --- a/opendut-carl/src/actions/peers/list_peer_descriptors.rs +++ b/opendut-carl/src/actions/peers/list_peer_descriptors.rs @@ -2,6 +2,7 @@ use crate::resources::manager::ResourcesManagerRef; use opendut_carl_api::carl::peer::ListPeerDescriptorsError; use opendut_types::peer::PeerDescriptor; use tracing::{debug, error, info}; +use crate::resources::storage::ResourcesStorageApi; pub struct ListPeerDescriptorsParams { pub resources_manager: ResourcesManagerRef, diff --git a/opendut-carl/src/actions/peers/store_peer_descriptor.rs b/opendut-carl/src/actions/peers/store_peer_descriptor.rs index ebcf864cf..c365f6dab 100644 --- a/opendut-carl/src/actions/peers/store_peer_descriptor.rs +++ b/opendut-carl/src/actions/peers/store_peer_descriptor.rs @@ -1,4 +1,4 @@ -use crate::persistence::error::PersistenceError; +use crate::persistence::error::{FlattenPersistenceResult, PersistenceError}; use crate::resources::manager::ResourcesManagerRef; use crate::vpn::Vpn; use opendut_carl_api::carl::peer::StorePeerDescriptorError; @@ -8,6 +8,7 @@ use opendut_types::peer::ethernet::EthernetBridge; use opendut_types::peer::{PeerDescriptor, PeerId}; use opendut_types::util::net::NetworkInterfaceName; use tracing::{debug, error, info, warn}; +use crate::resources::storage::ResourcesStorageApi; pub struct StorePeerDescriptorParams { pub resources_manager: ResourcesManagerRef, @@ -63,6 +64,7 @@ pub async fn store_peer_descriptor(params: StorePeerDescriptorParams) -> Result< Ok(is_new_peer) }).await + .flatten_persistence_result() .map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal { peer_id, peer_name: Clone::clone(&peer_name), diff --git a/opendut-carl/src/actions/peers/unassign_cluster.rs b/opendut-carl/src/actions/peers/unassign_cluster.rs index 7fcab9f02..14a4080d1 100644 --- a/opendut-carl/src/actions/peers/unassign_cluster.rs +++ b/opendut-carl/src/actions/peers/unassign_cluster.rs @@ -2,6 +2,7 @@ use crate::persistence::error::PersistenceError; use crate::resources::manager::ResourcesManagerRef; use opendut_types::peer::state::{PeerState, PeerUpState}; use opendut_types::peer::PeerId; +use crate::resources::storage::ResourcesStorageApi; pub struct UnassignClusterParams { pub resources_manager: ResourcesManagerRef, @@ -38,7 +39,8 @@ pub async fn unassign_cluster(params: UnassignClusterParams) -> Result<(), Unass } Ok(()) - }).await?; + }).await + .map_err(|source| UnassignClusterError::Persistence { peer_id, source })??; Ok(()) } diff --git a/opendut-carl/src/cluster/manager.rs b/opendut-carl/src/cluster/manager.rs index f41a01dd7..4fee9ddb8 100644 --- a/opendut-carl/src/cluster/manager.rs +++ b/opendut-carl/src/cluster/manager.rs @@ -19,6 +19,7 @@ use crate::actions::{AssignClusterParams, DeleteClusterDeploymentParams, GetPeer use crate::peer::broker::PeerMessagingBrokerRef; use crate::persistence::error::PersistenceResult; use crate::resources::manager::ResourcesManagerRef; +use crate::resources::storage::ResourcesStorageApi; use crate::vpn::Vpn; pub type ClusterManagerRef = Arc>; diff --git a/opendut-carl/src/metrics.rs b/opendut-carl/src/metrics.rs index 4e49d1450..d195804f3 100644 --- a/opendut-carl/src/metrics.rs +++ b/opendut-carl/src/metrics.rs @@ -5,7 +5,7 @@ use opendut_types::peer::PeerDescriptor; use opendut_types::peer::state::PeerState; use crate::persistence::error::PersistenceError; use crate::resources::manager::ResourcesManagerRef; - +use crate::resources::storage::ResourcesStorageApi; pub fn initialize_metrics_collection( resources_manager: ResourcesManagerRef, diff --git a/opendut-carl/src/peer/broker.rs b/opendut-carl/src/peer/broker.rs index c7a61ac1b..3ae05a6a0 100644 --- a/opendut-carl/src/peer/broker.rs +++ b/opendut-carl/src/peer/broker.rs @@ -19,6 +19,7 @@ use opendut_types::peer::PeerId; use crate::persistence::error::PersistenceError; use crate::resources::manager::ResourcesManagerRef; +use crate::resources::storage::ResourcesStorageApi; pub type PeerMessagingBrokerRef = Arc; @@ -97,7 +98,7 @@ impl PeerMessagingBroker { self.resources_manager.resources_mut(|resources| { let maybe_peer_state = resources.get::(peer_id) .map_err(|source| OpenError::Persistence { peer_id, source })?; - + match maybe_peer_state { None => { info!("Peer <{}> opened stream which has not been seen before.", peer_id); @@ -118,7 +119,8 @@ impl PeerMessagingBroker { resources.insert(peer_id, new_peer_state) .map_err(|source| OpenError::Persistence { peer_id, source }) }) - }).await?; + }).await + .map_err(|source| OpenError::Persistence { peer_id, source })??; let maybe_old_configuration = self.resources_manager.get::(peer_id).await .map_err(|source| OpenError::Persistence { peer_id, source })?; @@ -273,7 +275,7 @@ mod tests { use opendut_carl_api::proto::services::peer_messaging_broker::Ping; use crate::resources::manager::ResourcesManager; - + use crate::resources::storage::ResourcesStorageApi; use super::*; #[tokio::test] diff --git a/opendut-carl/src/persistence/mod.rs b/opendut-carl/src/persistence/mod.rs index 5689827ef..e63de3226 100644 --- a/opendut-carl/src/persistence/mod.rs +++ b/opendut-carl/src/persistence/mod.rs @@ -7,15 +7,18 @@ pub mod database; pub(crate) mod resources; mod query; -pub struct Storage { - pub db: Db, - pub memory: Memory, +pub struct Storage<'a> { + pub db: Db<'a>, + pub memory: &'a mut Memory, } -pub struct Db { - pub inner: Mutex, //Mutex rather than RwLock, because we share this between threads (i.e. we need it to implement `Sync`) +pub struct Db<'a> { + pub inner: Mutex<&'a mut PgConnection>, //Mutex rather than RwLock, because we share this between threads (i.e. we need it to implement `Sync`) } -impl Db { - pub fn connection(&self) -> MutexGuard { +impl<'a> Db<'a> { + pub fn from_connection(connection: &'a mut PgConnection) -> Db { + Self { inner: Mutex::new(connection) } + } + pub fn connection(&self) -> MutexGuard<&'a mut PgConnection> { self.inner.lock().expect("error while locking mutex for database connection") } } @@ -36,7 +39,7 @@ pub(crate) mod error { }, DieselInternal { #[from] source: diesel::result::Error, - } + }, } impl PersistenceError { pub fn insert(id: impl Into, cause: impl Into) -> Self { @@ -88,7 +91,7 @@ pub(crate) mod error { writeln!(f, " Source: {source}") ).transpose()?; } - PersistenceError::DieselInternal { source } => writeln!(f, "{source}")?, + PersistenceError::DieselInternal { source } => writeln!(f, "Error internal to Diesel, likely from transaction: {source}")?, } Ok(()) } @@ -115,4 +118,16 @@ pub(crate) mod error { } pub type PersistenceResult = Result; + pub trait FlattenPersistenceResult: Sized { + fn flatten_persistence_result(self) -> PersistenceResult; + } + impl FlattenPersistenceResult for PersistenceResult> { + fn flatten_persistence_result(self) -> PersistenceResult { + match self { + Ok(Ok(ok)) => Ok(ok), + Ok(Err(err)) => Err(err), + Err(err) => Err(err) + } + } + } } diff --git a/opendut-carl/src/persistence/resources/cluster_configuration.rs b/opendut-carl/src/persistence/resources/cluster_configuration.rs index 602032a33..efd6a6bdf 100644 --- a/opendut-carl/src/persistence/resources/cluster_configuration.rs +++ b/opendut-carl/src/persistence/resources/cluster_configuration.rs @@ -1,20 +1,19 @@ use super::Persistable; -use crate::persistence::error::{PersistenceError, PersistenceResult}; +use crate::persistence::error::PersistenceResult; use crate::persistence::query::Filter; use crate::persistence::{query, Storage}; -use diesel::Connection; use opendut_types::cluster::{ClusterConfiguration, ClusterId}; impl Persistable for ClusterConfiguration { fn insert(self, _id: ClusterId, storage: &mut Storage) -> PersistenceResult<()> { - storage.db.connection().transaction::<_, PersistenceError, _>(|connection| { - //Delete before inserting to ensure that when an update removes - //list elements we don't leave those elements behind in the database. - //TODO more efficient solution - query::cluster_configuration::remove(self.id, connection)?; + let mut connection = storage.db.connection(); - query::cluster_configuration::insert(self, connection) - }) + //Delete before inserting to ensure that when an update removes + //list elements we don't leave those elements behind in the database. + //TODO more efficient solution + query::cluster_configuration::remove(self.id, &mut connection)?; + + query::cluster_configuration::insert(self, &mut connection) } fn remove(cluster_id: ClusterId, storage: &mut Storage) -> PersistenceResult> { diff --git a/opendut-carl/src/persistence/resources/cluster_deployment.rs b/opendut-carl/src/persistence/resources/cluster_deployment.rs index 3a2d7b786..c4813f37c 100644 --- a/opendut-carl/src/persistence/resources/cluster_deployment.rs +++ b/opendut-carl/src/persistence/resources/cluster_deployment.rs @@ -1,7 +1,6 @@ -use diesel::Connection; use opendut_types::cluster::{ClusterDeployment, ClusterId}; -use crate::persistence::error::{PersistenceError, PersistenceResult}; +use crate::persistence::error::PersistenceResult; use crate::persistence::query::Filter; use crate::persistence::{query, Storage}; @@ -9,14 +8,14 @@ use super::Persistable; impl Persistable for ClusterDeployment { fn insert(self, _id: ClusterId, storage: &mut Storage) -> PersistenceResult<()> { - storage.db.connection().transaction::<_, PersistenceError, _>(|connection| { - //Delete before inserting to ensure that when an update removes - //list elements we don't leave those elements behind in the database. - //TODO more efficient solution - query::cluster_deployment::remove(self.id, connection)?; - - query::cluster_deployment::insert(self, connection) - }) + let mut connection = storage.db.connection(); + + //Delete before inserting to ensure that when an update removes + //list elements we don't leave those elements behind in the database. + //TODO more efficient solution + query::cluster_deployment::remove(self.id, &mut connection)?; + + query::cluster_deployment::insert(self, &mut connection) } fn remove(cluster_id: ClusterId, storage: &mut Storage) -> PersistenceResult> { diff --git a/opendut-carl/src/persistence/resources/peer_descriptor.rs b/opendut-carl/src/persistence/resources/peer_descriptor.rs index 4c0fe285d..382162350 100644 --- a/opendut-carl/src/persistence/resources/peer_descriptor.rs +++ b/opendut-carl/src/persistence/resources/peer_descriptor.rs @@ -1,22 +1,20 @@ -use diesel::Connection; - use opendut_types::peer::{PeerDescriptor, PeerId}; use super::Persistable; -use crate::persistence::error::{PersistenceError, PersistenceResult}; +use crate::persistence::error::PersistenceResult; use crate::persistence::query::Filter; use crate::persistence::{query, Storage}; impl Persistable for PeerDescriptor { fn insert(self, _peer_id: PeerId, storage: &mut Storage) -> PersistenceResult<()> { - storage.db.connection().transaction::<_, PersistenceError, _>(|connection| { - //Delete before inserting to ensure that when an update removes - //list elements we don't leave those elements behind in the database. - //TODO more efficient solution - query::peer_descriptor::remove(self.id, connection)?; + let mut connection = storage.db.connection(); + + //Delete before inserting to ensure that when an update removes + //list elements we don't leave those elements behind in the database. + //TODO more efficient solution + query::peer_descriptor::remove(self.id, &mut connection)?; - query::peer_descriptor::insert(self, connection) - }) + query::peer_descriptor::insert(self, &mut connection) } fn remove(peer_id: PeerId, storage: &mut Storage) -> PersistenceResult> { diff --git a/opendut-carl/src/resources/manager.rs b/opendut-carl/src/resources/manager.rs index 120b6932d..9c1965017 100644 --- a/opendut-carl/src/resources/manager.rs +++ b/opendut-carl/src/resources/manager.rs @@ -1,9 +1,9 @@ use std::sync::Arc; -use crate::persistence::error::PersistenceResult; +use crate::persistence::error::{FlattenPersistenceResult, PersistenceResult}; use crate::persistence::resources::Persistable; -use crate::resources::storage::PersistenceOptions; -use crate::resources::{storage, Resource, Resources}; +use crate::resources::storage::{PersistenceOptions, ResourcesStorageApi}; +use crate::resources::{storage, Resource, Resources, ResourcesTransaction}; use tokio::sync::RwLock; pub type ResourcesManagerRef = Arc; @@ -29,13 +29,17 @@ impl ResourcesManager { pub async fn insert(&self, id: R::Id, resource: R) -> PersistenceResult<()> where R: Resource + Persistable { let mut state = self.state.write().await; - state.resources.insert(id, resource) + state.resources.transaction(move |mut transaction| { + transaction.insert(id, resource) + }).flatten_persistence_result() } pub async fn remove(&self, id: R::Id) -> PersistenceResult> where R: Resource + Persistable { let mut state = self.state.write().await; - state.resources.remove(id) + state.resources.transaction(move |mut transaction| { + transaction.remove(id) + }).flatten_persistence_result() } pub async fn get(&self, id: R::Id) -> PersistenceResult> @@ -50,16 +54,25 @@ impl ResourcesManager { state.resources.list() } - pub async fn resources(&self, f: F) -> T - where F: FnOnce(&Resources) -> T { + pub async fn resources(&self, f: F) -> PersistenceResult + where F: FnOnce(&Resources) -> PersistenceResult { let state = self.state.read().await; f(&state.resources) } - pub async fn resources_mut(&self, f: F) -> T - where F: FnOnce(&mut Resources) -> T { + /// Allows grouping modifications to the database. This does multiple things: + /// - Opens a database transaction and then either commits it, or rolls it back when you return an `Err` out of the closure. + /// - Acquires the lock for the database mutex and keeps it until the end of the closure. + /// - Groups the async calls, so we only have to await at the end. + pub async fn resources_mut(&self, f: F) -> PersistenceResult> + where + F: FnOnce(&mut ResourcesTransaction) -> Result, + E: std::error::Error + Send + Sync + 'static, + { let mut state = self.state.write().await; - f(&mut state.resources) + state.resources.transaction(move |mut transaction| { + f(&mut transaction) + }) } } @@ -96,14 +109,13 @@ mod test { use googletest::prelude::*; + use super::*; use opendut_types::cluster::{ClusterConfiguration, ClusterId, ClusterName}; use opendut_types::peer::executor::{container::{ContainerCommand, ContainerImage, ContainerName, Engine}, ExecutorDescriptor, ExecutorDescriptors, ExecutorId, ExecutorKind}; use opendut_types::peer::{PeerDescriptor, PeerId, PeerLocation, PeerName, PeerNetworkDescriptor}; use opendut_types::topology::Topology; use opendut_types::util::net::{NetworkInterfaceConfiguration, NetworkInterfaceDescriptor, NetworkInterfaceId, NetworkInterfaceName}; - use super::*; - #[tokio::test] async fn test() -> Result<()> { @@ -171,20 +183,18 @@ mod test { assert_that!(testee.remove::(peer_resource_id).await?, some(eq(Clone::clone(&peer)))); - let id = testee.resources_mut(|resources| { - resources.insert(peer_resource_id, Clone::clone(&peer)).unwrap(); - peer_resource_id - }).await; + testee.insert(peer_resource_id, Clone::clone(&peer)).await?; - assert_that!(testee.get::(id).await?, some(eq(Clone::clone(&peer)))); + assert_that!(testee.get::(peer_resource_id).await?, some(eq(Clone::clone(&peer)))); testee.resources(|resources| { - resources.list::().unwrap() + resources.list::()? .into_iter() .for_each(|cluster| { assert_that!(cluster, eq(Clone::clone(&cluster_configuration))); }); - }).await; + Ok(()) + }).await?; Ok(()) } diff --git a/opendut-carl/src/resources/mod.rs b/opendut-carl/src/resources/mod.rs index 449d1b72b..69a3cf6a0 100644 --- a/opendut-carl/src/resources/mod.rs +++ b/opendut-carl/src/resources/mod.rs @@ -2,6 +2,8 @@ use resource::Resource; use crate::persistence::error::PersistenceResult; use crate::persistence::resources::Persistable; +use crate::resources::storage::persistent::PersistentResourcesTransaction; +use crate::resources::storage::volatile::VolatileResourcesTransaction; use crate::resources::storage::{PersistenceOptions, ResourcesStorage, ResourcesStorageApi}; pub mod manager; @@ -19,8 +21,20 @@ impl Resources { Ok(Self { storage }) } + pub fn transaction(&mut self, code: F) -> PersistenceResult> + where + F: FnOnce(ResourcesTransaction) -> Result, + E: std::error::Error + Send + Sync + 'static, + { + match &mut self.storage { + ResourcesStorage::Persistent(storage) => storage.transaction(|transaction| code(ResourcesTransaction::Persistent(transaction))), + ResourcesStorage::Volatile(storage) => storage.noop_transaction(|transaction| code(ResourcesTransaction::Volatile(transaction))), + } + } +} +impl ResourcesStorageApi for Resources { /// Inserts a new resource with this ID or updates it, if it already exists. - pub fn insert(&mut self, id: R::Id, resource: R) -> PersistenceResult<()> + fn insert(&mut self, id: R::Id, resource: R) -> PersistenceResult<()> where R: Resource + Persistable { match &mut self.storage { ResourcesStorage::Persistent(storage) => storage.insert(id, resource), @@ -28,7 +42,7 @@ impl Resources { } } - pub fn remove(&mut self, id: R::Id) -> PersistenceResult> + fn remove(&mut self, id: R::Id) -> PersistenceResult> where R: Resource + Persistable { match &mut self.storage { ResourcesStorage::Persistent(storage) => storage.remove(id), @@ -36,7 +50,7 @@ impl Resources { } } - pub fn get(&self, id: R::Id) -> PersistenceResult> + fn get(&self, id: R::Id) -> PersistenceResult> where R: Resource + Persistable { match &self.storage { ResourcesStorage::Persistent(storage) => storage.get(id), @@ -44,7 +58,7 @@ impl Resources { } } - pub fn list(&self) -> PersistenceResult> + fn list(&self) -> PersistenceResult> where R: Resource + Persistable { match &self.storage { ResourcesStorage::Persistent(storage) => storage.list(), @@ -70,3 +84,41 @@ impl Resources { } } } + +pub enum ResourcesTransaction<'a> { + Persistent(PersistentResourcesTransaction<'a>), + Volatile(VolatileResourcesTransaction<'a>), +} +impl ResourcesStorageApi for ResourcesTransaction<'_> { + fn insert(&mut self, id: R::Id, resource: R) -> PersistenceResult<()> + where R: Resource + Persistable { + match self { + ResourcesTransaction::Persistent(transaction) => transaction.insert(id, resource), + ResourcesTransaction::Volatile(transaction) => transaction.insert(id, resource), + } + } + + fn remove(&mut self, id: R::Id) -> PersistenceResult> + where R: Resource + Persistable { + match self { + ResourcesTransaction::Persistent(transaction) => transaction.remove(id), + ResourcesTransaction::Volatile(transaction) => transaction.remove(id), + } + } + + fn get(&self, id: R::Id) -> PersistenceResult> + where R: Resource + Persistable + Clone { + match self { + ResourcesTransaction::Persistent(transaction) => transaction.get(id), + ResourcesTransaction::Volatile(transaction) => transaction.get(id), + } + } + + fn list(&self) -> PersistenceResult> + where R: Resource + Persistable + Clone { + match self { + ResourcesTransaction::Persistent(transaction) => transaction.list(), + ResourcesTransaction::Volatile(transaction) => transaction.list(), + } + } +} diff --git a/opendut-carl/src/resources/storage/persistent.rs b/opendut-carl/src/resources/storage/persistent.rs index d85ccca33..9355aa6d8 100644 --- a/opendut-carl/src/resources/storage/persistent.rs +++ b/opendut-carl/src/resources/storage/persistent.rs @@ -1,45 +1,137 @@ -use std::sync::Mutex; - use crate::persistence::database::ConnectError; -use crate::persistence::error::PersistenceResult; +use crate::persistence::error::{PersistenceError, PersistenceResult}; use crate::persistence::resources::Persistable; use crate::persistence::{Db, Storage}; use crate::resources::storage::volatile::VolatileResourcesStorage; use crate::resources::storage::{DatabaseConnectInfo, Resource, ResourcesStorageApi}; +use diesel::{Connection, PgConnection}; +use std::any::Any; +use std::sync::Mutex; pub struct PersistentResourcesStorage { - storage: Storage, + db_connection: Mutex, + memory: Mutex, } impl PersistentResourcesStorage { pub async fn connect(database_connect_info: &DatabaseConnectInfo) -> Result { - let db = Db { - inner: Mutex::new( - crate::persistence::database::connect(database_connect_info).await? - ) - }; + let db_connection = crate::persistence::database::connect(database_connect_info).await?; + let db_connection = Mutex::new(db_connection); let memory = VolatileResourcesStorage::default(); - let storage = Storage { db, memory }; - Ok(Self { storage }) + let memory = Mutex::new(memory); + Ok(Self { db_connection, memory }) + } + + pub fn transaction(&mut self, code: F) -> PersistenceResult> + where + F: FnOnce(PersistentResourcesTransaction) -> Result, + E: std::error::Error + Send + Sync + 'static, + { + let transaction_result = self.db_connection.lock().unwrap().transaction::<_, TransactionPassthroughError, _>(|connection| { + let mut memory = self.memory.lock().unwrap(); + let transaction = PersistentResourcesTransaction { + db_connection: Mutex::new(connection), + memory: Mutex::new(&mut memory), + }; + + let result = code(transaction); + match result { + Ok(ok) => Ok(ok), + Err(error) => Err(TransactionPassthroughError::Passthrough(Box::new(error))), //passthrough via an Err-value to trigger transaction rollback + } + }); + + match transaction_result { + Ok(ok) => Ok(Ok(ok)), + Err(TransactionPassthroughError::Passthrough(error)) => { + let error = error.downcast::() + .expect("should be error of type E, like we handed it out from the transaction"); + Ok(Err(*error)) + } + Err(TransactionPassthroughError::Diesel(source)) => Err(PersistenceError::DieselInternal { source }), + } } } impl ResourcesStorageApi for PersistentResourcesStorage { fn insert(&mut self, id: R::Id, resource: R) -> PersistenceResult<()> where R: Resource + Persistable { - resource.insert(id, &mut self.storage) + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let mut storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + resource.insert(id, &mut storage) } fn remove(&mut self, id: R::Id) -> PersistenceResult> where R: Resource + Persistable { - R::remove(id, &mut self.storage) + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let mut storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + R::remove(id, &mut storage) } fn get(&self, id: R::Id) -> PersistenceResult> where R: Resource + Persistable + Clone { - R::get(id, &self.storage) + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + R::get(id, &storage) } fn list(&self) -> PersistenceResult> where R: Resource + Persistable + Clone { - R::list(&self.storage) + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + R::list(&storage) + } +} + + +pub struct PersistentResourcesTransaction<'a> { + db_connection: Mutex<&'a mut PgConnection>, + memory: Mutex<&'a mut VolatileResourcesStorage>, +} +impl ResourcesStorageApi for PersistentResourcesTransaction<'_> { + fn insert(&mut self, id: R::Id, resource: R) -> PersistenceResult<()> + where R: Resource + Persistable { + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let mut storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + resource.insert(id, &mut storage) } + + fn remove(&mut self, id: R::Id) -> PersistenceResult> + where R: Resource + Persistable { + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let mut storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + R::remove(id, &mut storage) + } + + fn get(&self, id: R::Id) -> PersistenceResult> + where + R: Resource + Persistable + Clone + { + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + R::get(id, &storage) + } + + fn list(&self) -> PersistenceResult> + where + R: Resource + Persistable + Clone + { + let mut db = self.db_connection.lock().unwrap(); + let db = Db::from_connection(&mut db); + let storage = Storage { db, memory: &mut self.memory.lock().unwrap() }; + R::list(&storage) + } +} + +#[derive(Debug, thiserror::Error)] +enum TransactionPassthroughError { + #[error("Error returned by Diesel while performing transaction.")] + Diesel(#[from] diesel::result::Error), + #[error("Error returned by the code executed within the transaction.")] + Passthrough(Box), } diff --git a/opendut-carl/src/resources/storage/tests/mod.rs b/opendut-carl/src/resources/storage/tests/mod.rs index 72084c24e..84a57ee0c 100644 --- a/opendut-carl/src/resources/storage/tests/mod.rs +++ b/opendut-carl/src/resources/storage/tests/mod.rs @@ -2,4 +2,5 @@ mod peer_descriptor; mod cluster_configuration; -mod cluster_deployment; \ No newline at end of file +mod cluster_deployment; +mod transaction; diff --git a/opendut-carl/src/resources/storage/tests/transaction.rs b/opendut-carl/src/resources/storage/tests/transaction.rs new file mode 100644 index 000000000..73a6e6356 --- /dev/null +++ b/opendut-carl/src/resources/storage/tests/transaction.rs @@ -0,0 +1,38 @@ +use crate::persistence; +use crate::persistence::error::PersistenceError; +use crate::resources::storage::tests::peer_descriptor::peer_descriptor; +use crate::resources::storage::ResourcesStorageApi; +use googletest::prelude::*; +use opendut_types::cluster::{ClusterDeployment, ClusterId}; +use opendut_types::peer::PeerDescriptor; + +#[test_with::no_env(SKIP_DATABASE_CONTAINER_TESTS)] +#[tokio::test] +async fn should_rollback_from_an_error_during_a_transaction() -> anyhow::Result<()> { + let db = persistence::database::testing::spawn_and_connect_resources_manager().await?; + let resources_manager = db.resources_manager; + + let peer = peer_descriptor()?; + let peer_id = peer.id; + + let result = resources_manager.get::(peer_id).await?; + assert!(result.is_none()); + + let error = resources_manager.resources_mut(|resources| { + resources.insert(peer_id, peer)?; //will be rolled back + let result = resources.get::(peer_id)?; + assert!(result.is_some()); + + let non_existent_cluster_id = ClusterId::random(); + resources.insert(non_existent_cluster_id, ClusterDeployment { id: non_existent_cluster_id })?; //fails because no Cluster with that ID was created + + Ok::<_, PersistenceError>(()) + }).await; + + assert_that!(error, ok(err(anything()))); + + let result = resources_manager.get::(peer_id).await?; + assert!(result.is_none()); //database rolled back due to error + + Ok(()) +} diff --git a/opendut-carl/src/resources/storage/volatile.rs b/opendut-carl/src/resources/storage/volatile.rs index 0dcbe758b..9a72dde07 100644 --- a/opendut-carl/src/resources/storage/volatile.rs +++ b/opendut-carl/src/resources/storage/volatile.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use opendut_types::resources::Id; use crate::persistence::error::PersistenceResult; +use crate::persistence::resources::Persistable; use crate::resources::ids::IntoId; use crate::resources::storage::ResourcesStorageApi; use crate::resources::Resource; @@ -12,6 +13,17 @@ use crate::resources::Resource; pub struct VolatileResourcesStorage { storage: HashMap>>, } +impl VolatileResourcesStorage { + pub fn noop_transaction(&mut self, code: F) -> PersistenceResult> + where + F: FnOnce(VolatileResourcesTransaction) -> Result, + E: std::error::Error + Send + Sync + 'static, + { + let transaction = VolatileResourcesTransaction { inner: self }; + Ok(code(transaction)) + } +} + impl ResourcesStorageApi for VolatileResourcesStorage { fn insert(&mut self, id: R::Id, resource: R) -> PersistenceResult<()> @@ -100,3 +112,28 @@ impl VolatileResourcesStorage { self.storage.is_empty() } } + +pub struct VolatileResourcesTransaction<'a> { + inner: &'a mut VolatileResourcesStorage, +} +impl ResourcesStorageApi for VolatileResourcesTransaction<'_> { + fn insert(&mut self, id: R::Id, resource: R) -> PersistenceResult<()> + where R: Resource + Persistable { + self.inner.insert(id, resource) + } + + fn remove(&mut self, id: R::Id) -> PersistenceResult> + where R: Resource + Persistable { + self.inner.remove(id) + } + + fn get(&self, id: R::Id) -> PersistenceResult> + where R: Resource + Persistable + Clone { + self.inner.get(id) + } + + fn list(&self) -> PersistenceResult> + where R: Resource + Persistable + Clone { + self.inner.list() + } +}