Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support cassandra database #33

Merged
merged 11 commits into from
Dec 20, 2024
1,962 changes: 1,387 additions & 575 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ opentelemetry-prometheus = "0.16.0"
prometheus = "0.13.4"
serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
masking = { git = "https://github.com/juspay/hyperswitch", tag = "v1.109.0" }
masking = { git = "https://github.com/juspay/hyperswitch.git",tag = "2024.12.17.0",features = ["cassandra"] }
async-trait = "0.1.80"
hex = "0.4.3"
charybdis = { git = "https://github.com/dracarys18/charybdis.git", rev = "1637c263f512adf7840705e3ab405a113437a6e3" }
scylla = { git = "https://github.com/juspay/scylla-rust-driver.git",rev = "5700aa2847b25437cdd4fcf34d707aa90dca8b89", features = ["time-03"]}
ring = { version = "0.17.8", features = ["std"] }
strum = { version = "0.26", features = ["derive"] }
futures = "0.3.30"
Expand Down
7 changes: 7 additions & 0 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ dbname = "encryption_db"
pool_size = 5
min_idle = 2

[cassandra]
known_nodes = ["localhost:9042"]
pool_size = 16
keyspace = "encryption_db"
timeout = 60
cache_size = 120

[log]
log_level = "debug"
log_format = "console"
Expand Down
5 changes: 4 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ use crate::{config::Config, crypto::KeyManagerClient, storage::DbState};

use crate::storage::adapter;

#[cfg(not(feature = "cassandra"))]
use diesel_async::pooled_connection::bb8::Pool;
#[cfg(not(feature = "cassandra"))]
use diesel_async::AsyncPgConnection;

use rayon::{ThreadPool, ThreadPoolBuilder};

#[cfg(not(feature = "cassandra"))]
type StorageState = DbState<Pool<AsyncPgConnection>, adapter::PostgreSQL>;

#[cfg(feature = "cassandra")]
type StorageState = DbState<Pool<AsyncPgConnection>, adapter::Cassandra>;
type StorageState = DbState<scylla::CachingSession, adapter::Cassandra>;

