Skip to content

Commit

Permalink
feat: support cassandra database (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
dracarys18 authored Dec 20, 2024
1 parent 556e21d commit d30bb6f
Show file tree
Hide file tree
Showing 13 changed files with 1,585 additions and 610 deletions.
1,962 changes: 1,387 additions & 575 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ opentelemetry-prometheus = "0.16.0"
prometheus = "0.13.4"
serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
masking = { git = "https://github.com/juspay/hyperswitch", tag = "v1.109.0" }
masking = { git = "https://github.com/juspay/hyperswitch.git",tag = "2024.12.17.0",features = ["cassandra"] }
async-trait = "0.1.80"
hex = "0.4.3"
charybdis = { git = "https://github.com/dracarys18/charybdis.git", rev = "1637c263f512adf7840705e3ab405a113437a6e3" }
scylla = { git = "https://github.com/juspay/scylla-rust-driver.git",rev = "5700aa2847b25437cdd4fcf34d707aa90dca8b89", features = ["time-03"]}
ring = { version = "0.17.8", features = ["std"] }
strum = { version = "0.26", features = ["derive"] }
futures = "0.3.30"
Expand Down
7 changes: 7 additions & 0 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ dbname = "encryption_db"
pool_size = 5
min_idle = 2

[cassandra]
known_nodes = ["localhost:9042"]
pool_size = 16
keyspace = "encryption_db"
timeout = 60
cache_size = 120

[log]
log_level = "debug"
log_format = "console"
Expand Down
5 changes: 4 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ use crate::{config::Config, crypto::KeyManagerClient, storage::DbState};

use crate::storage::adapter;

#[cfg(not(feature = "cassandra"))]
use diesel_async::pooled_connection::bb8::Pool;
#[cfg(not(feature = "cassandra"))]
use diesel_async::AsyncPgConnection;

use rayon::{ThreadPool, ThreadPoolBuilder};

#[cfg(not(feature = "cassandra"))]
type StorageState = DbState<Pool<AsyncPgConnection>, adapter::PostgreSQL>;

#[cfg(feature = "cassandra")]
type StorageState = DbState<Pool<AsyncPgConnection>, adapter::Cassandra>;
type StorageState = DbState<scylla::CachingSession, adapter::Cassandra>;

pub struct AppState {
pub conf: Config,
Expand Down
17 changes: 16 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use crate::{
env::observability::LogConfig,
errors::{self, CustomResult},
};

use std::num::NonZeroUsize;

use config::File;
use serde::Deserialize;
use std::sync::Arc;
Expand Down Expand Up @@ -133,12 +136,22 @@ pub struct Config {
pub metrics_server: Server,
pub database: Database,
pub secrets: Secrets,
pub cassandra: Cassandra,
pub log: LogConfig,
pub pool_config: PoolConfig,
#[cfg(feature = "mtls")]
pub certs: Certs,
}

#[derive(Deserialize, Debug)]
pub struct Cassandra {
pub known_nodes: Vec<String>,
pub keyspace: String,
pub timeout: u32,
pub pool_size: NonZeroUsize,
pub cache_size: usize,
}

#[derive(Deserialize, Debug)]
pub struct Database {
pub port: u16,
Expand Down Expand Up @@ -215,7 +228,9 @@ impl Config {
.add_source(
config::Environment::with_prefix("CRIPTA")
.try_parsing(true)
.separator("__"),
.separator("__")
.list_separator(",")
.with_list_parse_key("cassandra.known_nodes"),
)
.build()
.expect("Unable to find configuration");
Expand Down
2 changes: 1 addition & 1 deletion src/crypto/aes256.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<'de> Deserialize<'de> for GcmAes256 {
{
struct Aes256Visitor;

impl<'de> Visitor<'de> for Aes256Visitor {
impl Visitor<'_> for Aes256Visitor {
type Value = GcmAes256;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
19 changes: 18 additions & 1 deletion src/errors/db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::env::observability as logger;
use diesel::result::{DatabaseErrorKind, Error as diesel_error};
use error_stack::{report, ResultExt};
use thiserror::Error;
Expand All @@ -8,7 +9,7 @@ pub enum ConnectionError {
ConnectionEstablishFailed,
}

#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum DatabaseError {
#[error("Failed to get the connection out of the pool")]
ConnectionError(error_stack::Report<ConnectionError>),
Expand Down Expand Up @@ -47,6 +48,22 @@ impl<T> super::SwitchError<T, DatabaseError> for Result<T, diesel::result::Error
}
}

impl<T> super::SwitchError<T, DatabaseError> for Result<T, charybdis::errors::CharybdisError> {
fn switch(self) -> super::CustomResult<T, DatabaseError> {
self.map_err(|err| {
let (err, message) = match err {
charybdis::errors::CharybdisError::NotFoundError(err) => {
(DatabaseError::NotFound, err)
}
err => {
logger::error!(err=?err);
(DatabaseError::Others, "An unknown error occurred")
}
};
report!(err).attach_printable(message)
})
}
}
impl<T> super::SwitchError<T, DatabaseError> for super::CustomResult<T, super::CryptoError> {
fn switch(self) -> super::CustomResult<T, super::DatabaseError> {
self.change_context(DatabaseError::InvalidValue)
Expand Down
1 change: 0 additions & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ where
/// # Panics
///
/// Panics if unable to connect to Database
#[allow(clippy::expect_used)]
pub async fn from_config(
config: &Config,
) -> DbState<<Self as DbAdapter>::Pool, <Self as DbAdapter>::AdapterType> {
Expand Down
33 changes: 21 additions & 12 deletions src/storage/adapter/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
mod dek;

use crate::storage::{adapter::Cassandra, errors, Config, Connection, DbState};
use crate::storage::{adapter::Cassandra, errors, Config, DbState};

#[async_trait::async_trait]
impl super::DbAdapter
for DbState<
diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>,
Cassandra,
>
{
type Conn<'a> = Connection<'a>;
impl super::DbAdapter for DbState<scylla::CachingSession, Cassandra> {
type Conn<'a> = &'a scylla::CachingSession;
type AdapterType = Cassandra;
type Pool = diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>;
type Pool = scylla::CachingSession;

async fn from_config(_config: &Config) -> Self {
unimplemented!("Not implemented Yet")
#[allow(clippy::expect_used)]
async fn from_config(config: &Config) -> Self {
let session = scylla::SessionBuilder::new()
.known_nodes(&config.cassandra.known_nodes)
.pool_size(scylla::transport::session::PoolSize::PerHost(
config.cassandra.pool_size,
))
.use_keyspace(&config.cassandra.keyspace, false)
.build()
.await
.expect("Unable to build the cassandra Pool");

Self {
_adapter: std::marker::PhantomData,
pool: scylla::CachingSession::from(session, config.cassandra.cache_size),
}
}

async fn get_conn<'a>(
&'a self,
) -> errors::CustomResult<Self::Conn<'a>, errors::ConnectionError> {
unimplemented!("Not implemented Yet")
Ok(&self.pool)
}
}
72 changes: 58 additions & 14 deletions src/storage/adapter/cassandra/dek.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use charybdis::operations::Insert;

use super::DbState;

use crate::{
errors::{self, CustomResult},
env::observability as logger,
errors::{self, CustomResult, DatabaseError, SwitchError},
storage::{
adapter::Cassandra,
dek::DataKeyStorageInterface,
Expand All @@ -10,32 +13,73 @@ use crate::{
types::{key::Version, Identifier},
};

use charybdis::options::Consistency;
use error_stack::ResultExt;

#[async_trait::async_trait]
impl DataKeyStorageInterface
for DbState<
diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>,
Cassandra,
>
{
impl DataKeyStorageInterface for DbState<scylla::CachingSession, Cassandra> {
async fn get_or_insert_data_key(
&self,
_new: DataKeyNew,
new: DataKeyNew,
) -> CustomResult<DataKey, errors::DatabaseError> {
Err(error_stack::report!(errors::DatabaseError::UniqueViolation))
let connection = self.get_conn().await.switch()?;
let key: DataKey = new.into();

let find_query = self
.get_key(
key.version,
&Identifier::try_from((key.data_identifier.clone(), key.key_identifier.clone()))
.change_context(errors::DatabaseError::Others)?,
)
.await;

match find_query {
Ok(key) => Ok(key),
Err(err) => {
if !err.current_context().eq(&DatabaseError::NotFound) {
logger::error!(database_err=?err);
}
key.insert()
.consistency(Consistency::EachQuorum)
.execute(connection)
.await
.switch()?;
Ok(key)
}
}
}

async fn get_latest_version(
&self,
_identifier: &Identifier,
identifier: &Identifier,
) -> CustomResult<Version, errors::DatabaseError> {
Err(error_stack::report!(errors::DatabaseError::UniqueViolation))
let (data_id, key_id) = identifier.get_identifier();
let connection = self.get_conn().await.switch()?;

let data_key = DataKey::find_first_by_key_identifier_and_data_identifier(key_id, data_id)
.consistency(scylla::statement::Consistency::LocalQuorum)
.execute(connection)
.await
.switch()?;

Ok(data_key.version)
}

async fn get_key(
&self,
_v: Version,
_identifier: &Identifier,
v: Version,
identifier: &Identifier,
) -> CustomResult<DataKey, errors::DatabaseError> {
Err(error_stack::report!(errors::DatabaseError::UniqueViolation))
let (data_id, key_id) = identifier.get_identifier();
let connection = self.get_conn().await.switch()?;

let data_key =
DataKey::find_by_key_identifier_and_data_identifier_and_version(key_id, data_id, v)
.consistency(scylla::statement::Consistency::LocalQuorum)
.execute(connection)
.await
.switch()?;

Ok(data_key)
}
}
25 changes: 25 additions & 0 deletions src/storage/types/dek.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{schema::data_key_store, types::key::Version};
use charybdis::macros::charybdis_model;
use diesel::{Identifiable, Insertable, Queryable};
use masking::StrongSecret;
use time::PrimitiveDateTime;
Expand All @@ -15,6 +16,15 @@ pub struct DataKeyNew {
pub token: Option<StrongSecret<String>>,
}

#[charybdis_model(
table_name = data_key_store,
partition_keys = [key_identifier, data_identifier],
clustering_keys = [version],
table_options = r#"
CLUSTERING ORDER BY (version DESC)
AND gc_grace_seconds = 86400
"#
)]
#[derive(Queryable, Identifiable)]
#[diesel(table_name = data_key_store)]
pub struct DataKey {
Expand All @@ -27,3 +37,18 @@ pub struct DataKey {
pub source: String,
pub token: Option<StrongSecret<String>>,
}

impl From<DataKeyNew> for DataKey {
fn from(value: DataKeyNew) -> Self {
Self {
id: 0,
key_identifier: value.key_identifier,
data_identifier: value.data_identifier,
encryption_key: value.encryption_key,
version: value.version,
created_at: value.created_at,
source: value.source,
token: value.token,
}
}
}
4 changes: 2 additions & 2 deletions src/types/core/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<'de> Deserialize<'de> for DecryptedData {
{
struct DecryptedDataVisitor;

impl<'de> Visitor<'de> for DecryptedDataVisitor {
impl Visitor<'_> for DecryptedDataVisitor {
type Value = DecryptedData;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -98,7 +98,7 @@ impl<'de> Deserialize<'de> for EncryptedData {
{
struct EncryptedDataVisitor;

impl<'de> Visitor<'de> for EncryptedDataVisitor {
impl Visitor<'_> for EncryptedDataVisitor {
type Value = EncryptedData;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
Loading

0 comments on commit d30bb6f

Please sign in to comment.