feat: enable pedantic linter and fix clippy findings where appropriate
schoenenberg committed Mar 14, 2024
1 parent 8cfb5e1 commit df0a5d4
Showing 16 changed files with 466 additions and 429 deletions.
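
For orientation: Clippy's `pedantic` group is opt-in, so the commit turns it on at the crate root in `src/lib.rs` (full diff below); `clippy::unwrap_used` lives in the separate `restriction` group and is named explicitly. The recurring fixes in the hunks that follow are `From` impls instead of `Into`/`ToString` impls, function paths instead of redundant closures, raw strings without needless `#` hashes, inlined format arguments, range-checked integer casts, and `expect` instead of `unwrap`.

```rust
// Crate-root lint configuration as introduced in clearing-house-app/src/lib.rs below:
#![forbid(unsafe_code)] // any `unsafe` block becomes a hard compile error
#![warn(clippy::all, clippy::pedantic, clippy::unwrap_used)] // report pedantic findings as warnings
#![allow(clippy::module_name_repetitions)] // opt out of one noisy pedantic lint crate-wide
```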
405 changes: 183 additions & 222 deletions clearing-house-app/Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion clearing-house-app/Cargo.toml
@@ -1,6 +1,8 @@
 [package]
 name = "clearing-house-app"
 version = "0.10.0"
+license = "Apache-2.0"
+repository = "https://github.com/ids-basecamp/clearinghouse"
 authors = [
     "Mark Gall <mark.gall@aisec.fraunhofer.de>",
     "Georg Bramm <georg.bramm@aisec.fraunhofer.de>",
@@ -32,7 +34,7 @@ rand = "0.8.5"
 # lazy initialization of static variables
 once_cell = "1.18.0"
 # Base64 encoding
-base64 = "0.21.7 "
+base64 = "0.21.7"
 # UUID generation
 uuid = { version = "1", features = ["serde", "v4"] }
 # Big integer handling (RSA key modulus and exponent)
13 changes: 8 additions & 5 deletions clearing-house-app/src/config.rs
@@ -1,3 +1,5 @@
+use std::fmt::Display;
+
 /// Represents the configuration for the application
 #[derive(Debug, serde::Deserialize)]
 pub(crate) struct CHConfig {
@@ -33,15 +35,16 @@ impl From<LogLevel> for tracing::Level {
     }
 }
 
-impl ToString for LogLevel {
-    fn to_string(&self) -> String {
-        match self {
+impl Display for LogLevel {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let str = match self {
             LogLevel::Trace => String::from("TRACE"),
             LogLevel::Debug => String::from("DEBUG"),
             LogLevel::Info => String::from("INFO"),
             LogLevel::Warn => String::from("WARN"),
             LogLevel::Error => String::from("ERROR"),
-        }
+        };
+        write!(f, "{str}")
     }
 }
 
@@ -119,7 +122,7 @@ mod test {
     #[serial]
     fn test_read_config_from_toml() {
         // Create tempfile
-        let file = tempfile::Builder::new().suffix(".toml").tempfile().unwrap();
+        let file = tempfile::Builder::new().suffix(".toml").tempfile().expect("Failure to create tempfile");
 
         // Write config to file
         let toml = r#"database_url = "mongodb://localhost:27019"
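
The `Display` change above is the usual fix for Clippy's complaint about implementing `ToString` directly: std ships a blanket `impl<T: fmt::Display + ?Sized> ToString for T`, so a `Display` impl provides `to_string()` for free and also works in format strings. A minimal self-contained sketch of the same pattern, condensed here to two variants and using `&'static str` instead of the `String::from` calls above:

```rust
use std::fmt;

enum LogLevel {
    Info,
    Error,
}

impl fmt::Display for LogLevel {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Borrowed string slices; the allocations of `String::from` are unnecessary here.
        let s = match self {
            LogLevel::Info => "INFO",
            LogLevel::Error => "ERROR",
        };
        write!(f, "{s}")
    }
}

fn main() {
    // `to_string()` comes from std's blanket `impl<T: Display> ToString for T`.
    assert_eq!(LogLevel::Info.to_string(), "INFO");
    // `Display` types also work directly in format strings.
    println!("{}", LogLevel::Error);
}
```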
102 changes: 58 additions & 44 deletions clearing-house-app/src/db/postgres_document_store.rs
@@ -25,14 +25,14 @@ impl super::DocumentStore for PostgresDocumentStore {
         let doc = DocumentRow::from(doc);
 
         sqlx::query(
-            r#"INSERT INTO documents
+            r"INSERT INTO documents
             (id, process_id, created_at, model_version, correlation_message,
             transfer_contract, issued, issuer_connector, content_version, recipient_connector,
             sender_agent, recipient_agent, payload, payload_type, message_id)
             VALUES
             ($1, (SELECT id from processes where process_id = $2), $3, $4, $5,
             $6, $7, $8, $9, $10,
-            $11, $12, $13, $14, $15)"#,
+            $11, $12, $13, $14, $15)",
         )
         .bind(doc.id) // 1
         .bind(doc.process_id) // 2
@@ -61,26 +61,32 @@ impl super::DocumentStore for PostgresDocumentStore {
             .fetch_optional(&self.db)
             .await
             .map(|r| r.is_some())
-            .map_err(|e| e.into())
+            .map_err(std::convert::Into::into)
     }
 
     async fn get_document(&self, id: &str, pid: &str) -> anyhow::Result<Option<Document>> {
         sqlx::query_as::<_, DocumentRow>(
-            r#"SELECT documents.id, processes.process_id, documents.created_at, model_version, correlation_message,
+            r"SELECT documents.id, processes.process_id, documents.created_at, model_version, correlation_message,
             transfer_contract, issued, issuer_connector, content_version, recipient_connector,
             sender_agent, recipient_agent, payload, payload_type, message_id
             FROM documents
             LEFT JOIN processes ON processes.id = documents.process_id
-            WHERE id = $1 AND processes.process_id = $2"#,
+            WHERE id = $1 AND processes.process_id = $2",
         )
-            .bind(id)
-            .bind(pid)
-            .fetch_optional(&self.db)
-            .await
-            .map(|r| r.map(DocumentRow::into))
-            .map_err(|e| e.into())
+        .bind(id)
+        .bind(pid)
+        .fetch_optional(&self.db)
+        .await
+        .map(|r| r.map(DocumentRow::into))
+        .map_err(std::convert::Into::into)
     }
 
+    /// Get documents for a process
+    ///
+    /// # Lints
+    ///
+    /// Disabled `clippy::cast_possible_wrap` because cast is handled
+    #[allow(clippy::cast_possible_wrap)]
     async fn get_documents_for_pid(
         &self,
         pid: &str,
@@ -96,27 +102,35 @@ impl super::DocumentStore for PostgresDocumentStore {
 
         sqlx::query_as::<_, DocumentRow>(
             format!(
-                r#"SELECT documents.id, processes.process_id, documents.created_at, model_version, correlation_message,
+                r"SELECT documents.id, processes.process_id, documents.created_at, model_version, correlation_message,
             transfer_contract, issued, issuer_connector, content_version, recipient_connector,
             sender_agent, recipient_agent, payload, payload_type, message_id
             FROM documents
             LEFT JOIN processes ON processes.id = documents.process_id
             WHERE processes.process_id = $1 AND documents.created_at BETWEEN $2 AND $3
-            ORDER BY created_at {}
-            LIMIT $4 OFFSET $5"#,
-                sort_order
-            )
-            .as_str(),
+            ORDER BY created_at {sort_order}
+            LIMIT $4 OFFSET $5")
+            .as_str(),
         )
-            .bind(pid)
-            .bind(date_from)
-            .bind(date_to)
-            .bind(size as i64)
-            .bind(((page - 1) * size) as i64)
-            .fetch_all(&self.db)
-            .await
-            .map(|r| r.into_iter().map(DocumentRow::into).collect())
-            .map_err(|e| e.into())
+        .bind(pid)
+        .bind(date_from)
+        .bind(date_to)
+        .bind(cast_i64(size)?)
+        .bind(cast_i64((page - 1) * size)?)
+        .fetch_all(&self.db)
+        .await
+        .map(|r| r.into_iter().map(DocumentRow::into).collect())
+        .map_err(std::convert::Into::into)
     }
 }
 
+/// Cast u64 to i64 with out-of-range check
+fn cast_i64(value: u64) -> anyhow::Result<i64> {
+    if value > i64::MAX as u64 {
+        Err(anyhow::anyhow!("size out-of-range"))
+    } else {
+        #[allow(clippy::cast_possible_wrap)]
+        Ok(value as i64)
+    }
+}
@@ -161,29 +175,29 @@ impl From<Document> for DocumentRow {
     }
 }
 
-impl Into<Document> for DocumentRow {
-    fn into(self) -> Document {
+impl From<DocumentRow> for Document {
+    fn from(value: DocumentRow) -> Self {
         use chrono::TimeZone;
 
-        Document {
-            id: self.id,
-            pid: self.process_id,
-            ts: chrono::Local.from_utc_datetime(&self.created_at),
+        Self {
+            id: value.id,
+            pid: value.process_id,
+            ts: chrono::Local.from_utc_datetime(&value.created_at),
             content: crate::model::ids::message::IdsMessage {
-                model_version: self.model_version,
-                correlation_message: self.correlation_message,
-                transfer_contract: self.transfer_contract,
-                issued: self.issued.0,
-                issuer_connector: self.issuer_connector.0,
-                content_version: self.content_version,
-                recipient_connector: self.recipient_connector.map(|s| s.0),
-                sender_agent: self.sender_agent,
-                recipient_agent: self.recipient_agent.map(|s| s.0),
-                payload: self
+                model_version: value.model_version,
+                correlation_message: value.correlation_message,
+                transfer_contract: value.transfer_contract,
+                issued: value.issued.0,
+                issuer_connector: value.issuer_connector.0,
+                content_version: value.content_version,
+                recipient_connector: value.recipient_connector.map(|s| s.0),
+                sender_agent: value.sender_agent,
+                recipient_agent: value.recipient_agent.map(|s| s.0),
+                payload: value
                     .payload
                     .map(|s| String::from_utf8_lossy(s.as_ref()).to_string()),
-                payload_type: self.payload_type,
-                id: self.message_id,
+                payload_type: value.payload_type,
+                id: value.message_id,
                 ..Default::default()
             },
         }
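
Two pedantic findings drive the `get_documents_for_pid` hunk: `uninlined_format_args` (the `{}` placeholder plus trailing `sort_order` argument becomes `{sort_order}`) and `cast_possible_wrap` (`u64 as i64` silently wraps for values above `i64::MAX`, so the bare casts on the `LIMIT`/`OFFSET` parameters are replaced by the range-checked `cast_i64`, which confines the `#[allow]` to the one provably safe cast). An equivalent formulation via std's `TryFrom` — a sketch, not what the commit uses — would avoid the `#[allow]` entirely:

```rust
/// Range-checked u64 -> i64 conversion using std's TryFrom.
/// `i64::try_from` fails exactly when the value exceeds `i64::MAX`,
/// so no `#[allow(clippy::cast_possible_wrap)]` is needed.
fn cast_i64(value: u64) -> anyhow::Result<i64> {
    i64::try_from(value).map_err(|_| anyhow::anyhow!("size out-of-range"))
}

fn main() -> anyhow::Result<()> {
    assert_eq!(cast_i64(42)?, 42);
    assert!(cast_i64(u64::MAX).is_err()); // `u64::MAX as i64` would have wrapped to -1
    Ok(())
}
```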
48 changes: 24 additions & 24 deletions clearing-house-app/src/db/postgres_process_store.rs
@@ -22,15 +22,15 @@ impl PostgresProcessStore {
 impl super::ProcessStore for PostgresProcessStore {
     async fn get_processes(&self) -> anyhow::Result<Vec<Process>> {
         sqlx::query_as::<_, ProcessRow>(
-            r#"SELECT p.process_id, p.created_at, ARRAY_AGG(c.client_id) AS owners FROM processes p
+            r"SELECT p.process_id, p.created_at, ARRAY_AGG(c.client_id) AS owners FROM processes p
             LEFT JOIN process_owners po ON p.id = po.process_id
             LEFT JOIN clients c ON po.client_id = c.id
-            GROUP BY p.process_id, p.created_at"#,
+            GROUP BY p.process_id, p.created_at",
         )
-            .fetch_all(&self.db)
-            .await
-            .map(|r| r.into_iter().map(|p| p.into()).collect())
-            .map_err(|e| e.into())
+        .fetch_all(&self.db)
+        .await
+        .map(|r| r.into_iter().map(std::convert::Into::into).collect())
+        .map_err(std::convert::Into::into)
     }
 
     async fn delete_process(&self, pid: &str) -> anyhow::Result<bool> {
@@ -39,7 +39,7 @@ impl super::ProcessStore for PostgresProcessStore {
             .execute(&self.db)
             .await
             .map(|r| r.rows_affected() == 1)
-            .map_err(|e| e.into())
+            .map_err(std::convert::Into::into)
     }
 
     async fn exists_process(&self, pid: &str) -> anyhow::Result<bool> {
@@ -48,22 +48,22 @@ impl super::ProcessStore for PostgresProcessStore {
             .fetch_optional(&self.db)
             .await
             .map(|r| r.is_some())
-            .map_err(|e| e.into())
+            .map_err(std::convert::Into::into)
     }
 
     async fn get_process(&self, pid: &str) -> anyhow::Result<Option<Process>> {
         sqlx::query_as::<_, ProcessRow>(
-            r#"SELECT p.process_id, p.created_at, ARRAY_AGG(c.client_id) AS owners FROM processes p
+            r"SELECT p.process_id, p.created_at, ARRAY_AGG(c.client_id) AS owners FROM processes p
             LEFT JOIN process_owners po ON p.id = po.process_id
             LEFT JOIN clients c ON po.client_id = c.id
             WHERE p.process_id = $1
-            GROUP BY p.process_id, p.created_at"#,
+            GROUP BY p.process_id, p.created_at",
         )
-            .bind(pid)
-            .fetch_optional(&self.db)
-            .await
-            .map(|r| r.map(|p| p.into()))
-            .map_err(|e| e.into())
+        .bind(pid)
+        .fetch_optional(&self.db)
+        .await
+        .map(|r| r.map(std::convert::Into::into))
+        .map_err(std::convert::Into::into)
     }
 
     async fn store_process(&self, process: Process) -> anyhow::Result<()> {
@@ -72,7 +72,7 @@ impl super::ProcessStore for PostgresProcessStore {
 
         // Create a process
         let process_row =
-            sqlx::query(r#"INSERT INTO processes (process_id) VALUES ($1) RETURNING id"#)
+            sqlx::query(r"INSERT INTO processes (process_id) VALUES ($1) RETURNING id")
                 .bind(&process.process_id)
                 .fetch_one(&mut *tx)
                 .await?;
@@ -81,7 +81,7 @@ impl super::ProcessStore for PostgresProcessStore {
 
         for o in process.owners {
             // Check if client exists
-            let client_row = sqlx::query(r#"SELECT id FROM clients WHERE client_id = $1"#)
+            let client_row = sqlx::query(r"SELECT id FROM clients WHERE client_id = $1")
                 .bind(&o)
                 .fetch_optional(&mut *tx)
                 .await?;
@@ -90,20 +90,20 @@ impl super::ProcessStore for PostgresProcessStore {
             let client_row = match client_row {
                 Some(crow) => crow,
                 None => {
-                    sqlx::query(r#"INSERT INTO clients (client_id) VALUES ($1) RETURNING id"#)
+                    sqlx::query(r"INSERT INTO clients (client_id) VALUES ($1) RETURNING id")
                         .bind(&o)
                         .fetch_one(&mut *tx)
                         .await?
                 }
             };
 
             // Get id of client
-            let oid = client_row.get::<i32, _>("id");
+            let client_id = client_row.get::<i32, _>("id");
 
             // Create process owner
-            sqlx::query(r#"INSERT INTO process_owners (process_id, client_id) VALUES ($1, $2)"#)
+            sqlx::query(r"INSERT INTO process_owners (process_id, client_id) VALUES ($1, $2)")
                 .bind(pid)
-                .bind(oid)
+                .bind(client_id)
                 .execute(&mut *tx)
                 .await?;
         }
@@ -129,8 +129,8 @@ impl From<Process> for ProcessRow {
     }
 }
 
-impl Into<Process> for ProcessRow {
-    fn into(self) -> Process {
-        Process::new(self.process_id, self.owners)
+impl From<ProcessRow> for Process {
+    fn from(value: ProcessRow) -> Self {
+        Self::new(value.process_id, value.owners)
     }
 }
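
Both `Into<Document> for DocumentRow` above and `Into<Process> for ProcessRow` here are rewritten the same way, per Clippy's `from_over_into` lint: std's blanket `impl<T, U: From<T>> Into<U> for T` means a `From` impl yields `Into` for free, while a hand-written `Into` yields no `From`. The related closure changes (`|p| p.into()` and `|e| e.into()` to `std::convert::Into::into`) are Clippy's `redundant_closure_for_method_calls`: a plain function path works wherever the closure only forwards its argument. A minimal sketch with simplified stand-in types (the real `Process` uses a `new` constructor):

```rust
struct ProcessRow {
    process_id: String,
    owners: Vec<String>,
}

struct Process {
    process_id: String,
    owners: Vec<String>,
}

// One `From` impl; std's blanket impl derives `Into<Process> for ProcessRow`.
impl From<ProcessRow> for Process {
    fn from(value: ProcessRow) -> Self {
        Self {
            process_id: value.process_id,
            owners: value.owners,
        }
    }
}

fn main() {
    let rows = vec![ProcessRow {
        process_id: "p1".to_string(),
        owners: vec!["client-a".to_string()],
    }];
    // The function path replaces the redundant closure `|p| p.into()`.
    let processes: Vec<Process> = rows.into_iter().map(std::convert::Into::into).collect();
    assert_eq!(processes[0].process_id, "p1");
}
```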
40 changes: 26 additions & 14 deletions clearing-house-app/src/lib.rs
@@ -1,3 +1,7 @@
+#![forbid(unsafe_code)]
+#![warn(clippy::all, clippy::pedantic, clippy::unwrap_used)]
+#![allow(clippy::module_name_repetitions)]
+
 #[macro_use]
 extern crate tracing;
 
@@ -35,22 +39,25 @@ pub(crate) struct AppState {
 }
 
 impl AppState {
-
+    /// Connect to the database and execute database migrations
+    async fn setup_postgres(conf: &config::CHConfig) -> anyhow::Result<sqlx::PgPool> {
+        info!("Connecting to database");
+        let pool = sqlx::PgPool::connect(&conf.database_url).await?;
+
+        info!("Migrating database");
+        sqlx::migrate!()
+            .run(&pool)
+            .await
+            .expect("Failed to migrate database!");
+
+        Ok(pool)
+    }
+
     /// Initialize the application state from config
     async fn init(conf: &config::CHConfig) -> anyhow::Result<Self> {
         #[cfg(feature = "postgres")]
-        let pool = async {
-            info!("Connecting to database");
-            let pool = sqlx::PgPool::connect(&conf.database_url).await.unwrap();
-
-            info!("Migrating database");
-            sqlx::migrate!()
-                .run(&pool)
-                .await
-                .expect("Failed to migrate database!");
-
-            pool
-        }
-        .await;
+        let pool = Self::setup_postgres(conf).await?;
 
         trace!("Initializing Process store");
         #[cfg(feature = "mongodb")]
@@ -85,7 +92,7 @@ impl AppState {
         ));
 
         let service_config = Arc::new(util::init_service_config(
-            ENV_LOGGING_SERVICE_ID.to_string(),
+            ENV_LOGGING_SERVICE_ID,
         )?);
         let signing_key = util::init_signing_key(conf.signing_key.as_deref())?;
 
@@ -97,6 +104,11 @@ impl AppState {
     }
 }
 
+/// Initialize the application
+///
+/// # Errors
+///
+/// Throws an error if the `AppState` cannot be initialized
 pub async fn app() -> anyhow::Result<axum::Router> {
     // Read configuration
     let conf = config::read_config(None);
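
The `lib.rs` hunk above is the `clippy::unwrap_used` payoff: the inline `async { ... }` block that called `.unwrap()` on the connection result is extracted into `setup_postgres`, whose `anyhow::Result` return type lets `?` propagate the error to `init` instead of panicking. A minimal sketch of the same refactoring shape, with a hypothetical `connect` standing in for `sqlx::PgPool::connect`:

```rust
// Hypothetical fallible setup step (stand-in for `sqlx::PgPool::connect`).
fn connect(url: &str) -> anyhow::Result<String> {
    if url.is_empty() {
        anyhow::bail!("empty database URL");
    }
    Ok(format!("pool({url})"))
}

// Before: `let pool = connect(url).unwrap();` -- panics, rejected by `clippy::unwrap_used`.
// After: the error propagates to the caller via `?`.
fn setup(url: &str) -> anyhow::Result<String> {
    let pool = connect(url)?;
    Ok(pool)
}

fn main() {
    assert!(setup("postgres://localhost").is_ok());
    assert!(setup("").is_err());
}
```

Note that the migration step still uses `expect`, which `clippy::unwrap_used` does not flag; a failed migration remains a deliberate startup panic.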