pub struct AppState {
pub conf: Config,
Expand Down
17 changes: 16 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use crate::{
env::observability::LogConfig,
errors::{self, CustomResult},
};

use std::num::NonZeroUsize;

use config::File;
use serde::Deserialize;
use std::sync::Arc;
Expand Down Expand Up @@ -133,12 +136,22 @@ pub struct Config {
pub metrics_server: Server,
pub database: Database,
pub secrets: Secrets,
pub cassandra: Cassandra,
pub log: LogConfig,
pub pool_config: PoolConfig,
#[cfg(feature = "mtls")]
pub certs: Certs,
}

#[derive(Deserialize, Debug)]
pub struct Cassandra {
pub known_nodes: Vec<String>,
pub keyspace: String,
pub timeout: u32,
pub pool_size: NonZeroUsize,
pub cache_size: usize,
}

#[derive(Deserialize, Debug)]
pub struct Database {
pub port: u16,
Expand Down Expand Up @@ -215,7 +228,9 @@ impl Config {
.add_source(
config::Environment::with_prefix("CRIPTA")
.try_parsing(true)
.separator("__"),
.separator("__")
.list_separator(",")
.with_list_parse_key("cassandra.known_nodes"),
)
.build()
.expect("Unable to find configuration");
Expand Down
2 changes: 1 addition & 1 deletion src/crypto/aes256.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<'de> Deserialize<'de> for GcmAes256 {
{
struct Aes256Visitor;

impl<'de> Visitor<'de> for Aes256Visitor {
impl Visitor<'_> for Aes256Visitor {
type Value = GcmAes256;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
19 changes: 18 additions & 1 deletion src/errors/db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::env::observability as logger;
use diesel::result::{DatabaseErrorKind, Error as diesel_error};
use error_stack::{report, ResultExt};
use thiserror::Error;
Expand All @@ -8,7 +9,7 @@ pub enum ConnectionError {
ConnectionEstablishFailed,
}

#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum DatabaseError {
#[error("Failed to get the connection out of the pool")]
ConnectionError(error_stack::Report<ConnectionError>),
Expand Down Expand Up @@ -47,6 +48,22 @@ impl<T> super::SwitchError<T, DatabaseError> for Result<T, diesel::result::Error
}
}

impl<T> super::SwitchError<T, DatabaseError> for Result<T, charybdis::errors::CharybdisError> {
fn switch(self) -> super::CustomResult<T, DatabaseError> {
self.map_err(|err| {
let (err, message) = match err {
charybdis::errors::CharybdisError::NotFoundError(err) => {
(DatabaseError::NotFound, err)
}
err => {
logger::error!(err=?err);
(DatabaseError::Others, "An unknown error occurred")
}
};
report!(err).attach_printable(message)
})
}
}
impl<T> super::SwitchError<T, DatabaseError> for super::CustomResult<T, super::CryptoError> {
fn switch(self) -> super::CustomResult<T, super::DatabaseError> {
self.change_context(DatabaseError::InvalidValue)
Expand Down
1 change: 0 additions & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ where
/// # Panics
///
/// Panics if unable to connect to Database
#[allow(clippy::expect_used)]
pub async fn from_config(
config: &Config,
) -> DbState<<Self as DbAdapter>::Pool, <Self as DbAdapter>::AdapterType> {
Expand Down
33 changes: 21 additions & 12 deletions src/storage/adapter/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
mod dek;

use crate::storage::{adapter::Cassandra, errors, Config, Connection, DbState};
use crate::storage::{adapter::Cassandra, errors, Config, DbState};

#[async_trait::async_trait]
impl super::DbAdapter
for DbState<
diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>,
Cassandra,
>
{
type Conn<'a> = Connection<'a>;
impl super::DbAdapter for DbState<scylla::CachingSession, Cassandra> {
type Conn<'a> = &'a scylla::CachingSession;
type AdapterType = Cassandra;
type Pool = diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>;
type Pool = scylla::CachingSession;

async fn from_config(_config: &Config) -> Self {
unimplemented!("Not implemented Yet")
#[allow(clippy::expect_used)]
async fn from_config(config: &Config) -> Self {
let session = scylla::SessionBuilder::new()
.known_nodes(&config.cassandra.known_nodes)
.pool_size(scylla::transport::session::PoolSize::PerHost(
config.cassandra.pool_size,
))
.use_keyspace(&config.cassandra.keyspace, false)
.build()
.await
.expect("Unable to build the cassandra Pool");

Self {
_adapter: std::marker::PhantomData,
pool: scylla::CachingSession::from(session, config.cassandra.cache_size),
}
}

async fn get_conn<'a>(
&'a self,
) -> errors::CustomResult<Self::Conn<'a>, errors::ConnectionError> {
unimplemented!("Not implemented Yet")
Ok(&self.pool)
}
}
72 changes: 58 additions & 14 deletions src/storage/adapter/cassandra/dek.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use charybdis::operations::Insert;

use super::DbState;

use crate::{
errors::{self, CustomResult},
env::observability as logger,
errors::{self, CustomResult, DatabaseError, SwitchError},
storage::{
adapter::Cassandra,
dek::DataKeyStorageInterface,
Expand All @@ -10,32 +13,73 @@ use crate::{
types::{key::Version, Identifier},
};

use charybdis::options::Consistency;
use error_stack::ResultExt;

#[async_trait::async_trait]
impl DataKeyStorageInterface
for DbState<
diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>,
Cassandra,
>
{
impl DataKeyStorageInterface for DbState<scylla::CachingSession, Cassandra> {
async fn get_or_insert_data_key(
&self,
_new: DataKeyNew,
new: DataKeyNew,
) -> CustomResult<DataKey, errors::DatabaseError> {
Err(error_stack::report!(errors::DatabaseError::UniqueViolation))
let connection = self.get_conn().await.switch()?;
let key: DataKey = new.into();

let find_query = self
.get_key(
key.version,
&Identifier::try_from((key.data_identifier.clone(), key.key_identifier.clone()))
.change_context(errors::DatabaseError::Others)?,
)
.await;

match find_query {
Ok(key) => Ok(key),
Err(err) => {
if !err.current_context().eq(&DatabaseError::NotFound) {
logger::error!(database_err=?err);
}
key.insert()
.consistency(Consistency::EachQuorum)
.execute(connection)
.await
.switch()?;
Ok(key)
}
}
}

async fn get_latest_version(
&self,
_identifier: &Identifier,
identifier: &Identifier,
) -> CustomResult<Version, errors::DatabaseError> {
Err(error_stack::report!(errors::DatabaseError::UniqueViolation))
let (data_id, key_id) = identifier.get_identifier();
let connection = self.get_conn().await.switch()?;

let data_key = DataKey::find_first_by_key_identifier_and_data_identifier(key_id, data_id)
.consistency(scylla::statement::Consistency::LocalQuorum)
.execute(connection)
.await
.switch()?;

Ok(data_key.version)
}

async fn get_key(
&self,
_v: Version,
_identifier: &Identifier,
v: Version,
identifier: &Identifier,
) -> CustomResult<DataKey, errors::DatabaseError> {
Err(error_stack::report!(errors::DatabaseError::UniqueViolation))
let (data_id, key_id) = identifier.get_identifier();
let connection = self.get_conn().await.switch()?;

let data_key =
DataKey::find_by_key_identifier_and_data_identifier_and_version(key_id, data_id, v)
.consistency(scylla::statement::Consistency::LocalQuorum)
.execute(connection)
.await
.switch()?;

Ok(data_key)
}
}
25 changes: 25 additions & 0 deletions src/storage/types/dek.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{schema::data_key_store, types::key::Version};
use charybdis::macros::charybdis_model;
use diesel::{Identifiable, Insertable, Queryable};
use masking::StrongSecret;
use time::PrimitiveDateTime;
Expand All @@ -15,6 +16,15 @@ pub struct DataKeyNew {
pub token: Option<StrongSecret<String>>,
}

#[charybdis_model(
table_name = data_key_store,
partition_keys = [key_identifier, data_identifier],
clustering_keys = [version],
table_options = r#"
CLUSTERING ORDER BY (version DESC)
AND gc_grace_seconds = 86400
"#
)]
#[derive(Queryable, Identifiable)]
#[diesel(table_name = data_key_store)]
pub struct DataKey {
Expand All @@ -27,3 +37,18 @@ pub struct DataKey {
pub source: String,
pub token: Option<StrongSecret<String>>,
}

impl From<DataKeyNew> for DataKey {
fn from(value: DataKeyNew) -> Self {
Self {
id: 0,
key_identifier: value.key_identifier,
data_identifier: value.data_identifier,
encryption_key: value.encryption_key,
version: value.version,
created_at: value.created_at,
source: value.source,
token: value.token,
}
}
}
4 changes: 2 additions & 2 deletions src/types/core/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<'de> Deserialize<'de> for DecryptedData {
{
struct DecryptedDataVisitor;

impl<'de> Visitor<'de> for DecryptedDataVisitor {
impl Visitor<'_> for DecryptedDataVisitor {
type Value = DecryptedData;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -98,7 +98,7 @@ impl<'de> Deserialize<'de> for EncryptedData {
{
struct EncryptedDataVisitor;

impl<'de> Visitor<'de> for EncryptedDataVisitor {
impl Visitor<'_> for EncryptedDataVisitor {
type Value = EncryptedData;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
Loading
Loading