Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ch-app): Remove Blockchain, add integration tests #86

Merged
merged 5 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 130 additions & 169 deletions clearing-house-app/Cargo.lock

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions clearing-house-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ edition = "2021"
# JWT
biscuit = "0.6.0"
# Database
mongodb = { version = ">= 2.7.0" , features = ["openssl-tls"]}
mongodb = { version = ">= 2.7.0", features = ["openssl-tls"] }
# Serialization
serde = { version = ">1.0.184", features = ["derive"] }
serde = { version = "> 1.0.184", features = ["derive"] }
serde_json = "1"
# Error handling
anyhow = "1"
Expand All @@ -26,7 +26,6 @@ aes = "0.8.3"
aes-gcm-siv = "0.11.1"
hkdf = "0.12.3"
sha2 = "0.10.7"
blake2-rfc = "0.2.18"
ring = "0.16.20"
# Fixed size arrays
generic-array = "0.14.7"
Expand Down Expand Up @@ -54,15 +53,19 @@ axum = { version = "0.6.20", features = ["json", "http2"] }
# Helper to allow defining traits for async functions
async-trait = "0.1.73"
# Helper for working with futures
futures = "0.3.28"
futures = "0.3.29"
# Helper for creating custom error types
thiserror = "1.0.48"
# Optional: Sentry integration
sentry = { version = "0.31.7", optional = true }

[dev-dependencies]
# Controlling execution of unit test cases, which could interfere with each other
serial_test = "2.0.0"
# Tempfile creation for testing
tempfile = "3.8.0"
tempfile = "3.8"
tower = { version = "0.4", features = ["util"] }
hyper = { version = "0.14.27", features = ["full"] }

[features]
default = []
Expand Down
Binary file removed clearing-house-app/certs/daps-dev.der
Binary file not shown.
Binary file removed clearing-house-app/certs/daps.der
Binary file not shown.
20 changes: 6 additions & 14 deletions clearing-house-app/src/db/doc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use crate::model::SortingOrder;
use anyhow::anyhow;
use futures::StreamExt;
use mongodb::bson::doc;
use mongodb::options::{
AggregateOptions, CreateCollectionOptions, IndexOptions, UpdateOptions, WriteConcern,
};
use mongodb::options::{AggregateOptions, CreateCollectionOptions, UpdateOptions, WriteConcern};
use mongodb::{bson, Client, IndexModel};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -86,7 +84,7 @@ impl DataStore {
};

// This purpose of this index is to ensure that the transaction counter is unique
let mut index_options = IndexOptions::default();
/*let mut index_options = IndexOptions::default();
index_options.unique = Some(true);
let mut index_model = IndexModel::default();
index_model.keys = doc! {format!("{}.{}",MONGO_DOC_ARRAY, MONGO_TC): 1};
Expand All @@ -107,7 +105,7 @@ impl DataStore {
debug!("... failed.");
return Err(anyhow!("Failed to create index"));
}
}
}*/

