From 2b17a1c88399be69445c97e3062923c7023054b0 Mon Sep 17 00:00:00 2001 From: Tim Bruijnzeels Date: Thu, 21 Sep 2023 13:17:40 +0200 Subject: [PATCH] Support migrations using non-disk storage #1094 * Use kvx with explicit namespace type. * Do not depend on a data dir for storage. * Fix upgrade code. --- Cargo.lock | 15 +- Cargo.toml | 4 +- src/bin/krillup.rs | 10 +- src/cli/ta_client.rs | 5 +- src/commons/eventsourcing/kv.rs | 161 ++++- src/commons/eventsourcing/mod.rs | 18 +- src/commons/eventsourcing/store.rs | 28 +- src/commons/eventsourcing/wal.rs | 7 +- src/commons/util/mod.rs | 6 +- src/commons/util/storage.rs | 61 -- src/constants.rs | 29 +- src/daemon/auth/common/crypt.rs | 4 +- src/daemon/ca/status.rs | 51 +- src/daemon/config.rs | 66 ++- src/daemon/properties/mod.rs | 2 +- src/daemon/ta/mod.rs | 6 +- src/pubd/manager.rs | 16 +- src/pubd/repository.rs | 7 +- src/test.rs | 17 +- src/upgrades/mod.rs | 561 +++++++----------- src/upgrades/pre_0_10_0/cas_migration.rs | 15 +- src/upgrades/pre_0_10_0/old_events.rs | 6 +- src/upgrades/pre_0_10_0/pubd_migration.rs | 22 +- src/upgrades/pre_0_14_0/mod.rs | 8 +- .../{ => status}/ta/status.json | 0 .../{ => status}/testbed/status.json | 0 tests/auth_check.rs | 4 +- tests/benchmark.rs | 2 +- tests/functional_ca_import.rs | 2 +- tests/functional_keyroll.rs | 3 +- 30 files changed, 545 insertions(+), 591 deletions(-) delete mode 100644 src/commons/util/storage.rs rename test-resources/status_store/migration-0.9.5/{ => status}/ta/status.json (100%) rename test-resources/status_store/migration-0.9.5/{ => status}/testbed/status.json (100%) diff --git a/Cargo.lock b/Cargo.lock index 0257db892..4f82d90ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,9 +1130,8 @@ dependencies = [ [[package]] name = "kvx" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aea5d159eef7e2aa78c53130afef47424676a67195299aa84ba092ab86eedcdf" +version = "0.7.0" +source = 
"git+https://github.com/nlnetlabs/kvx?branch=support-namespace-migrations#3619a8541028e65b750c678413c9b7089d9f6c4e" dependencies = [ "kvx_macros", "kvx_types", @@ -1148,9 +1147,8 @@ dependencies = [ [[package]] name = "kvx_macros" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40efe43ad6ed19cf391e87927dd6c1cd18f231f00112111a6e9d3214fda6e46" +version = "0.7.0" +source = "git+https://github.com/nlnetlabs/kvx?branch=support-namespace-migrations#3619a8541028e65b750c678413c9b7089d9f6c4e" dependencies = [ "kvx_types", "proc-macro-error", @@ -1161,9 +1159,8 @@ dependencies = [ [[package]] name = "kvx_types" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "261784226d4f9e160da81b401ee77ce1a3c00598599f3d7da93a6fc6caa93d9d" +version = "0.7.0" +source = "git+https://github.com/nlnetlabs/kvx?branch=support-namespace-migrations#3619a8541028e65b750c678413c9b7089d9f6c4e" dependencies = [ "postgres", "postgres-types", diff --git a/Cargo.toml b/Cargo.toml index 3d138c46e..f4cfb2376 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,8 +36,8 @@ hyper = { version = "^0.14", features = ["server"] } intervaltree = "0.2.6" jmespatch = { version = "^0.3", features = ["sync"], optional = true } kmip = { version = "0.4.2", package = "kmip-protocol", features = ["tls-with-openssl"], optional = true } -kvx = { version = "0.6.0", features = ["macros"] } -# kvx = { version = "0.6.0", git = "https://github.com/nlnetlabs/kvx", features = ["macros"] } +# kvx = { version = "0.6.0", features = ["macros"] } +kvx = { version = "0.7.0", git = "https://github.com/nlnetlabs/kvx", branch = "support-namespace-migrations", features = ["macros"] } libflate = "^1" log = "^0.4" once_cell = { version = "^1.7.2", optional = true } diff --git a/src/bin/krillup.rs b/src/bin/krillup.rs index f4aad1283..5e9006d31 100644 --- a/src/bin/krillup.rs +++ b/src/bin/krillup.rs @@ -31,8 +31,7 @@ async fn main() { match 
Config::create(config_file, true) { Ok(config) => { - let properties_manager = match PropertiesManager::create(&config.storage_uri, config.use_history_cache) - { + let properties_manager = match PropertiesManager::create(&config.storage_uri, config.use_history_cache) { Ok(mgr) => mgr, Err(e) => { eprintln!("*** Error Preparing Data Migration ***"); @@ -61,12 +60,7 @@ async fn main() { let from = report.versions().from(); let to = report.versions().to(); if report.data_migration() { - info!( - "Prepared and verified upgrade from {} to {}. Prepared data was saved to: {}", - from, - to, - config.upgrade_storage_uri() - ); + info!("Prepared and verified upgrade from {} to {}.", from, to,); } else { info!("No preparation is needed for the upgrade from {} to {}.", from, to) } diff --git a/src/cli/ta_client.rs b/src/cli/ta_client.rs index e60778c0e..3e9f0de49 100644 --- a/src/cli/ta_client.rs +++ b/src/cli/ta_client.rs @@ -12,6 +12,7 @@ use std::{ use bytes::Bytes; use clap::{App, Arg, ArgMatches, SubCommand}; + use log::LevelFilter; use rpki::{ ca::idexchange::{self, ChildHandle, RepoInfo, ServiceUri}, @@ -28,7 +29,7 @@ use crate::{ api::{AddChildRequest, ApiRepositoryContact, CertAuthInfo, IdCertInfo, RepositoryContact, Token}, crypto::{KrillSigner, KrillSignerBuilder, OpenSslSignerConfig}, error::Error as KrillError, - eventsourcing::{segment, AggregateStore, AggregateStoreError, Segment}, + eventsourcing::{namespace, AggregateStore, AggregateStoreError, Namespace}, util::{file, httpclient}, }, constants::{ @@ -1002,7 +1003,7 @@ struct TrustAnchorSignerManager { impl TrustAnchorSignerManager { fn create(config: Config) -> Result { - let store = AggregateStore::create(&config.storage_uri, segment!("signer"), config.use_history_cache) + let store = AggregateStore::create(&config.storage_uri, namespace!("signer"), config.use_history_cache) .map_err(KrillError::AggregateStoreError)?; let ta_handle = TrustAnchorHandle::new("ta".into()); let signer = config.signer()?; diff 
--git a/src/commons/eventsourcing/kv.rs b/src/commons/eventsourcing/kv.rs index 44146c77b..11c34f9f7 100644 --- a/src/commons/eventsourcing/kv.rs +++ b/src/commons/eventsourcing/kv.rs @@ -1,7 +1,7 @@ -use std::fmt; +use std::{fmt, str::FromStr}; -pub use kvx::{segment, Key, Scope, Segment, SegmentBuf}; -use kvx::{KeyValueStoreBackend, ReadStore, WriteStore}; +pub use kvx::{namespace, segment, Key, Namespace, Scope, Segment, SegmentBuf}; +use kvx::{KeyValueStoreBackend, NamespaceBuf, ReadStore, WriteStore}; use serde::{de::DeserializeOwned, Serialize}; use url::Url; @@ -39,13 +39,78 @@ pub struct KeyValueStore { } impl KeyValueStore { - /// Creates a new KeyValueStore and initializes the version if it had - /// not been set. - pub fn create(storage_uri: &Url, name_space: impl Into) -> Result { - let store = KeyValueStore { - inner: kvx::KeyValueStore::new(storage_uri, name_space)?, - }; - Ok(store) + /// Creates a new KeyValueStore. + pub fn create(storage_uri: &Url, namespace: &Namespace) -> Result { + kvx::KeyValueStore::new(storage_uri, namespace) + .map(|inner| KeyValueStore { inner }) + .map_err(KeyValueError::KVError) + } + + /// Creates a new KeyValueStore for upgrades. + /// + /// Adds the implicit prefix "upgrade-{version}-" to the given namespace. + pub fn create_upgrade_store(storage_uri: &Url, namespace: &Namespace) -> Result { + let namespace = Self::prefixed_namespace(namespace, "upgrade")?; + + kvx::KeyValueStore::new(storage_uri, namespace) + .map(|inner| KeyValueStore { inner }) + .map_err(KeyValueError::KVError) + } + + fn prefixed_namespace(namespace: &Namespace, prefix: &str) -> Result { + let namespace_string = format!("{}_{}", prefix, namespace); + NamespaceBuf::from_str(&namespace_string) + .map_err(|e| KeyValueError::Other(format!("Cannot parse namespace: {}. Error: {}", namespace_string, e))) + } + + /// Archive this store (i.e. for this namespace). Deletes + /// any existing archive for this namespace if present. 
+ pub fn migrate_to_archive(&mut self, storage_uri: &Url, namespace: &Namespace) -> Result<(), KeyValueError> { + let archive_ns = Self::prefixed_namespace(namespace, "archive")?; + // Wipe any existing archive, before archiving this store. + // We don't want to keep too much old data. See issue: #1088. + let archive_store = KeyValueStore::create(storage_uri, namespace)?; + archive_store.wipe()?; + + self.inner.migrate_namespace(archive_ns).map_err(KeyValueError::KVError) + } + + /// Make this (upgrade) store the current store. + /// + /// Fails if there is a non-empty current store. + pub fn migrate_to_current(&mut self, storage_uri: &Url, namespace: &Namespace) -> Result<(), KeyValueError> { + let current_store = KeyValueStore::create(storage_uri, namespace)?; + if !current_store.is_empty()? { + Err(KeyValueError::Other(format!( + "Abort migrate upgraded store for {} to current. The current store was not archived.", + namespace + ))) + } else { + self.inner + .migrate_namespace(namespace.into()) + .map_err(KeyValueError::KVError) + } + } + + /// Returns true if this KeyValueStore (with this namespace) has any entries + pub fn is_empty(&self) -> Result { + self.inner.is_empty().map_err(KeyValueError::KVError) + } + + /// Import all data from the given KV store into this + pub fn import(&self, other: &Self) -> Result<(), KeyValueError> { + let mut scopes = other.scopes()?; + scopes.push(Scope::global()); // not explicitly listed but should be migrated as well. + + for scope in scopes { + for key in other.keys(&scope, "")? { + if let Some(value) = other.get_raw_value(&key)? { + self.store_raw_value(&key, value)?; + } + } + } + + Ok(()) } /// Stores a key value pair, serialized as json, overwrite existing @@ -67,13 +132,35 @@ impl KeyValueStore { /// Gets a value for a key, returns an error if the value cannot be deserialized, /// returns None if it cannot be found. pub fn get(&self, key: &Key) -> Result, KeyValueError> { - if let Some(value) = self.inner.get(key)? 
{ - Ok(serde_json::from_value(value)?) + if let Some(value) = self.get_raw_value(key)? { + match serde_json::from_value(value) { + Ok(result) => Ok(result), + Err(e) => { + // Get the value again so that we can do a full error report + let value = self.get_raw_value(key)?; + let value_str = value.map(|v| v.to_string()).unwrap_or("".to_string()); + + let expected_type = std::any::type_name::(); + + Err(KeyValueError::Other(format!( + "Could not deserialize value for key '{}'. Expected type: {}. Error: {}. Value was: {}", + key, expected_type, e, value_str + ))) + } + } } else { Ok(None) } } + fn get_raw_value(&self, key: &Key) -> Result, KeyValueError> { + self.inner.get(key).map_err(KeyValueError::KVError) + } + + fn store_raw_value(&self, key: &Key, value: serde_json::Value) -> Result<(), KeyValueError> { + self.inner.store(key, value).map_err(KeyValueError::KVError) + } + /// Transactional `get`. pub fn get_transactional(&self, key: &Key) -> Result, KeyValueError> { let mut result: Option = None; @@ -116,7 +203,7 @@ impl KeyValueStore { } /// Archive a key - pub fn archive(&self, key: &Key) -> Result<(), KeyValueError> { + pub fn archive_key(&self, key: &Key) -> Result<(), KeyValueError> { self.move_key(key, &key.clone().with_sub_scope(segment!("archived"))) } @@ -130,7 +217,7 @@ impl KeyValueStore { self.move_key(key, &key.clone().with_sub_scope(segment!("surplus"))) } - /// Returns all 1st level scopes + /// Returns all scopes, including sub_scopes pub fn scopes(&self) -> Result, KeyValueError> { Ok(self.inner.list_scopes()?) 
} @@ -172,6 +259,7 @@ pub enum KeyValueError { UnknownKey(Key), DuplicateKey(Key), KVError(kvx::Error), + Other(String), } impl From for KeyValueError { @@ -201,6 +289,7 @@ impl fmt::Display for KeyValueError { KeyValueError::UnknownKey(key) => write!(f, "Unknown key: {}", key), KeyValueError::DuplicateKey(key) => write!(f, "Duplicate key: {}", key), KeyValueError::KVError(e) => write!(f, "Store error: {}", e), + KeyValueError::Other(msg) => write!(f, "{}", msg), } } } @@ -224,6 +313,16 @@ mod tests { .unwrap() } + fn random_namespace() -> NamespaceBuf { + rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(8) + .map(char::from) + .collect::() + .parse() + .unwrap() + } + fn get_storage_uri() -> Url { env::var("KRILL_KV_STORAGE_URL") .ok() @@ -235,7 +334,7 @@ mod tests { fn test_store() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let key = Key::new_global(random_segment()); @@ -248,7 +347,7 @@ mod tests { fn test_store_new() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let key = Key::new_global(random_segment()); @@ -260,7 +359,7 @@ mod tests { fn test_store_scoped() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let id = random_segment(); let scope = Scope::from_segment(segment!("scope")); @@ -281,7 +380,7 @@ mod tests { fn test_get() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, 
&random_namespace()).unwrap(); let content = "content".to_owned(); let key = Key::new_global(random_segment()); assert_eq!(store.get::(&key).unwrap(), None); @@ -294,7 +393,7 @@ mod tests { fn test_get_transactional() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let key = Key::new_global(random_segment()); assert_eq!(store.get_transactional::(&key).unwrap(), None); @@ -307,7 +406,7 @@ mod tests { fn test_has() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let key = Key::new_global(random_segment()); assert!(!store.has(&key).unwrap()); @@ -320,7 +419,7 @@ mod tests { fn test_drop_key() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let key = Key::new_global(random_segment()); store.store(&key, &content).unwrap(); @@ -334,7 +433,7 @@ mod tests { fn test_drop_scope() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let scope = Scope::from_segment(random_segment()); let key = Key::new_scoped(scope.clone(), random_segment()); @@ -355,7 +454,7 @@ mod tests { fn test_wipe() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let scope = 
Scope::from_segment(segment!("scope")); let key = Key::new_scoped(scope.clone(), random_segment()); @@ -373,7 +472,7 @@ mod tests { fn test_move_key() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_string(); let key = Key::new_global(random_segment()); @@ -390,14 +489,14 @@ mod tests { fn test_archive() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_string(); let key = Key::new_global(random_segment()); store.store(&key, &content).unwrap(); assert!(store.has(&key).unwrap()); - store.archive(&key).unwrap(); + store.archive_key(&key).unwrap(); assert!(!store.has(&key).unwrap()); assert!(store.has(&key.with_sub_scope(segment!("archived"))).unwrap()); } @@ -406,7 +505,7 @@ mod tests { fn test_archive_corrupt() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_string(); let key = Key::new_global(random_segment()); @@ -422,7 +521,7 @@ mod tests { fn test_archive_surplus() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_string(); let key = Key::new_global(random_segment()); @@ -438,7 +537,7 @@ mod tests { fn test_scopes() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let id = segment!("id"); 
let scope = Scope::from_segment(random_segment()); @@ -467,7 +566,7 @@ mod tests { fn test_has_scope() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let scope = Scope::from_segment(random_segment()); let key = Key::new_scoped(scope.clone(), segment!("id")); @@ -481,7 +580,7 @@ mod tests { fn test_keys() { let storage_uri = get_storage_uri(); - let store = KeyValueStore::create(&storage_uri, random_segment()).unwrap(); + let store = KeyValueStore::create(&storage_uri, &random_namespace()).unwrap(); let content = "content".to_owned(); let id = segment!("command--id"); let scope = Scope::from_segment(segment!("command")); diff --git a/src/commons/eventsourcing/mod.rs b/src/commons/eventsourcing/mod.rs index bf12907df..68d46fdf2 100644 --- a/src/commons/eventsourcing/mod.rs +++ b/src/commons/eventsourcing/mod.rs @@ -21,7 +21,9 @@ pub use self::listener::*; pub mod locks; mod kv; -pub use self::kv::{segment, Key, KeyValueError, KeyValueStore, Scope, Segment, SegmentBuf, SegmentExt}; +pub use self::kv::{ + namespace, segment, Key, KeyValueError, KeyValueStore, Namespace, Scope, Segment, SegmentBuf, SegmentExt, +}; //------------ Tests --------------------------------------------------------- @@ -45,7 +47,7 @@ mod tests { api::{CommandHistoryCriteria, CommandSummary}, }, constants::ACTOR_DEF_TEST, - test::tmp_storage, + test::mem_storage, }; use super::*; @@ -337,13 +339,11 @@ mod tests { #[test] fn event_sourcing_framework() { - // crate::test::test_under_tmp(|data_dir| { - // let storage_uri = crate::commons::util::storage::storage_uri_from_data_dir(&data_dir).unwrap(); - - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); let counter = Arc::new(EventCounter::default()); - let mut manager = AggregateStore::::create(&storage_uri, segment!("person"), false).unwrap(); + 
+ let mut manager = AggregateStore::::create(&storage_uri, namespace!("person"), false).unwrap(); manager.add_post_save_listener(counter.clone()); let alice_name = "alice smith".to_string(); @@ -375,8 +375,8 @@ mod tests { assert_eq!("alice smith-doe", alice.name()); assert_eq!(21, alice.age()); - // Should read state from disk - let manager = AggregateStore::::create(&storage_uri, segment!("person"), false).unwrap(); + // Should read state again when restarted with same data store mapping. + let manager = AggregateStore::::create(&storage_uri, namespace!("person"), false).unwrap(); let alice = manager.get_latest(&alice_handle).unwrap(); assert_eq!("alice smith-doe", alice.name()); diff --git a/src/commons/eventsourcing/store.rs b/src/commons/eventsourcing/store.rs index 85474f5e1..88a3a5693 100644 --- a/src/commons/eventsourcing/store.rs +++ b/src/commons/eventsourcing/store.rs @@ -5,6 +5,7 @@ use std::{ sync::{Arc, Mutex, RwLock}, }; +use kvx::Namespace; use rpki::{ca::idexchange::MyHandle, repository::x509::Time}; use serde::{de::DeserializeOwned, Serialize}; use url::Url; @@ -14,7 +15,7 @@ use crate::commons::{ error::KrillIoError, eventsourcing::{ cmd::Command, locks::HandleLocks, segment, Aggregate, Key, KeyValueError, KeyValueStore, PostSaveEventListener, - PreSaveEventListener, Scope, Segment, SegmentBuf, SegmentExt, StoredCommand, StoredCommandBuilder, + PreSaveEventListener, Scope, Segment, SegmentExt, StoredCommand, StoredCommandBuilder, }, }; @@ -42,13 +43,23 @@ pub struct AggregateStore { /// # Starting up /// impl AggregateStore { - /// Creates an AggregateStore using a disk based KeyValueStore - pub fn create( + /// Creates an AggregateStore using the given storage url + pub fn create(storage_uri: &Url, name_space: &Namespace, use_history_cache: bool) -> StoreResult { + let kv = KeyValueStore::create(storage_uri, name_space)?; + Self::create_from_kv(kv, use_history_cache) + } + + /// Creates an AggregateStore for upgrades using the given storage url 
+ pub fn create_upgrade_store( storage_uri: &Url, - name_space: impl Into, + name_space: &Namespace, use_history_cache: bool, ) -> StoreResult { - let kv = KeyValueStore::create(storage_uri, name_space)?; + let kv = KeyValueStore::create_upgrade_store(storage_uri, name_space)?; + Self::create_from_kv(kv, use_history_cache) + } + + fn create_from_kv(kv: KeyValueStore, use_history_cache: bool) -> StoreResult { let cache = RwLock::new(HashMap::new()); let history_cache = if !use_history_cache { None @@ -394,7 +405,10 @@ where // Little local helper so we can use borrowed records without keeping // the lock longer than it wants to live. - fn command_history_for_records(crit: CommandHistoryCriteria, records: &[CommandHistoryRecord]) -> CommandHistory { + fn command_history_for_records( + crit: CommandHistoryCriteria, + records: &[CommandHistoryRecord], + ) -> CommandHistory { let offset = crit.offset(); let rows = match crit.rows_limit() { @@ -419,7 +433,7 @@ where CommandHistory::new(offset, total, matching) } - + match &self.history_cache { Some(mutex) => { let mut cache_lock = mutex.lock().unwrap(); diff --git a/src/commons/eventsourcing/wal.rs b/src/commons/eventsourcing/wal.rs index 0fd7b5a59..84560c130 100644 --- a/src/commons/eventsourcing/wal.rs +++ b/src/commons/eventsourcing/wal.rs @@ -5,12 +5,13 @@ use std::{ sync::{Arc, RwLock}, }; +use kvx::Namespace; use rpki::ca::idexchange::MyHandle; use serde::Serialize; use url::Url; use crate::commons::eventsourcing::{ - locks::HandleLocks, segment, Key, KeyValueError, KeyValueStore, Scope, Segment, SegmentBuf, SegmentExt, Storable, + locks::HandleLocks, segment, Key, KeyValueError, KeyValueStore, Scope, Segment, SegmentExt, Storable, }; //------------ WalSupport ---------------------------------------------------- @@ -129,7 +130,7 @@ pub struct WalStore { impl WalStore { /// Creates a new store using a disk based keystore for the given data /// directory and namespace (directory). 
- pub fn create(storage_uri: &Url, name_space: impl Into) -> WalStoreResult { + pub fn create(storage_uri: &Url, name_space: &Namespace) -> WalStoreResult { let kv = KeyValueStore::create(storage_uri, name_space)?; let cache = RwLock::new(HashMap::new()); let locks = HandleLocks::default(); @@ -355,7 +356,7 @@ impl WalStore { if let Ok(revision) = u64::from_str(number) { if revision < latest.revision() { if archive { - self.kv.archive(&key)?; + self.kv.archive_key(&key)?; } else { self.kv.drop_key(&key)?; } diff --git a/src/commons/util/mod.rs b/src/commons/util/mod.rs index 297909689..916a38768 100644 --- a/src/commons/util/mod.rs +++ b/src/commons/util/mod.rs @@ -15,7 +15,6 @@ pub mod cmslogger; pub mod ext_serde; pub mod file; pub mod httpclient; -pub mod storage; //------------ KrillVersion -------------------------------------------------- @@ -36,6 +35,11 @@ impl KrillVersion { Self::from_str(KRILL_VERSION).unwrap() } + /// Make a notation friendly to namespaces for upgrades. + pub fn hyphen_notated(&self) -> String { + format!("{}-{}-{}{}", self.major, self.minor, self.patch, self.release_type) + } + pub fn v0_5_0_or_before() -> Self { Self::dev(0, 5, 0, "or-before".to_string()) } diff --git a/src/commons/util/storage.rs b/src/commons/util/storage.rs deleted file mode 100644 index 49c1c5452..000000000 --- a/src/commons/util/storage.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::path::{Path, PathBuf}; -use url::Url; - -use crate::commons::{error::Error, KrillResult}; - -pub fn data_dir_from_storage_uri(storage_uri: &Url) -> Option { - if storage_uri.scheme() != "local" { - None - } else { - Some( - Path::new(&format!( - "{}{}", - storage_uri.host_str().unwrap_or(""), - storage_uri.path() - )) - .to_path_buf(), - ) - } -} - -// TODO mark as test only -// #[cfg(test)] -pub fn storage_uri_from_data_dir(data_dir: &Path) -> KrillResult { - Url::parse(&format!("local://{}/", data_dir.to_string_lossy())).map_err(|e| Error::custom(e.to_string())) -} - -#[cfg(test)] -mod 
tests { - use std::path::{Path, PathBuf}; - use url::Url; - - use crate::commons::util::storage::{data_dir_from_storage_uri, storage_uri_from_data_dir}; - - #[test] - fn conversion() { - assert_eq!( - data_dir_from_storage_uri(&Url::parse("local:///tmp/test").unwrap()).unwrap(), - PathBuf::from("/tmp/test") - ); - assert_eq!( - data_dir_from_storage_uri(&Url::parse("local://./data").unwrap()).unwrap(), - PathBuf::from("./data") - ); - assert_eq!( - data_dir_from_storage_uri(&Url::parse("local://data").unwrap()).unwrap(), - PathBuf::from("data") - ); - assert_eq!( - data_dir_from_storage_uri(&Url::parse("local://data/test").unwrap()).unwrap(), - PathBuf::from("data/test") - ); - assert_eq!( - storage_uri_from_data_dir(Path::new("./data")).unwrap(), - Url::parse("local://./data/").unwrap() - ); - assert_eq!( - storage_uri_from_data_dir(Path::new("/tmp/data")).unwrap(), - Url::parse("local:///tmp/data/").unwrap() - ); - } -} diff --git a/src/constants.rs b/src/constants.rs index ccd91c60a..f26b1911b 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,8 +1,7 @@ +use kvx::Namespace; + use crate::{ - commons::{ - actor::ActorDef, - eventsourcing::{segment, Segment}, - }, + commons::{actor::ActorDef, eventsourcing::namespace}, daemon::auth::common::NoResourceType, }; @@ -47,18 +46,16 @@ pub fn test_announcements_enabled() -> bool { // until const fn's are more versatile for str's, we need to use lazy_static to be able to expand the segment macro at // compile time, while running the expanded code, which actually makes it a Segment, at runtime -pub const CASERVER_NS: &Segment = segment!("cas"); -pub const CA_OBJECTS_NS: &Segment = segment!("ca_objects"); -pub const KEYS_NS: &Segment = segment!("keys"); -pub const PUBSERVER_CONTENT_NS: &Segment = segment!("pubd_objects"); -pub const PUBSERVER_NS: &Segment = segment!("pubd"); -pub const PROPERTIES_NS: &Segment = segment!("properties"); -pub const SIGNERS_NS: &Segment = segment!("signers"); -pub const STATUS_NS: 
&Segment = segment!("status"); -pub const TA_PROXY_SERVER_NS: &Segment = segment!("ta_proxy"); -pub const TA_SIGNER_SERVER_NS: &Segment = segment!("ta_signer"); - -pub const UPGRADE_DIR: &str = "upgrade-data"; +pub const CASERVER_NS: &Namespace = namespace!("cas"); +pub const CA_OBJECTS_NS: &Namespace = namespace!("ca_objects"); +pub const KEYS_NS: &Namespace = namespace!("keys"); +pub const PUBSERVER_CONTENT_NS: &Namespace = namespace!("pubd_objects"); +pub const PUBSERVER_NS: &Namespace = namespace!("pubd"); +pub const PROPERTIES_NS: &Namespace = namespace!("properties"); +pub const SIGNERS_NS: &Namespace = namespace!("signers"); +pub const STATUS_NS: &Namespace = namespace!("status"); +pub const TA_PROXY_SERVER_NS: &Namespace = namespace!("ta_proxy"); +pub const TA_SIGNER_SERVER_NS: &Namespace = namespace!("ta_signer"); pub const PROPERTIES_DFLT_NAME: &str = "main"; diff --git a/src/daemon/auth/common/crypt.rs b/src/daemon/auth/common/crypt.rs index a4ae61a9b..a74f2cc05 100644 --- a/src/daemon/auth/common/crypt.rs +++ b/src/daemon/auth/common/crypt.rs @@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; -use kvx::{segment, Key, Segment}; +use kvx::{namespace, segment, Key, Namespace, Segment}; use crate::{ commons::{error::Error, util::ext_serde, KrillResult}, @@ -34,7 +34,7 @@ const POLY1305_TAG_BYTE_LEN: usize = POLY1305_TAG_BIT_LEN / 8; const CLEARTEXT_PREFIX_LEN: usize = CHACHA20_NONCE_BYTE_LEN + POLY1305_TAG_BYTE_LEN; const UNUSED_AAD: [u8; 0] = [0; 0]; -const CRYPT_STATE_NS: &Segment = segment!("login_sessions"); +const CRYPT_STATE_NS: &Namespace = namespace!("login_sessions"); const CRYPT_STATE_KEY: &Segment = segment!("main_key"); #[derive(Debug, Deserialize, Serialize)] diff --git a/src/daemon/ca/status.rs b/src/daemon/ca/status.rs index 75b304c0d..ae752aa6a 100644 --- a/src/daemon/ca/status.rs +++ b/src/daemon/ca/status.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, str::FromStr, sync::RwLock}; +use kvx::Namespace; use rpki::ca::{ 
idexchange::{CaHandle, ChildHandle, ParentHandle, ServiceUri}, provisioning::ResourceClassListResponse as Entitlements, @@ -13,7 +14,7 @@ use crate::commons::{ RepoStatus, }, error::Error, - eventsourcing::{segment, Key, KeyValueStore, Scope, Segment, SegmentBuf, SegmentExt}, + eventsourcing::{segment, Key, KeyValueStore, Scope, Segment, SegmentExt}, util::httpclient, KrillResult, }; @@ -67,7 +68,7 @@ pub struct StatusStore { } impl StatusStore { - pub fn create(storage_uri: &Url, namespace: impl Into) -> KrillResult { + pub fn create(storage_uri: &Url, namespace: &Namespace) -> KrillResult { let store = KeyValueStore::create(storage_uri, namespace)?; let cache = RwLock::new(HashMap::new()); @@ -401,33 +402,39 @@ impl StatusStore { mod tests { use super::*; - use std::path::PathBuf; - - use crate::{ - commons::util::{file, storage::storage_uri_from_data_dir}, - test, - }; + use crate::{constants::STATUS_NS, test}; #[test] fn read_save_status() { - test::test_under_tmp(|data_dir| { - let source = PathBuf::from("test-resources/status_store/migration-0.9.5/"); - let target = data_dir.join("status"); - file::backup_dir(&source, &target).unwrap(); + let source_dir_path_str = "test-resources/status_store/migration-0.9.5/"; + let source_dir_url = Url::parse(&format!("local://{}", source_dir_path_str)).unwrap(); + + let source_store = KeyValueStore::create(&source_dir_url, STATUS_NS).unwrap(); + + let test_storage_uri = test::mem_storage(); + let status_kv_store = KeyValueStore::create(&test_storage_uri, STATUS_NS).unwrap(); - let status_testbed_before_migration = - include_str!("../../../test-resources/status_store/migration-0.9.5/testbed/status.json"); + // copy the source KV store (files) into the test KV store (in memory) + status_kv_store.import(&source_store).unwrap(); - let status_testbed_before_migration: CaStatus = - serde_json::from_str(status_testbed_before_migration).unwrap(); + // get the status for testbed before initialising a StatusStore + // using the 
copied data - that will be done next and will start + // a migration. + let testbed_status_key = Key::new_scoped( + Scope::from_segment(segment!("testbed")), + Segment::parse("status.json").unwrap(), + ); + let status_testbed_before_migration: CaStatus = status_kv_store.get(&testbed_status_key).unwrap().unwrap(); - let storage_uri = storage_uri_from_data_dir(&data_dir).unwrap(); - let store = StatusStore::create(&storage_uri, segment!("status")).unwrap(); - let testbed = CaHandle::from_str("testbed").unwrap(); + // Initialise the StatusStore using the new (in memory) storage, + // and migrate the data. + let store = StatusStore::create(&test_storage_uri, STATUS_NS).unwrap(); + let testbed = CaHandle::from_str("testbed").unwrap(); - let status_testbed_migrated = store.get_ca_status(&testbed); + // Get the migrated status for testbed and verify that it's equivalent + // to the status before migration. + let status_testbed_migrated = store.get_ca_status(&testbed); - assert_eq!(status_testbed_before_migration, status_testbed_migrated); - }); + assert_eq!(status_testbed_before_migration, status_testbed_migrated); } } diff --git a/src/daemon/config.rs b/src/daemon/config.rs index b20e03a75..7bc368198 100644 --- a/src/daemon/config.rs +++ b/src/daemon/config.rs @@ -8,7 +8,7 @@ use std::{ }; use chrono::Duration; -use kvx::SegmentBuf; +use kvx::Namespace; use log::{error, LevelFilter}; use rpki::{ ca::idexchange::PublisherHandle, @@ -27,10 +27,7 @@ use crate::{ crypto::{OpenSslSignerConfig, SignSupport}, error::{Error, KrillIoError}, eventsourcing::KeyValueStore, - util::{ - ext_serde, - storage::{data_dir_from_storage_uri, storage_uri_from_data_dir}, - }, + util::ext_serde, KrillResult, }, constants::*, @@ -452,11 +449,9 @@ pub struct Config { )] pub storage_uri: Url, - #[serde(default="ConfigDefaults::dflt_true")] + #[serde(default = "ConfigDefaults::dflt_true")] pub use_history_cache: bool, - upgrade_storage_uri: Option, - tls_keys_dir: Option, repo_dir: Option, @@ 
-815,20 +810,33 @@ pub struct Benchmark { /// # Accessors impl Config { - pub fn upgrade_storage_uri(&self) -> &Url { - self.upgrade_storage_uri.as_ref().unwrap() // should not panic, as it is always set by Config::verify - } - /// General purpose KV store, can be used to track server settings /// etc not specific to any Aggregate or WalSupport type pub fn general_key_value_store(&self) -> KrillResult { KeyValueStore::create(&self.storage_uri, PROPERTIES_NS).map_err(Error::KeyValueError) } - pub fn key_value_store(&self, name_space: impl Into) -> KrillResult { + pub fn key_value_store(&self, name_space: &Namespace) -> KrillResult { KeyValueStore::create(&self.storage_uri, name_space).map_err(Error::KeyValueError) } + /// Returns the data directory if disk was used for storage. + /// This will always be true for upgrades of pre 0.14.0 versions + fn data_dir(&self) -> Option { + if self.storage_uri.scheme() != "local" { + None + } else { + Some( + Path::new(&format!( + "{}{}", + self.storage_uri.host_str().unwrap_or(""), + self.storage_uri.path() + )) + .to_path_buf(), + ) + } + } + pub fn tls_keys_dir(&self) -> &PathBuf { self.tls_keys_dir.as_ref().unwrap() // should not panic, as it is always set } @@ -1076,7 +1084,6 @@ impl Config { https_mode, storage_uri: storage_uri.clone(), use_history_cache: false, - upgrade_storage_uri: data_dir.map(|d| storage_uri_from_data_dir(&d.join(UPGRADE_DIR)).unwrap()), tls_keys_dir: data_dir.map(|d| d.join(HTTPS_SUB_DIR)), repo_dir: data_dir.map(|d| d.join(REPOSITORY_DIR)), ta_support_enabled: false, // but, enabled by testbed where applicable @@ -1166,7 +1173,6 @@ impl Config { if upgrade_only { info!("Prepare upgrade using configuration file: {}", config_file); info!("Processing data from: {}", config.storage_uri); - info!("Saving prepared data to: {}", config.upgrade_storage_uri(),); } else { info!("{} uses configuration file: {}", KRILL_SERVER_APP, config_file); } @@ -1202,15 +1208,8 @@ impl Config { self.ca_refresh_seconds = 
CA_REFRESH_SECONDS_MAX; } - if self.upgrade_storage_uri.is_none() { - if self.storage_uri.scheme() != "local" { - return Err(ConfigError::other("'upgrade_storage_uri' is not configured, but 'storage_uri' is not a local directory, please configure an 'upgrade_storage_uri'")); - } - self.upgrade_storage_uri = Some(self.storage_uri.join(UPGRADE_DIR).unwrap()); - } - if self.tls_keys_dir.is_none() { - if let Some(mut data_dir) = data_dir_from_storage_uri(&self.storage_uri) { + if let Some(mut data_dir) = self.data_dir() { data_dir.push(HTTPS_SUB_DIR); self.tls_keys_dir = Some(data_dir); } else { @@ -1219,7 +1218,7 @@ impl Config { } if self.repo_dir.is_none() { - if let Some(mut data_dir) = data_dir_from_storage_uri(&self.storage_uri) { + if let Some(mut data_dir) = self.data_dir() { data_dir.push(REPOSITORY_DIR); self.repo_dir = Some(data_dir); } else { @@ -1228,7 +1227,7 @@ impl Config { } if self.pid_file.is_none() { - if let Some(mut data_dir) = data_dir_from_storage_uri(&self.storage_uri) { + if let Some(mut data_dir) = self.data_dir() { data_dir.push("krill.pid"); self.pid_file = Some(data_dir); } else { @@ -2189,4 +2188,21 @@ mod tests { let res = parse_and_process_config_str(config_str); assert_err_msg(res, "Signer name 'Blah' is not unique"); } + + #[test] + fn data_dir_for_storage() { + fn test_uri(uri: &str, expected_path: &str) { + let storage_uri = Url::parse(uri).unwrap(); + let config = Config::test_config(&storage_uri, None, false, false, false, false); + + let expected_path = PathBuf::from(expected_path); + assert_eq!(config.data_dir().unwrap(), expected_path); + } + + test_uri("local:///tmp/test", "/tmp/test"); + test_uri("local://./data", "./data"); + test_uri("local://data", "data"); + test_uri("local://data/test", "data/test"); + test_uri("local:///tmp/test", "/tmp/test"); + } } diff --git a/src/daemon/properties/mod.rs b/src/daemon/properties/mod.rs index b8e580921..6c70ff95b 100644 --- a/src/daemon/properties/mod.rs +++ 
b/src/daemon/properties/mod.rs @@ -267,7 +267,7 @@ impl PropertiesManager { } pub fn is_initialized(&self) -> bool { - self.properties().is_ok() + self.store.has(&self.main_key).unwrap_or_default() } pub fn init(&self, krill_version: KrillVersion) -> KrillResult> { diff --git a/src/daemon/ta/mod.rs b/src/daemon/ta/mod.rs index 585edfdb0..a2552e44b 100644 --- a/src/daemon/ta/mod.rs +++ b/src/daemon/ta/mod.rs @@ -39,7 +39,7 @@ mod tests { commons::{ api::{PublicationServerInfo, RepositoryContact}, crypto::KrillSignerBuilder, - eventsourcing::{segment, AggregateStore, Segment}, + eventsourcing::{namespace, AggregateStore, Namespace}, }, daemon::config::ConfigDefaults, test, @@ -51,9 +51,9 @@ mod tests { let cleanup = test::init_logging(); let ta_signer_store: AggregateStore = - AggregateStore::create(storage_uri, segment!("ta_signer"), false).unwrap(); + AggregateStore::create(storage_uri, namespace!("ta_signer"), false).unwrap(); let ta_proxy_store: AggregateStore = - AggregateStore::create(storage_uri, segment!("ta_proxy"), false).unwrap(); + AggregateStore::create(storage_uri, namespace!("ta_proxy"), false).unwrap(); // We will import a TA key - this is only (supposed to be) supported for the openssl signer let signers = ConfigDefaults::openssl_signer_only(); diff --git a/src/pubd/manager.rs b/src/pubd/manager.rs index 2f4ead09d..82ab27f06 100644 --- a/src/pubd/manager.rs +++ b/src/pubd/manager.rs @@ -311,10 +311,7 @@ mod tests { IdCertInfo, }, crypto::{KrillSignerBuilder, OpenSslSignerConfig}, - util::{ - file::{self, CurrentFile}, - storage::storage_uri_from_data_dir, - }, + util::file::{self, CurrentFile}, }, constants::*, daemon::config::{SignerConfig, SignerType}, @@ -378,7 +375,7 @@ mod tests { fn should_add_publisher() { // we need a disk, as repo_dir, etc. 
use data_dir by default let (data_dir, cleanup) = test::tmp_dir(); - let storage_uri = test::tmp_storage(); + let storage_uri = test::mem_storage(); let server = make_server(&storage_uri, &data_dir); let alice = publisher_alice(&storage_uri); @@ -402,8 +399,7 @@ mod tests { fn should_not_add_publisher_twice() { // we need a disk, as repo_dir, etc. use data_dir by default let (data_dir, cleanup) = test::tmp_dir(); - // let storage_uri = test::tmp_storage(); - let storage_uri = storage_uri_from_data_dir(&data_dir).unwrap(); + let storage_uri = test::mem_storage(); let server = make_server(&storage_uri, &data_dir); @@ -427,7 +423,7 @@ mod tests { fn should_list_files() { // we need a disk, as repo_dir, etc. use data_dir by default let (data_dir, cleanup) = test::tmp_dir(); - let storage_uri = test::tmp_storage(); + let storage_uri = test::mem_storage(); let server = make_server(&storage_uri, &data_dir); let alice = publisher_alice(&storage_uri); @@ -448,7 +444,7 @@ mod tests { async fn should_publish_files() { // we need a disk, as repo_dir, etc. 
use data_dir by default let (data_dir, cleanup) = test::tmp_dir(); - let storage_uri = test::tmp_storage(); + let storage_uri = test::mem_storage(); let server = make_server(&storage_uri, &data_dir); let session = session_dir(&data_dir); @@ -637,7 +633,7 @@ mod tests { #[test] pub fn repository_session_reset() { let (data_dir, cleanup) = test::tmp_dir(); - let storage_uri = test::tmp_storage(); + let storage_uri = test::mem_storage(); let server = make_server(&storage_uri, &data_dir); // set up server with default repository, and publisher alice diff --git a/src/pubd/repository.rs b/src/pubd/repository.rs index 522f22b47..dadb7da8e 100644 --- a/src/pubd/repository.rs +++ b/src/pubd/repository.rs @@ -1516,11 +1516,8 @@ pub struct RepositoryAccessProxy { impl RepositoryAccessProxy { pub fn create(config: &Config) -> KrillResult { - let store = AggregateStore::::create( - &config.storage_uri, - PUBSERVER_NS, - config.use_history_cache, - )?; + let store = + AggregateStore::::create(&config.storage_uri, PUBSERVER_NS, config.use_history_cache)?; let key = MyHandle::from_str(PUBSERVER_DFLT).unwrap(); if store.has(&key)? { diff --git a/src/test.rs b/src/test.rs index 0aa6caac7..55d89dcc3 100644 --- a/src/test.rs +++ b/src/test.rs @@ -61,7 +61,7 @@ pub const KRILL_SECOND_SERVER_URI: &str = "https://localhost:3002/"; pub fn init_logging() -> impl FnOnce() { // Just creates a test config so we can initialize logging, then forgets about it - let storage = tmp_storage(); + let storage = mem_storage(); let (dir, cleanup) = tmp_dir(); let _ = Config::test(&storage, Some(&dir), false, false, false, false).init_logging(); @@ -148,7 +148,7 @@ pub fn init_config(config: &mut Config) { /// Starts krill server for testing using the given configuration. Creates a random base directory in the 'work' folder, /// adjusts the config to use it and returns it. Be sure to clean it up when the test is done. 
pub async fn start_krill_with_custom_config(mut config: Config) -> Url { - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); config.storage_uri = storage_uri.clone(); start_krill(config).await; storage_uri @@ -163,8 +163,7 @@ pub async fn start_krill_with_default_test_config( second_signer: bool, ) -> impl FnOnce() { let (data_dir, cleanup) = tmp_dir(); - let storage_uri = tmp_storage(); - // let storage_uri = storage_uri_from_data_dir(&data_dir).unwrap(); + let storage_uri = mem_storage(); let config = test_config( &storage_uri, Some(&data_dir), @@ -182,7 +181,7 @@ pub async fn start_krill_with_default_test_config( /// RRDP delta delays work properly. pub async fn start_krill_testbed_with_rrdp_interval(interval: u32) -> impl FnOnce() { let (data_dir, cleanup) = tmp_dir(); - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); let mut config = test_config(&storage_uri, Some(&data_dir), true, false, false, false); config.rrdp_updates_config.rrdp_delta_interval_min_seconds = interval; start_krill(config).await; @@ -206,7 +205,7 @@ async fn start_krill_with_error_trap(config: Arc) { /// own temp dir for storage. pub async fn start_krill_pubd(rrdp_delta_rrdp_delta_min_interval_seconds: u32) -> impl FnOnce() { let (data_dir, cleanup) = tmp_dir(); - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); let mut config = test_config(&storage_uri, Some(&data_dir), false, false, false, true); config.rrdp_updates_config.rrdp_delta_interval_min_seconds = rrdp_delta_rrdp_delta_min_interval_seconds; init_config(&mut config); @@ -231,7 +230,7 @@ pub async fn start_krill_pubd(rrdp_delta_rrdp_delta_min_interval_seconds: u32) - /// own temp dir for storage. 
pub async fn start_second_krill() -> impl FnOnce() { let (data_dir, cleanup) = tmp_dir(); - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); let mut config = test_config(&storage_uri, Some(&data_dir), false, false, false, true); init_config(&mut config); config.port = 3002; @@ -823,7 +822,7 @@ pub fn test_in_memory(op: F) where F: FnOnce(&Url), { - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); op(&storage_uri); } @@ -861,7 +860,7 @@ fn random_hex_string() -> String { hex::encode(bytes) } -pub fn tmp_storage() -> Url { +pub fn mem_storage() -> Url { let mut bytes = [0; 8]; openssl::rand::rand_bytes(&mut bytes).unwrap(); diff --git a/src/upgrades/mod.rs b/src/upgrades/mod.rs index ff7015e2b..eba7473f2 100644 --- a/src/upgrades/mod.rs +++ b/src/upgrades/mod.rs @@ -2,7 +2,7 @@ //! - Updating the format of commands or events //! - Export / Import data -use std::{convert::TryInto, fmt, fs, path::Path, str::FromStr, time::Duration}; +use std::{convert::TryInto, fmt, str::FromStr}; use serde::{de::DeserializeOwned, Deserialize}; @@ -11,13 +11,12 @@ use rpki::{ca::idexchange::MyHandle, repository::x509::Time}; use crate::{ commons::{ actor::Actor, - crypto::KrillSignerBuilder, - error::{Error, KrillIoError}, + error::KrillIoError, eventsourcing::{ segment, Aggregate, AggregateStore, AggregateStoreError, Key, KeyValueError, KeyValueStore, Scope, Segment, - SegmentExt, Storable, StoredCommand, WalStore, WithStorableDetails, + SegmentExt, Storable, StoredCommand, WalStore, WalStoreError, WithStorableDetails, }, - util::{file, storage::data_dir_from_storage_uri, KrillVersion}, + util::KrillVersion, KrillResult, }, constants::{ @@ -26,6 +25,7 @@ use crate::{ }, daemon::{config::Config, krillserver::KrillServer, properties::PropertiesManager}, pubd, + upgrades::pre_0_14_0::{OldStoredCommand, OldStoredEffect, OldStoredEvent}, }; #[cfg(feature = "hsm")] @@ -34,15 +34,14 @@ use rpki::crypto::KeyIdentifier; #[cfg(feature = "hsm")] use 
crate::commons::crypto::SignerHandle; -use self::pre_0_13_0::OldRepositoryContent; +use self::{pre_0_13_0::OldRepositoryContent, pre_0_14_0::OldCommandKey}; pub mod pre_0_10_0; #[allow(clippy::mutable_key_type)] pub mod pre_0_13_0; -mod pre_0_14_0; -pub use self::pre_0_14_0::*; +pub mod pre_0_14_0; pub type UpgradeResult = Result; @@ -108,6 +107,7 @@ impl UpgradeVersions { #[allow(clippy::large_enum_variant)] pub enum UpgradeError { AggregateStoreError(AggregateStoreError), + WalStoreError(WalStoreError), KeyStoreError(KeyValueError), IoError(KrillIoError), Unrecognised(String), @@ -122,6 +122,7 @@ impl fmt::Display for UpgradeError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let cause = match &self { UpgradeError::AggregateStoreError(e) => format!("Aggregate Error: {}", e), + UpgradeError::WalStoreError(e) => format!("Write-Ahead-Log Store Error: {}", e), UpgradeError::KeyStoreError(e) => format!("Keystore Error: {}", e), UpgradeError::IoError(e) => format!("I/O Error: {}", e), UpgradeError::Unrecognised(s) => format!("Unrecognised: {}", s), @@ -151,6 +152,12 @@ impl From for UpgradeError { } } +impl From for UpgradeError { + fn from(e: WalStoreError) -> Self { + UpgradeError::WalStoreError(e) + } +} + impl From for UpgradeError { fn from(e: KeyValueError) -> Self { UpgradeError::KeyStoreError(e) @@ -634,23 +641,6 @@ pub fn prepare_upgrade_data_migrations( error!("{}", msg); Err(UpgradeError::custom(msg)) } else if versions.from < KrillVersion::candidate(0, 10, 0, 1) { - let upgrade_data_dir = data_dir_from_storage_uri(config.upgrade_storage_uri()).unwrap(); - if !upgrade_data_dir.exists() { - file::create_dir_all(&upgrade_data_dir)?; - } - - // Get a lock to ensure that only one process can run this migration - // at any one time (for a given config). - let _lock = { - // Create upgrade dir if it did not yet exist. 
- let lock_file_path = upgrade_data_dir.join("upgrade.lock"); - fslock::LockFile::open(&lock_file_path).map_err(|_| { - UpgradeError::custom( - format!("Cannot get upgrade lock. Another process may be running a Krill upgrade. Or, perhaps you ran 'krillup' as root - in that case check the ownership of directory: {}", upgrade_data_dir.to_string_lossy()), - ) - })? - }; - // Complex migrations involving command / event conversions pre_0_10_0::PublicationServerRepositoryAccessMigration::upgrade(mode, config, &versions)?; pre_0_10_0::CasMigration::upgrade(mode, config)?; @@ -714,22 +704,16 @@ pub fn prepare_upgrade_data_migrations( /// Migrate v0.12.x RepositoryContent to the new 0.13.0+ format. /// Apply any open WAL changes to the source first. fn migrate_0_12_pubd_objects(config: &Config) -> KrillResult { - let data_dir = data_dir_from_storage_uri(&config.storage_uri).unwrap(); - let old_repo_content_dir = data_dir.join(PUBSERVER_CONTENT_NS.as_str()); - if old_repo_content_dir.exists() { - let old_store: WalStore = WalStore::create(&config.storage_uri, PUBSERVER_CONTENT_NS)?; - let repo_content_handle = MyHandle::new("0".into()); - - if old_store.has(&repo_content_handle)? { - let old_repo_content = old_store.get_latest(&repo_content_handle)?.as_ref().clone(); - let repo_content: pubd::RepositoryContent = old_repo_content.try_into()?; - let new_key = Key::new_scoped(Scope::from_segment(segment!("0")), segment!("snapshot.json")); - let upgrade_store = KeyValueStore::create(config.upgrade_storage_uri(), PUBSERVER_CONTENT_NS)?; - upgrade_store.store(&new_key, &repo_content)?; - Ok(true) - } else { - Ok(false) - } + let old_store: WalStore = WalStore::create(&config.storage_uri, PUBSERVER_CONTENT_NS)?; + let repo_content_handle = MyHandle::new("0".into()); + + if old_store.has(&repo_content_handle)? 
{ + let old_repo_content = old_store.get_latest(&repo_content_handle)?.as_ref().clone(); + let repo_content: pubd::RepositoryContent = old_repo_content.try_into()?; + let new_key = Key::new_scoped(Scope::from_segment(segment!("0")), segment!("snapshot.json")); + let upgrade_store = KeyValueStore::create_upgrade_store(&config.storage_uri, PUBSERVER_CONTENT_NS)?; + upgrade_store.store(&new_key, &repo_content)?; + Ok(true) } else { Ok(false) } @@ -738,20 +722,17 @@ fn migrate_0_12_pubd_objects(config: &Config) -> KrillResult { /// The format of the RepositoryContent did not change in 0.12, but /// the location and way of storing it did. So, migrate if present. fn migrate_pre_0_12_pubd_objects(config: &Config) -> KrillResult<()> { - let data_dir = data_dir_from_storage_uri(&config.storage_uri).unwrap(); - let old_repo_content_dir = data_dir.join(PUBSERVER_CONTENT_NS.as_str()); - if old_repo_content_dir.exists() { - let old_store = KeyValueStore::create(&config.storage_uri, PUBSERVER_CONTENT_NS)?; - let old_key = Key::new_global(segment!("0.json")); - if let Ok(Some(old_repo_content)) = old_store.get::(&old_key) { - info!("Found pre 0.12.0 RC2 publication server data. Migrating.."); - let repo_content: pubd::RepositoryContent = old_repo_content.try_into()?; - - let new_key = Key::new_scoped(Scope::from_segment(segment!("0")), segment!("snapshot.json")); - let upgrade_store = KeyValueStore::create(config.upgrade_storage_uri(), PUBSERVER_CONTENT_NS)?; - upgrade_store.store(&new_key, &repo_content)?; - } + let old_store = KeyValueStore::create(&config.storage_uri, PUBSERVER_CONTENT_NS)?; + let old_key = Key::new_global(segment!("0.json")); + if let Ok(Some(old_repo_content)) = old_store.get::(&old_key) { + info!("Found pre 0.12.0 RC2 publication server data. 
Migrating.."); + let repo_content: pubd::RepositoryContent = old_repo_content.try_into()?; + + let new_key = Key::new_scoped(Scope::from_segment(segment!("0")), segment!("snapshot.json")); + let upgrade_store = KeyValueStore::create_upgrade_store(&config.storage_uri, PUBSERVER_CONTENT_NS)?; + upgrade_store.store(&new_key, &repo_content)?; } + Ok(()) } @@ -765,134 +746,50 @@ pub fn finalise_data_migration( config: &Config, properties_manager: &PropertiesManager, ) -> KrillResult<()> { - if upgrade.from >= KrillVersion::candidate(0, 14, 0, 0) { - // Not supported yet, we will need to implement changing the - // namespace in kvx::KeyValueStore. - // - // When this is done then we can use the same logic for any - // storage implementation used (disk/db). - todo!("Support migrations from 0.14.x and higher migrations"); - } else { - info!( - "Finish data migrations for upgrade from {} to {}", - upgrade.from(), - upgrade.to() - ); - - // Krill versions before 0.14.x *always* used disk based storage. - // - // So, we should always get some data dir from the current config - // when upgrading from a a version before 0.14.x. - // - // Furthermore, now that we are storing the version in one single - // place, we can remove the "version" file from any directory that - // remains after migration. 
- if let Some(data_dir) = data_dir_from_storage_uri(&config.storage_uri) { - let archive_base_dir = data_dir.join(&format!("archive-{}", upgrade.from())); - let upgrade_base_dir = data_dir.join("upgrade-data"); - - for ns in &[ - CASERVER_NS, - CA_OBJECTS_NS, - KEYS_NS, - PUBSERVER_CONTENT_NS, - PUBSERVER_NS, - SIGNERS_NS, - STATUS_NS, - TA_PROXY_SERVER_NS, - TA_SIGNER_SERVER_NS, - ] { - // Data structure is as follows: - // - // data_dir/ - // upgrade-data/ --> upgraded (may be missing) - // ns1, - // ns2, - // etc - // ns1, --> current - // ns2, - // etc - // - // archive-prev-v/ --> archived current dirs which were upgraded - // - let upgraded_dir = upgrade_base_dir.join(ns.as_str()); - let archive_dir = archive_base_dir.join(ns.as_str()); - let current_dir = data_dir.join(ns.as_str()); - - if upgraded_dir.exists() { - // Data was prepared. So we archive the current data and - // then move the prepped data. - move_dir(¤t_dir, &archive_dir)?; - move_dir(&upgraded_dir, ¤t_dir)?; - } else if current_dir.exists() { - // There was no new data for this directory. But, we make a backup - // so that we can have a consistent data set to fall back to in case - // of a downgrade. - file::backup_dir(¤t_dir, &archive_dir).map_err(|e| { - Error::Custom(format!( - "Could not backup directory {} to {} after migration: {}", - current_dir.to_string_lossy(), - archive_dir.to_string_lossy(), - e - )) - })?; - } - - let version_file = current_dir.join("version"); - if version_file.exists() { - debug!("Removing (no longer used) version file: {}", version_file.to_string_lossy()); - std::fs::remove_file(&version_file).map_err(|e| { - let context = format!( - "Could not remove (no longer used) version file at: {}", - version_file.to_string_lossy(), - ); - Error::IoError(KrillIoError::new(context, e)) - })?; - } + // For each NS + // + // Check if upgrade store to this version exists. 
+ // If so: + // -- drop archive store if it exists + // -- archive current store (rename ns) + // -- move upgrade to current + info!( + "Finish data migrations for upgrade from {} to {}", + upgrade.from(), + upgrade.to() + ); + + for ns in [ + CASERVER_NS, + CA_OBJECTS_NS, + KEYS_NS, + PUBSERVER_CONTENT_NS, + PUBSERVER_NS, + SIGNERS_NS, + STATUS_NS, + TA_PROXY_SERVER_NS, + TA_SIGNER_SERVER_NS, + ] { + // Check if there is a non-empty upgrade store for this namespace + // that would need to be migrated. + let mut upgrade_store = KeyValueStore::create_upgrade_store(&config.storage_uri, ns)?; + if !upgrade_store.is_empty()? { + info!("Migrate new data for {} and archive old", ns); + let mut current_store = KeyValueStore::create(&config.storage_uri, ns)?; + if !current_store.is_empty()? { + current_store.migrate_to_archive(&config.storage_uri, ns)?; } - // remove the upgrade base dir - if it's empty - so ignore error. - let _ = fs::remove_dir(&upgrade_base_dir); - } - - // move the dirs - fn move_dir(from: &Path, to: &Path) -> KrillResult<()> { - if let Some(parent) = to.parent() { - if !parent.exists() { - file::create_dir_all(parent).map_err(Error::IoError)?; - } - } - std::fs::rename(from, to).map_err(|e| { - let context = format!( - "Could not rename directory from: {} to: {}.", - from.to_string_lossy(), - to.to_string_lossy() - ); - Error::IoError(KrillIoError::new(context, e)) - }) - } - } - - // Remove version files that are no longer required - if let Some(data_dir) = data_dir_from_storage_uri(&config.storage_uri) { - for ns in &[ - CASERVER_NS, - CA_OBJECTS_NS, - KEYS_NS, - PUBSERVER_CONTENT_NS, - PUBSERVER_NS, - SIGNERS_NS, - STATUS_NS, - TA_PROXY_SERVER_NS, - TA_SIGNER_SERVER_NS, - ] { - let path = data_dir.join(ns.as_str()).join("version"); - if path.exists() { - debug!("Removing version excess file: {}", path.to_string_lossy()); - std::fs::remove_file(&path).map_err(|e| { - let context = format!("Could not remove old version file at: {}", 
path.to_string_lossy(),); - Error::IoError(KrillIoError::new(context, e)) - })?; + upgrade_store.migrate_to_current(&config.storage_uri, ns)?; + } else { + // No migration needed, but check if we have a current store + // for this namespace that still includes a version file. If + // so, remove it. + let current_store = KeyValueStore::create(&config.storage_uri, ns)?; + let version_key = Key::new_global(segment!("version")); + if current_store.has(&version_key)? { + debug!("Removing excess version key in ns: {}", ns); + current_store.drop_key(&version_key)?; } } } @@ -918,82 +815,71 @@ pub fn finalise_data_migration( /// one to the mapping in the signer store, if any. #[cfg(feature = "hsm")] fn record_preexisting_openssl_keys_in_signer_mapper(config: &Config) -> Result<(), UpgradeError> { - match data_dir_from_storage_uri(&config.storage_uri) { - None => Ok(()), - Some(data_dir) => { - if !data_dir.join(SIGNERS_NS.as_str()).exists() { - let mut num_recorded_keys = 0; - let keys_dir = data_dir.join(KEYS_NS.as_str()); - - info!( - "Scanning for not yet mapped OpenSSL signer keys in {} to record in the signer store", - keys_dir.to_string_lossy() - ); - - let probe_interval = Duration::from_secs(config.signer_probe_retry_seconds); - let krill_signer = KrillSignerBuilder::new(&config.storage_uri, probe_interval, &config.signers) - .with_default_signer(config.default_signer()) - .with_one_off_signer(config.one_off_signer()) - .build() - .unwrap(); - - // For every file (key) in the legacy OpenSSL signer keys directory - if let Ok(dir_iter) = keys_dir.read_dir() { - let mut openssl_signer_handle: Option = None; - - for entry in dir_iter { - let entry = entry.map_err(|err| { - UpgradeError::IoError(KrillIoError::new( - format!( - "I/O error while looking for signer keys to register in: {}", - keys_dir.to_string_lossy() - ), - err, - )) - })?; - - if entry.path().is_file() { - // Is it a key identifier? 
- if let Ok(key_id) = KeyIdentifier::from_str(&entry.file_name().to_string_lossy()) { - // Is the key already recorded in the mapper? It shouldn't be, but asking will cause the initial - // registration of the OpenSSL signer to occur and for it to be assigned a handle. We need the - // handle so that we can register keys with the mapper. - if krill_signer.get_key_info(&key_id).is_err() { - // No, record it - - // Find out the handle of the OpenSSL signer used to create this key, if not yet known. - if openssl_signer_handle.is_none() { - // No, find it by asking each of the active signers if they have the key because one of - // them must have it and it should be the one and only OpenSSL signer that Krill was - // using previously. We can't just find and use the only OpenSSL signers as Krill may - // have been configured with more than one each with separate keys directories. - for (a_signer_handle, a_signer) in krill_signer.get_active_signers().iter() { - if a_signer.get_key_info(&key_id).is_ok() { - openssl_signer_handle = Some(a_signer_handle.clone()); - break; - } - } - } - - // Record the key in the signer mapper as being owned by the found signer handle. - if let Some(signer_handle) = &openssl_signer_handle { - let internal_key_id = key_id.to_string(); - if let Some(mapper) = krill_signer.get_mapper() { - mapper.add_key(signer_handle, &key_id, &internal_key_id)?; - num_recorded_keys += 1; - } - } - } + let signers_key_store = KeyValueStore::create(&config.storage_uri, SIGNERS_NS)?; + if signers_key_store.is_empty()? { + let mut num_recorded_keys = 0; + // If the key value store for the "signers" namespace is empty, then it was not yet initialised + // and we may need to import keys from a previous krill installation (earlier version, or a custom + // build that has the hsm feature disabled.) 
+ + let keys_key_store = KeyValueStore::create(&config.storage_uri, KEYS_NS)?; + info!("Mapping OpenSSL signer keys, using uri: {}", config.storage_uri); + + let probe_interval = std::time::Duration::from_secs(config.signer_probe_retry_seconds); + let krill_signer = + crate::commons::crypto::KrillSignerBuilder::new(&config.storage_uri, probe_interval, &config.signers) + .with_default_signer(config.default_signer()) + .with_one_off_signer(config.one_off_signer()) + .build() + .unwrap(); + + // For every file (key) in the legacy OpenSSL signer keys directory + + let mut openssl_signer_handle: Option = None; + + for key in keys_key_store.keys(&Scope::global(), "")? { + debug!("Found key: {}", key); + // Is it a key identifier? + if let Ok(key_id) = KeyIdentifier::from_str(key.name().as_str()) { + // Is the key already recorded in the mapper? It shouldn't be, but asking will cause the initial + // registration of the OpenSSL signer to occur and for it to be assigned a handle. We need the + // handle so that we can register keys with the mapper. + if krill_signer.get_key_info(&key_id).is_err() { + // No, record it + + // Find out the handle of the OpenSSL signer used to create this key, if not yet known. + if openssl_signer_handle.is_none() { + // No, find it by asking each of the active signers if they have the key because one of + // them must have it and it should be the one and only OpenSSL signer that Krill was + // using previously. We can't just find and use the only OpenSSL signers as Krill may + // have been configured with more than one each with separate keys directories. + for (a_signer_handle, a_signer) in krill_signer.get_active_signers().iter() { + if a_signer.get_key_info(&key_id).is_ok() { + openssl_signer_handle = Some(a_signer_handle.clone()); + break; } } } - } - info!("Recorded {} key identifiers in the signer store", num_recorded_keys); + // Record the key in the signer mapper as being owned by the found signer handle. 
+ if let Some(signer_handle) = &openssl_signer_handle { + let internal_key_id = key_id.to_string(); + if let Some(mapper) = krill_signer.get_mapper() { + mapper.add_key(signer_handle, &key_id, &internal_key_id)?; + num_recorded_keys += 1; + } + } + } + } else { + debug!("Could not parse key as key identifier: {}", key); } - - Ok(()) } + + info!("Recorded {} key identifiers in the signer store", num_recorded_keys); + Ok(()) + } else { + debug!("Signers were set up before. No need to migrate keys."); + Ok(()) } } @@ -1026,58 +912,47 @@ fn upgrade_versions( config: &Config, properties_manager: &PropertiesManager, ) -> Result, UpgradeError> { - if let Ok(current) = properties_manager.current_krill_version() { - // We found the KrillVersion stored in the properties manager - // introduced in Krill 0.14.0. + if properties_manager.is_initialized() { + // The properties manager was introduced in Krill 0.14.0. + // If it's initialised then it MUST have a Krill Version. + let current = properties_manager.current_krill_version()?; UpgradeVersions::for_current(current) - } else if let Some(data_dir) = data_dir_from_storage_uri(&config.storage_uri) { - // If the disk is used for storage, then we need to check - // if there are any pre Krill 0.14.0 version files in the - // usual places. If so, then this is an upgrade. + } else { + // No KrillVersion yet. So, either this is an older Krill version, + // or this a fresh installation. // - // If there are no such files, then we know that this is a - // new clean installation. Otherwise, we would have found - // the properties_manager.current_krill_version(). - let mut current = None; - - // So.. try to find the most recent version among those files - // in as far as they exist. 
- for ns in &[CASERVER_NS, PUBSERVER_NS, PUBSERVER_CONTENT_NS] { - let path = data_dir.join(ns.as_str()).join("version"); - if let Ok(bytes) = file::read(&path) { - if let Ok(new_version_seen_on_disk) = serde_json::from_slice::(&bytes) { - if let Some(previous_seen_on_disk) = current.clone() { - if new_version_seen_on_disk > previous_seen_on_disk { - current = Some(new_version_seen_on_disk); - } - } else { - current = Some(new_version_seen_on_disk); + // If this is an existing older Krill installation then we will + // find version files (keys) in one or more existing key value + // stores used for the various entities in Krill. + // + // If can't find any versions then this is a clean install. + + let mut current: Option = None; + + // Scan the following data stores. The *latest* version seen will determine + // the actual installed Krill version - this is because these version files + // did not always get updated in each store - but only in stores that needed + // an upgrade (at least this is true for some past migrations). So, it's the + // latest version (if any) that counts here. + for ns in &[CASERVER_NS, CA_OBJECTS_NS, PUBSERVER_NS, PUBSERVER_CONTENT_NS] { + let kv_store = KeyValueStore::create(&config.storage_uri, ns)?; + let key = Key::new_global(segment!("version")); + + if let Some(key_store_version) = kv_store.get::(&key)? { + if let Some(last_seen) = ¤t { + if &key_store_version > last_seen { + current = Some(key_store_version) } + } else { + current = Some(key_store_version); } } } match current { - None => { - info!("Clean installation for Krill version {}", KrillVersion::code_version()); - Ok(None) - } + None => Ok(None), Some(current) => UpgradeVersions::for_current(current), } - } else { - // No disk was used. We do not support upgrading from <0.14.0 to 0.14.0 or - // above AND migrating to a database at the same time. 
If users want this - // then they should first upgrade using disk based storage and then migrate - // the data content to a new storage option. See issue #1079 - info!( - "Clean installation using database storage for Krill version {}", - KrillVersion::code_version() - ); - info!("NOTE: if you meant to upgrade an existing Krill <0.14.0 installation"); - info!(" then you should stop this instance, clear the new database, then"); - info!(" upgrade your old installation using the disk as a storage option,"); - info!(" and then migrate your data to a database."); - Ok(None) } } @@ -1087,21 +962,29 @@ fn upgrade_versions( mod tests { use std::path::PathBuf; - use crate::{ - commons::util::{file, storage::storage_uri_from_data_dir}, - test::tmp_dir, - }; + use kvx::Namespace; + use url::Url; - use super::*; + use crate::test; - async fn test_upgrade(source: PathBuf) { - let (data_dir, cleanup) = tmp_dir(); - let storage_uri = storage_uri_from_data_dir(&data_dir).unwrap(); - file::backup_dir(&source, &data_dir).unwrap(); + use super::*; - let config = Config::test(&storage_uri, Some(&data_dir), false, false, false, false); + fn test_upgrade(base_dir: &str, namespaces: &[&str]) { + // Copy data for the given names spaces into memory for testing. 
+ let mem_storage_base_uri = test::mem_storage(); + let bogus_path = PathBuf::from("/dev/null"); // needed for tls_dir etc, but will be ignored here + let config = Config::test(&mem_storage_base_uri, Some(&bogus_path), false, false, false, false); let _ = config.init_logging(); + let source_url = Url::parse(&format!("local://{}", base_dir)).unwrap(); + for ns in namespaces { + let namespace = Namespace::parse(ns).unwrap(); + let source_store = KeyValueStore::create(&source_url, namespace).unwrap(); + let target_store = KeyValueStore::create(&mem_storage_base_uri, namespace).unwrap(); + + target_store.import(&source_store).unwrap(); + } + let properties_manager = PropertiesManager::create(&config.storage_uri, config.use_history_cache).unwrap(); prepare_upgrade_data_migrations(UpgradeMode::PrepareOnly, &config, &properties_manager) @@ -1114,26 +997,37 @@ mod tests { .unwrap(); finalise_data_migration(report.versions(), &config, &properties_manager).unwrap(); - - cleanup(); } - #[tokio::test] - async fn prepare_then_upgrade_0_9_5() { - let source = PathBuf::from("test-resources/migrations/v0_9_5/"); - test_upgrade(source).await; + #[test] + fn prepare_then_upgrade_0_9_5() { + test_upgrade( + "test-resources/migrations/v0_9_5/", + &["ca_objects", "cas", "pubd", "pubd_objects"], + ); } - #[tokio::test] - async fn prepare_then_upgrade_0_12_1() { - let source = PathBuf::from("test-resources/migrations/v0_12_1/"); - test_upgrade(source).await; + #[test] + fn prepare_then_upgrade_0_12_1() { + test_upgrade("test-resources/migrations/v0_12_1/", &["pubd", "pubd_objects"]); } - #[tokio::test] - async fn prepare_then_upgrade_0_13_0() { - let source = PathBuf::from("test-resources/migrations/v0_13_1/"); - test_upgrade(source).await; + #[test] + fn prepare_then_upgrade_0_13_0() { + test_upgrade( + "test-resources/migrations/v0_13_1/", + &[ + "ca_objects", + "cas", + "keys", + "pubd", + "pubd_objects", + "signers", + "status", + "ta_proxy", + "ta_signer", + ], + ); } #[test] @@ 
-1146,14 +1040,18 @@ mod tests { fn unmapped_keys_test_core(do_upgrade: bool) { let expected_key_id = KeyIdentifier::from_str("5CBCAB14B810C864F3EEA8FD102B79F4E53FCC70").unwrap(); - // Place a key previously created by an OpenSSL signer in the KEYS_NS under the Krill data dir. - // Then run the upgrade. It should find the key and add it to the mapper. - let (data_dir, cleanup) = tmp_dir(); - let storage_uri = storage_uri_from_data_dir(&data_dir).unwrap(); - let source = PathBuf::from("test-resources/migrations/unmapped_keys/"); - file::backup_dir(&source, &data_dir).unwrap(); + // Copy test data into test storage + let mem_storage_base_uri = test::mem_storage(); + + let source_url = Url::parse("local://test-resources/migrations/unmapped_keys/").unwrap(); + let source_store = KeyValueStore::create(&source_url, KEYS_NS).unwrap(); - let mut config = Config::test(&storage_uri, Some(&data_dir), false, false, false, false); + let target_store = KeyValueStore::create(&mem_storage_base_uri, KEYS_NS).unwrap(); + target_store.import(&source_store).unwrap(); + + let bogus_path = PathBuf::from("/dev/null"); // needed for tls_dir etc, but will be ignored here + + let mut config = Config::test(&mem_storage_base_uri, Some(&bogus_path), false, false, false, false); let _ = config.init_logging(); config.process().unwrap(); @@ -1164,12 +1062,13 @@ mod tests { // Now test that a newly initialized `KrillSigner` with a default OpenSSL signer // is associated with the newly created mapper store and is thus able to use the // key that we placed on disk. 
- let probe_interval = Duration::from_secs(config.signer_probe_retry_seconds); - let krill_signer = KrillSignerBuilder::new(&storage_uri, probe_interval, &config.signers) - .with_default_signer(config.default_signer()) - .with_one_off_signer(config.one_off_signer()) - .build() - .unwrap(); + let probe_interval = std::time::Duration::from_secs(config.signer_probe_retry_seconds); + let krill_signer = + crate::commons::crypto::KrillSignerBuilder::new(&mem_storage_base_uri, probe_interval, &config.signers) + .with_default_signer(config.default_signer()) + .with_one_off_signer(config.one_off_signer()) + .build() + .unwrap(); // Trigger the signer to be bound to the one the migration just registered in the mapper krill_signer.random_serial().unwrap(); @@ -1181,13 +1080,11 @@ mod tests { if do_upgrade { // Verify that the mapper has a record of the test key belonging to the signer - assert!(mapper.get_signer_for_key(&expected_key_id).is_ok()); + mapper.get_signer_for_key(&expected_key_id).unwrap(); } else { // Verify that the mapper does NOT have a record of the test key belonging to the signer assert!(mapper.get_signer_for_key(&expected_key_id).is_err()); } - - cleanup(); } #[cfg(all(feature = "hsm", not(any(feature = "hsm-tests-kmip", feature = "hsm-tests-pkcs11"))))] diff --git a/src/upgrades/pre_0_10_0/cas_migration.rs b/src/upgrades/pre_0_10_0/cas_migration.rs index 6239a3a6c..fde01bb10 100644 --- a/src/upgrades/pre_0_10_0/cas_migration.rs +++ b/src/upgrades/pre_0_10_0/cas_migration.rs @@ -5,7 +5,7 @@ use rpki::{ca::idexchange::CaHandle, repository::x509::Time}; use crate::commons::eventsourcing::{StoredCommand, StoredCommandBuilder}; use crate::daemon::ca::CaObjects; -use crate::upgrades::{OldStoredCommand, UnconvertedEffect}; +use crate::upgrades::UnconvertedEffect; use crate::{ commons::{ api::CertAuthStorableCommand, @@ -18,6 +18,7 @@ use crate::{ }, upgrades::{ pre_0_10_0::{Pre0_10CertAuthEvent, Pre0_10CertAuthInitEvent}, + pre_0_14_0::OldStoredCommand, 
UpgradeAggregateStorePre0_14, UpgradeError, UpgradeMode, UpgradeResult, }, }; @@ -35,7 +36,7 @@ struct CaObjectsMigration { impl CaObjectsMigration { fn create(config: &Config) -> Result { let current_store = KeyValueStore::create(&config.storage_uri, CA_OBJECTS_NS)?; - let new_store = KeyValueStore::create(config.upgrade_storage_uri(), CA_OBJECTS_NS)?; + let new_store = KeyValueStore::create_upgrade_store(&config.storage_uri, CA_OBJECTS_NS)?; Ok(CaObjectsMigration { current_store, new_store, @@ -43,11 +44,12 @@ impl CaObjectsMigration { } fn prepare_new_data_for(&self, ca: &CaHandle) -> Result<(), UpgradeError> { - let key = Key::new_global(Segment::parse_lossy(ca.as_str())); // ca should always be a valid Segment + let key = Key::new_global(Segment::parse_lossy(&format!("{}.json", ca))); // ca should always be a valid Segment if let Some(old_objects) = self.current_store.get::(&key)? { let converted: CaObjects = old_objects.try_into()?; self.new_store.store(&key, &converted)?; + debug!("Stored updated objects for CA {} in {}", ca, self.new_store); } Ok(()) @@ -67,9 +69,10 @@ pub struct CasMigration { impl CasMigration { pub fn upgrade(mode: UpgradeMode, config: &Config) -> UpgradeResult<()> { let current_kv_store = KeyValueStore::create(&config.storage_uri, CASERVER_NS)?; - let new_kv_store = KeyValueStore::create(config.upgrade_storage_uri(), CASERVER_NS)?; - let new_agg_store = AggregateStore::::create( - config.upgrade_storage_uri(), + let new_kv_store = KeyValueStore::create_upgrade_store(&config.storage_uri, CASERVER_NS)?; + + let new_agg_store = AggregateStore::::create_upgrade_store( + &config.storage_uri, CASERVER_NS, config.use_history_cache, )?; diff --git a/src/upgrades/pre_0_10_0/old_events.rs b/src/upgrades/pre_0_10_0/old_events.rs index 6850d8dc3..f832f27da 100644 --- a/src/upgrades/pre_0_10_0/old_events.rs +++ b/src/upgrades/pre_0_10_0/old_events.rs @@ -34,7 +34,7 @@ use crate::{ }, daemon::ta::{TaCertDetails, TrustAnchorLocator}, pubd::{Publisher, 
RepositoryAccessEvent, RepositoryAccessInitEvent}, - upgrades::{OldStoredEvent, UpgradeError}, + upgrades::UpgradeError, }; #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -1525,8 +1525,6 @@ impl Eq for OldPublishedCrl {} //------------ OldRepositoryAccessIni ------------------------------------------- -pub type Pre0_10RepositoryAccessIni = OldStoredEvent; - #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct Pre0_10RepositoryAccessInitDetails { id_cert: IdCert, @@ -1552,8 +1550,6 @@ impl fmt::Display for Pre0_10RepositoryAccessInitDetails { //------------ OldRepositoryAccessEvent ----------------------------------------- -pub type Pre0_10RepositoryAccessEvent = OldStoredEvent; - #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[allow(clippy::large_enum_variant)] #[serde(rename_all = "snake_case", tag = "type")] diff --git a/src/upgrades/pre_0_10_0/pubd_migration.rs b/src/upgrades/pre_0_10_0/pubd_migration.rs index 6cee9870a..58df57b6d 100644 --- a/src/upgrades/pre_0_10_0/pubd_migration.rs +++ b/src/upgrades/pre_0_10_0/pubd_migration.rs @@ -9,10 +9,9 @@ use crate::{ constants::PUBSERVER_NS, daemon::config::Config, pubd::{RepositoryAccess, RepositoryAccessEvent, RepositoryAccessInitEvent}, - upgrades::{ - OldRepositoryAccessEvent, OldRepositoryAccessInitEvent, OldStoredCommand, UnconvertedEffect, - UpgradeAggregateStorePre0_14, UpgradeMode, UpgradeResult, UpgradeVersions, - }, + upgrades::pre_0_10_0::{Pre0_10RepositoryAccessEventDetails, Pre0_10RepositoryAccessInitDetails}, + upgrades::pre_0_14_0::OldStoredCommand, + upgrades::{UnconvertedEffect, UpgradeAggregateStorePre0_14, UpgradeMode, UpgradeResult, UpgradeVersions}, }; /// Migrates the events, snapshots and info for the event-sourced RepositoryAccess. 
@@ -26,9 +25,9 @@ pub struct PublicationServerRepositoryAccessMigration { impl PublicationServerRepositoryAccessMigration { pub fn upgrade(mode: UpgradeMode, config: &Config, versions: &UpgradeVersions) -> UpgradeResult<()> { let current_kv_store = KeyValueStore::create(&config.storage_uri, PUBSERVER_NS)?; - let new_kv_store = KeyValueStore::create(config.upgrade_storage_uri(), PUBSERVER_NS)?; + let new_kv_store = KeyValueStore::create_upgrade_store(&config.storage_uri, PUBSERVER_NS)?; let new_agg_store = - AggregateStore::create(config.upgrade_storage_uri(), PUBSERVER_NS, config.use_history_cache)?; + AggregateStore::create_upgrade_store(&config.storage_uri, PUBSERVER_NS, config.use_history_cache)?; let store_migration = PublicationServerRepositoryAccessMigration { current_kv_store, @@ -39,7 +38,7 @@ impl PublicationServerRepositoryAccessMigration { if store_migration .current_kv_store .has_scope(&Scope::from_segment(segment!("0")))? - && versions.from > KrillVersion::release(0, 9, 0) + && versions.from >= KrillVersion::release(0, 9, 0) && versions.from < KrillVersion::candidate(0, 10, 0, 1) { store_migration.upgrade(mode) @@ -52,8 +51,8 @@ impl PublicationServerRepositoryAccessMigration { impl UpgradeAggregateStorePre0_14 for PublicationServerRepositoryAccessMigration { type Aggregate = RepositoryAccess; - type OldInitEvent = OldRepositoryAccessInitEvent; - type OldEvent = OldRepositoryAccessEvent; + type OldInitEvent = Pre0_10RepositoryAccessInitDetails; + type OldEvent = Pre0_10RepositoryAccessEventDetails; type OldStorableDetails = StorableRepositoryCommand; fn store_name(&self) -> &str { @@ -81,7 +80,7 @@ impl UpgradeAggregateStorePre0_14 for PublicationServerRepositoryAccessMigration ) -> UpgradeResult> { let details = StorableRepositoryCommand::Init; let builder = StoredCommandBuilder::::new(actor, time, handle, 0, details); - let init_event: RepositoryAccessInitEvent = old_init.into_details(); + let init_event: RepositoryAccessInitEvent = old_init.into(); 
Ok(builder.finish_with_init_event(init_event)) } @@ -103,8 +102,7 @@ impl UpgradeAggregateStorePre0_14 for PublicationServerRepositoryAccessMigration let new_command = match old_effect { UnconvertedEffect::Error { msg } => new_command_builder.finish_with_error(msg), UnconvertedEffect::Success { events } => { - let full_events: Vec = - events.into_iter().map(|old| old.into_details()).collect(); + let full_events: Vec = events.into_iter().map(|old| old.into()).collect(); new_command_builder.finish_with_events(full_events) } }; diff --git a/src/upgrades/pre_0_14_0/mod.rs b/src/upgrades/pre_0_14_0/mod.rs index 4fddebdd2..be5a3182d 100644 --- a/src/upgrades/pre_0_14_0/mod.rs +++ b/src/upgrades/pre_0_14_0/mod.rs @@ -1,6 +1,6 @@ use std::{fmt, str::FromStr}; -use kvx::Segment; +use kvx::Namespace; use rpki::{ca::idexchange::MyHandle, repository::x509::Time}; use crate::{ @@ -239,16 +239,16 @@ pub struct GenericUpgradeAggregateStore { } impl GenericUpgradeAggregateStore { - pub fn upgrade(name_space: &Segment, mode: UpgradeMode, config: &Config) -> UpgradeResult<()> { + pub fn upgrade(name_space: &Namespace, mode: UpgradeMode, config: &Config) -> UpgradeResult<()> { let current_kv_store = KeyValueStore::create(&config.storage_uri, name_space)?; if current_kv_store.scopes()?.is_empty() { // nothing to do here Ok(()) } else { - let new_kv_store = KeyValueStore::create(config.upgrade_storage_uri(), name_space)?; + let new_kv_store = KeyValueStore::create_upgrade_store(&config.storage_uri, name_space)?; let new_agg_store = - AggregateStore::::create(config.upgrade_storage_uri(), name_space, config.use_history_cache)?; + AggregateStore::::create_upgrade_store(&config.storage_uri, name_space, config.use_history_cache)?; let store_migration = GenericUpgradeAggregateStore { store_name: name_space.to_string(), diff --git a/test-resources/status_store/migration-0.9.5/ta/status.json b/test-resources/status_store/migration-0.9.5/status/ta/status.json similarity index 100% rename from 
test-resources/status_store/migration-0.9.5/ta/status.json rename to test-resources/status_store/migration-0.9.5/status/ta/status.json diff --git a/test-resources/status_store/migration-0.9.5/testbed/status.json b/test-resources/status_store/migration-0.9.5/status/testbed/status.json similarity index 100% rename from test-resources/status_store/migration-0.9.5/testbed/status.json rename to test-resources/status_store/migration-0.9.5/status/testbed/status.json diff --git a/tests/auth_check.rs b/tests/auth_check.rs index 20aaf8600..c60b531ce 100644 --- a/tests/auth_check.rs +++ b/tests/auth_check.rs @@ -5,7 +5,7 @@ use rpki::ca::idexchange::Handle; use krill::{ commons::api::Token, - test::{init_ca, start_krill_with_custom_config, test_config, tmp_storage}, + test::{init_ca, mem_storage, start_krill_with_custom_config, test_config}, }; extern crate krill; @@ -16,7 +16,7 @@ async fn auth_check() { // Use a copy of the default test Krill config but change the server admin token thereby hopefully causing the // bearer token sent by the test suite support functions not to match and thus be rejected which in turn should // cause a Rust panic. - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); let mut config = test_config(&storage_uri, None, false, false, false, false); config.admin_token = Token::from("wrong secret"); diff --git a/tests/benchmark.rs b/tests/benchmark.rs index c5c5cc4d8..edd96790f 100644 --- a/tests/benchmark.rs +++ b/tests/benchmark.rs @@ -6,7 +6,7 @@ use log::LevelFilter; #[tokio::test(flavor = "multi_thread")] async fn benchmark() { let (data_dir, cleanup) = tmp_dir(); - let storage_uri = tmp_storage(); + let storage_uri = mem_storage(); let cas = 10; let ca_roas = 10; diff --git a/tests/functional_ca_import.rs b/tests/functional_ca_import.rs index 31174da0e..48e7a19fe 100644 --- a/tests/functional_ca_import.rs +++ b/tests/functional_ca_import.rs @@ -11,7 +11,7 @@ async fn functional_ca_import() { // Start an empty Krill instance. 
let (data_dir, cleanup) = tmp_dir(); - let krill_storage = tmp_storage(); + let krill_storage = mem_storage(); let mut config = test_config(&krill_storage, Some(&data_dir), false, false, false, false); config.ta_support_enabled = true; config.ta_signer_enabled = true; diff --git a/tests/functional_keyroll.rs b/tests/functional_keyroll.rs index d642017f2..0aa7f8d57 100644 --- a/tests/functional_keyroll.rs +++ b/tests/functional_keyroll.rs @@ -23,8 +23,7 @@ use krill::{ #[tokio::test] async fn functional_keyroll() { let (data_dir, cleanup) = tmp_dir(); - let storage_uri = tmp_storage(); - // let storage_uri = util::storage::storage_uri_from_data_dir(&data_dir).unwrap(); + let storage_uri = mem_storage(); let config = test_config(&storage_uri, Some(&data_dir), true, false, false, false); start_krill(config).await;