// This creates a compound index over pid and the timestamp to enable paging using buckets
let mut compound_index_model = IndexModel::default();
Expand Down Expand Up @@ -163,7 +161,7 @@ impl DataStore {
MONGO_DOC_ARRAY: mongodb::bson::to_bson(&bucket_update)?,
},
"$inc": {"counter": 1},
"$setOnInsert": { "_id": format!("{}_{}", doc.pid.clone(), doc.ts), MONGO_DT_ID: doc.dt_id.clone(), MONGO_FROM_TS: doc.ts},
"$setOnInsert": { "_id": format!("{}_{}_{}", doc.pid.clone(), doc.ts, crate::util::new_uuid()), MONGO_DT_ID: doc.dt_id.clone(), MONGO_FROM_TS: doc.ts},
"$set": {MONGO_TO_TS: doc.ts},
}, update_options).await {
Ok(_r) => {
Expand Down Expand Up @@ -326,14 +324,14 @@ impl DataStore {
doc! {"$skip": skip_buckets},
// worst case: overlap between two buckets.
doc! {"$limit": 2},
doc! {"$unwind": format ! ("${}", MONGO_DOC_ARRAY)},
doc! {"$unwind": format! ("${}", MONGO_DOC_ARRAY)},
doc! {"$replaceRoot": { "newRoot": "$documents"}},
doc! {"$match":{
MONGO_TS: {"$gte": date_from.timestamp(), "$lte": date_to.timestamp()}
}},
doc! {"$sort": {MONGO_TS: sort_order}},
doc! {"$skip": start_entry as i32},
doc! { "$limit": size as i32},
doc! {"$limit": size as i32},
];

let coll = self
Expand Down Expand Up @@ -483,8 +481,6 @@ mod bucket {
pub struct DocumentBucketUpdate {
pub id: String,
pub ts: i64,
pub tc: i64,
pub hash: String,
pub keys_ct: String,
pub cts: Vec<String>,
}
Expand All @@ -494,8 +490,6 @@ mod bucket {
DocumentBucketUpdate {
id: doc.id.clone(),
ts: doc.ts,
tc: doc.tc,
hash: doc.hash.clone(),
keys_ct: doc.keys_ct.clone(),
cts: doc.cts.to_vec(),
}
Expand All @@ -512,8 +506,6 @@ mod bucket {
dt_id,
pid,
ts: bucket_update.ts,
tc: bucket_update.tc,
hash: bucket_update.hash.clone(),
keys_ct: bucket_update.keys_ct.clone(),
cts: bucket_update.cts.to_vec(),
}
Expand Down
85 changes: 85 additions & 0 deletions clearing-house-app/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#[macro_use]
extern crate tracing;

use crate::db::doc_store::DataStore;
use crate::db::key_store::KeyStore;
use crate::db::process_store::ProcessStore;
use crate::model::constants::ENV_LOGGING_SERVICE_ID;
use crate::util::ServiceConfig;
use std::sync::Arc;

mod config;
mod crypto;
mod db;
pub mod model;
mod ports;
mod services;
pub mod util;

/// Contains the application state
#[derive(Clone)]
pub struct AppState {
#[cfg_attr(not(doc_type), allow(dead_code))]
pub keyring_service: Arc<services::keyring_service::KeyringService>,
pub logging_service: Arc<services::logging_service::LoggingService>,
pub service_config: Arc<ServiceConfig>,
pub signing_key_path: String,
}

impl AppState {
/// Initialize the application state from config
async fn init(conf: &config::CHConfig) -> anyhow::Result<Self> {
trace!("Initializing Process store");
let process_store =
ProcessStore::init_process_store(&conf.process_database_url, conf.clear_db)
.await
.expect("Failure to initialize process store! Exiting...");
trace!("Initializing Keyring store");
let keyring_store = KeyStore::init_keystore(&conf.keyring_database_url, conf.clear_db)
.await
.expect("Failure to initialize keyring store! Exiting...");
trace!("Initializing Document store");
let doc_store = DataStore::init_datastore(&conf.document_database_url, conf.clear_db)
.await
.expect("Failure to initialize document store! Exiting...");

trace!("Initializing services");
let keyring_service = Arc::new(services::keyring_service::KeyringService::new(
keyring_store,
));
let doc_service = Arc::new(services::document_service::DocumentService::new(
doc_store,
keyring_service.clone(),
));
let logging_service = Arc::new(services::logging_service::LoggingService::new(
process_store,
doc_service.clone(),
));

let service_config = Arc::new(util::init_service_config(
ENV_LOGGING_SERVICE_ID.to_string(),
)?);
let signing_key = util::init_signing_key(conf.signing_key.as_deref())?;

Ok(Self {
signing_key_path: signing_key,
service_config,
keyring_service,
logging_service,
})
}
}

pub async fn app() -> anyhow::Result<axum::Router> {
// Read configuration
let conf = config::read_config(None);
config::configure_logging(&conf);

tracing::info!("Config read successfully! Initializing application ...");

// Initialize application state
let app_state = AppState::init(&conf).await?;

// Setup router
Ok(ports::router().with_state(app_state))
}
89 changes: 5 additions & 84 deletions clearing-house-app/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,104 +1,25 @@
#![forbid(unsafe_code)]
#![warn(clippy::unwrap_used)]

#[macro_use]
extern crate tracing;

use crate::db::doc_store::DataStore;
use crate::db::key_store::KeyStore;
use crate::db::process_store::ProcessStore;
use crate::model::constants::ENV_LOGGING_SERVICE_ID;
use crate::util::ServiceConfig;
use std::net::SocketAddr;
use std::sync::Arc;

mod config;
mod crypto;
mod db;
mod model;
mod ports;
mod services;
mod util;

/// Contains the application state
#[derive(Clone)]
pub(crate) struct AppState {
#[cfg_attr(not(doc_type), allow(dead_code))]
pub keyring_service: Arc<services::keyring_service::KeyringService>,
pub logging_service: Arc<services::logging_service::LoggingService>,
pub service_config: Arc<ServiceConfig>,
pub signing_key_path: String,
}

impl AppState {
/// Initialize the application state from config
async fn init(conf: &config::CHConfig) -> anyhow::Result<Self> {
trace!("Initializing Process store");
let process_store =
ProcessStore::init_process_store(&conf.process_database_url, conf.clear_db)
.await
.expect("Failure to initialize process store! Exiting...");
trace!("Initializing Keyring store");
let keyring_store = KeyStore::init_keystore(&conf.keyring_database_url, conf.clear_db)
.await
.expect("Failure to initialize keyring store! Exiting...");
trace!("Initializing Document store");
let doc_store = DataStore::init_datastore(&conf.document_database_url, conf.clear_db)
.await
.expect("Failure to initialize document store! Exiting...");

trace!("Initializing services");
let keyring_service = Arc::new(services::keyring_service::KeyringService::new(
keyring_store,
));
let doc_service = Arc::new(services::document_service::DocumentService::new(
doc_store,
keyring_service.clone(),
));
let logging_service = Arc::new(services::logging_service::LoggingService::new(
process_store,
doc_service.clone(),
));

let service_config = Arc::new(util::init_service_config(
ENV_LOGGING_SERVICE_ID.to_string(),
)?);
let signing_key = util::init_signing_key(conf.signing_key.as_deref())?;

Ok(Self {
signing_key_path: signing_key,
service_config,
keyring_service,
logging_service,
})
}
}

/// Main function: Reading config, initializing application state, starting server
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
#[cfg(feature = "sentry")]
let _guard = sentry::init(("https://347cc3aa30aa0c07d437da8c780838d3@o4506146399322112.ingest.sentry.io/4506155710480384", sentry::ClientOptions {
release: sentry::release_name!(),
..Default::default()
release: sentry::release_name!(),
..Default::default()
}));
// Read configuration
let conf = config::read_config(None);
config::configure_logging(&conf);

info!("Config read successfully! Initializing application ...");

// Initialize application state
let app_state = AppState::init(&conf).await?;

// Setup router
let app = ports::router().with_state(app_state);
let app = clearing_house_app::app().await?;

// Bind port and start server
let addr = SocketAddr::from(([0, 0, 0, 0], 8000));
info!("Starting server: Listening on {}", addr);
tracing::info!("Starting server: Listening on {}", addr);
Ok(axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(util::shutdown_signal())
.with_graceful_shutdown(clearing_house_app::util::shutdown_signal())
.await?)
}
Loading
Loading