diff --git a/Cargo.lock b/Cargo.lock index 1ae42634..7e48673c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,13 +244,23 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bincode" -version = "1.3.3" +version = "2.0.0-rc.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +checksum = "f11ea1a0346b94ef188834a65c068a03aec181c94896d481d7a0a40d85b0ce95" dependencies = [ + "bincode_derive", "serde", ] +[[package]] +name = "bincode_derive" +version = "2.0.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e30759b3b99a1b802a7a3aa21c85c3ded5c28e1c83170d82d70f08bbf7f3e4c" +dependencies = [ + "virtue", +] + [[package]] name = "bindgen" version = "0.69.4" @@ -3259,6 +3269,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "virtue" +version = "0.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dcc60c0624df774c82a0ef104151231d37da4962957d691c011c852b2473314" + [[package]] name = "want" version = "0.3.1" diff --git a/bothan-api/server-cli/src/commands.rs b/bothan-api/server-cli/src/commands.rs index 01db14eb..9ab6265c 100644 --- a/bothan-api/server-cli/src/commands.rs +++ b/bothan-api/server-cli/src/commands.rs @@ -1,3 +1,2 @@ pub mod config; pub mod start; -mod utils; diff --git a/bothan-api/server-cli/src/commands/config.rs b/bothan-api/server-cli/src/commands/config.rs index ac031c2d..01819c7d 100644 --- a/bothan-api/server-cli/src/commands/config.rs +++ b/bothan-api/server-cli/src/commands/config.rs @@ -1,13 +1,12 @@ use std::fs; use std::path::PathBuf; -use anyhow::Context; -use clap::{Parser, Subcommand}; - +use crate::bothan_home_dir; +use anyhow::{anyhow, Context}; use bothan_api::config::manager::crypto_info::sources::CryptoSourceConfigs; use bothan_api::config::AppConfig; - -use crate::commands::utils::bothan_home_dir; +use clap::{Parser, Subcommand}; +use tracing::info; #[derive(Parser)] pub struct ConfigCli { @@ -22,18 +21,27 @@ enum ConfigSubCommand { /// The path to where to initialize the configuration file (defaults to ./config.toml). #[arg(short, long)] path: Option, + + /// Whether to override the existing configuration file. + #[arg(short, long = "override")] + override_: bool, }, } impl ConfigCli { pub async fn run(&self) -> anyhow::Result<()> { match &self.subcommand { - ConfigSubCommand::Init { path } => { + ConfigSubCommand::Init { path, override_ } => { let config_path = match path { Some(p) => p.clone(), None => bothan_home_dir().join("config.toml"), }; + //check if the file already exists + if config_path.exists() && !override_ { + return Err(anyhow!("Config file already exists at: {:?}", config_path)); + } + if let Some(parent) = config_path.parent() { fs::create_dir_all(parent).with_context(|| { format!("Failed to create parent directories for {:?}", path) @@ -45,8 +53,8 @@ impl ConfigCli { let config_string = toml::to_string(&app_config).with_context(|| "Failed to serialize config")?; - fs::write(config_path, config_string).with_context(|| "Failed to write config")?; - println!("Initialized default config at: {:?}", path); + fs::write(&config_path, config_string).with_context(|| "Failed to write config")?; + info!("initialized default config at: {:?}", config_path); Ok(()) } } diff --git a/bothan-api/server-cli/src/commands/start.rs b/bothan-api/server-cli/src/commands/start.rs index ee9352a5..656f461d 100644 --- a/bothan-api/server-cli/src/commands/start.rs +++ b/bothan-api/server-cli/src/commands/start.rs @@ -5,11 +5,6 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context}; -use clap::Parser; -use reqwest::header::{HeaderName, HeaderValue}; -use semver::VersionReq; -use tonic::transport::Server; - use bothan_api::api::CryptoQueryServer; use bothan_api::config::ipfs::IpfsAuthentication; use bothan_api::config::manager::crypto_info::sources::CryptoSourceConfigs; @@ -25,8 +20,11 @@ use bothan_core::registry::{Registry, Valid}; use bothan_core::store::SharedStore; use bothan_core::worker::AssetWorkerBuilder; use bothan_kraken::KrakenWorkerBuilder; - -use crate::commands::utils::bothan_home_dir; +use clap::Parser; +use reqwest::header::{HeaderName, HeaderValue}; +use semver::VersionReq; +use tonic::transport::Server; +use tracing::info; #[derive(Parser)] pub struct StartCli { @@ -48,14 +46,7 @@ pub struct StartCli { } impl StartCli { - pub async fn run(&self) -> anyhow::Result<()> { - let config_path = self - .config - .clone() - .unwrap_or(bothan_home_dir().join("config.toml")); - - let app_config = AppConfig::from(config_path)?; - + pub async fn run(&self, app_config: AppConfig) -> anyhow::Result<()> { let registry = match &self.registry { Some(p) => { let file = @@ -84,15 +75,11 @@ async fn start_server( registry: Registry, reset: bool, ) -> anyhow::Result<()> { - let log_level = &app_config.log.level; - tracing_subscriber::fmt() - .with_env_filter(format!("bothan_core={log_level},bothan_api={log_level}")) - .init(); - let store = init_store(&app_config, registry, reset).await?; let ipfs_client = init_ipfs_client(&app_config).await?; let crypto_server = Arc::new(init_crypto_server(&app_config, store, ipfs_client).await?); + info!("server started"); Server::builder() .add_service(PriceServiceServer::from_arc(crypto_server.clone())) .add_service(SignalServiceServer::from_arc(crypto_server.clone())) @@ -120,7 +107,7 @@ async fn init_store( let mut store = SharedStore::new(registry, &config.store.path) .await .with_context(|| "Failed to create store")?; - println!("Store created successfully at \"{:?}\"", &config.store.path); + info!("store created successfully at \"{:?}\"", &config.store.path); if !reset { store @@ -208,6 +195,6 @@ where .with_context(|| format!("Failed to build worker {worker_name}"))?; manager.add_worker(worker_name.to_string(), worker).await; - println!("Loaded {} worker", worker_name); + info!("loaded {} worker", worker_name); Ok(()) } diff --git a/bothan-api/server-cli/src/commands/utils.rs b/bothan-api/server-cli/src/commands/utils.rs deleted file mode 100644 index 6dc0a76b..00000000 --- a/bothan-api/server-cli/src/commands/utils.rs +++ /dev/null @@ -1,6 +0,0 @@ -use std::path::PathBuf; - -pub fn bothan_home_dir() -> PathBuf { - let home = dirs::home_dir().expect("Failed to get home directory"); - home.join(".bothan") -} diff --git a/bothan-api/server-cli/src/main.rs b/bothan-api/server-cli/src/main.rs index 1cb018fc..f7c1339c 100644 --- a/bothan-api/server-cli/src/main.rs +++ b/bothan-api/server-cli/src/main.rs @@ -1,5 +1,9 @@ +use std::path::PathBuf; + use clap::{Parser, Subcommand}; +use bothan_api::config::AppConfig; + use crate::commands::config::ConfigCli; use crate::commands::start::StartCli; @@ -10,6 +14,10 @@ mod commands; struct Cli { #[command(subcommand)] command: Option, + + // global args + #[arg(long, global = true)] + config: Option, } #[derive(Subcommand)] @@ -30,13 +38,30 @@ async fn main() { } }; + let config_path = cli + .config + .clone() + .unwrap_or(bothan_home_dir().join("config.toml")); + + let app_config = AppConfig::from(config_path).expect("Failed to load config"); + + let log_level = &app_config.log.level; + tracing_subscriber::fmt() + .with_env_filter(format!( + "bothan_core={log_level},bothan_api={log_level},bothan={log_level}" + )) + .init(); + if let Some(command) = &cli.command { - if let Err(e) = match command { + match command { Command::Config(config_cli) => config_cli.run().await, - Command::Start(start_cli) => start_cli.run().await, - } { - eprintln!("{}", e); - std::process::exit(1); + Command::Start(start_cli) => start_cli.run(app_config).await, } + .expect("Failed to run command"); } } + +pub fn bothan_home_dir() -> PathBuf { + let home = dirs::home_dir().expect("Failed to get home directory"); + home.join(".bothan") +} diff --git a/bothan-api/server/src/api/crypto.rs b/bothan-api/server/src/api/crypto.rs index ab97a0a9..f4275399 100644 --- a/bothan-api/server/src/api/crypto.rs +++ b/bothan-api/server/src/api/crypto.rs @@ -53,27 +53,27 @@ impl SignalService for CryptoQueryServer { Ok(Response::new(())) } Err(SetRegistryError::FailedToRetrieve(e)) => { - error!("Failed to retrieve registry: {}", e); + error!("failed to retrieve registry: {}", e); Err(Status::not_found("Failed to retrieve registry")) } Err(SetRegistryError::InvalidRegistry(e)) => { - error!("Invalid registry: {}", e); + error!("invalid registry: {}", e); Err(Status::invalid_argument("Registry is invalid")) } Err(SetRegistryError::UnsupportedVersion) => { - error!("Invalid registry"); + error!("invalid registry"); Err(Status::invalid_argument("Registry is invalid")) } Err(SetRegistryError::FailedToParse) => { - error!("Failed to parse registry"); + error!("failed to parse registry"); Err(Status::invalid_argument("Registry is invalid")) } Err(SetRegistryError::InvalidHash) => { - error!("Invalid IPFS hash"); + error!("invalid IPFS hash"); Err(Status::invalid_argument("Invalid IPFS hash")) } - Err(SetRegistryError::FailedToSetRegistry(_)) => { - error!("Failed to set registry"); + Err(SetRegistryError::FailedToSetRegistry(e)) => { + error!("failed to set registry: {e}"); Err(Status::internal("Failed to set registry")) } } diff --git a/bothan-api/server/src/api/utils.rs b/bothan-api/server/src/api/utils.rs index 01c95b63..cc600724 100644 --- a/bothan-api/server/src/api/utils.rs +++ b/bothan-api/server/src/api/utils.rs @@ -15,7 +15,7 @@ pub fn parse_price_state(id: String, price_state: PriceState) -> Price { match i64::try_from(mantissa_price) { Ok(p) => Price::new(id, p, Status::Available), Err(_) => { - warn!("Failed to convert {mantissa_price} to i64 for id {id}"); + warn!("failed to convert {mantissa_price} to i64 for id {id}"); Price::new(id, 0, Status::Unavailable) } } diff --git a/bothan-coinbase/src/api/websocket.rs b/bothan-coinbase/src/api/websocket.rs index fbde7f64..2941a378 100644 --- a/bothan-coinbase/src/api/websocket.rs +++ b/bothan-coinbase/src/api/websocket.rs @@ -29,7 +29,7 @@ impl CoinbaseWebSocketConnector { let status = resp.status(); if StatusCode::is_server_error(&status) || StatusCode::is_client_error(&status) { - warn!("Failed to connect with response code {}", resp.status()); + warn!("failed to connect with response code {}", resp.status()); return Err(Error::ConnectionFailure(resp.status())); } diff --git a/bothan-coinbase/src/service.rs b/bothan-coinbase/src/service.rs index 2db12fed..82bca4e1 100644 --- a/bothan-coinbase/src/service.rs +++ b/bothan-coinbase/src/service.rs @@ -78,7 +78,7 @@ impl Service for CoinbaseService { .collect(); if !sub_ids.is_empty() && self.cmd_tx.send(Command::Subscribe(sub_ids)).await.is_err() { - warn!("Failed to send subscribe command"); + warn!("failed to send subscribe command"); } result @@ -143,7 +143,7 @@ async fn process_command( .await .is_err() { - warn!("Failed to subscribe to ids: {:?}", ids); + warn!("failed to subscribe to ids: {:?}", ids); } } } @@ -177,7 +177,7 @@ async fn handle_reconnect( }; if command_tx.send(cmd).await.is_err() { - error!("Failed to send subscribe command"); + error!("failed to send subscribe command"); }; } diff --git a/bothan-core/Cargo.toml b/bothan-core/Cargo.toml index 0ab8b843..fe8e2b2e 100644 --- a/bothan-core/Cargo.toml +++ b/bothan-core/Cargo.toml @@ -20,7 +20,7 @@ reqwest = { workspace = true } rust_decimal = { workspace = true, features = ["maths", "serde-str"] } anyhow = "1.0.86" -bincode = "1.3.3" +bincode = "2.0.0-rc.3" chrono = "0.4.38" clap = { version = "4.5.16", features = ["derive"] } log = "0.4.22" diff --git a/bothan-core/src/registry.rs b/bothan-core/src/registry.rs index 3db68186..27c21fe6 100644 --- a/bothan-core/src/registry.rs +++ b/bothan-core/src/registry.rs @@ -1,6 +1,10 @@ use std::collections::HashMap; use std::marker::PhantomData; +use bincode::de::Decoder; +use bincode::enc::Encoder; +use bincode::error::{DecodeError, EncodeError}; +use bincode::{Decode, Encode}; use serde::{Deserialize, Serialize}; use crate::registry::signal::Signal; @@ -22,7 +26,7 @@ pub struct Valid; pub struct Registry { #[serde(flatten)] inner: HashMap, - #[serde(skip_serializing, default)] + #[serde(skip)] _state: PhantomData, } @@ -40,6 +44,15 @@ impl Registry { } } +impl Registry { + pub fn get(&self, signal_id: &str) -> Option<&Signal> { + self.inner.get(signal_id) + } + pub fn contains(&self, signal_id: &str) -> bool { + self.inner.contains_key(signal_id) + } +} + impl Default for Registry { fn default() -> Self { Registry { @@ -49,12 +62,19 @@ impl Default for Registry { } } -impl Registry { - pub fn get(&self, signal_id: &str) -> Option<&Signal> { - self.inner.get(signal_id) +impl Encode for Registry { + fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { + self.inner.encode(encoder) } - pub fn contains(&self, signal_id: &str) -> bool { - self.inner.contains_key(signal_id) +} + +impl Decode for Registry { + fn decode(decoder: &mut D) -> Result { + let inner = HashMap::decode(decoder)?; + Ok(Registry { + inner, + _state: PhantomData, + }) } } diff --git a/bothan-core/src/registry/post_processor.rs b/bothan-core/src/registry/post_processor.rs index dd936bcc..78991bd1 100644 --- a/bothan-core/src/registry/post_processor.rs +++ b/bothan-core/src/registry/post_processor.rs @@ -1,3 +1,4 @@ +use bincode::{Decode, Encode}; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -22,7 +23,7 @@ pub trait PostProcess { } /// The PostProcess enum represents the different types of post-processors that can be used. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] #[serde(rename_all = "snake_case", tag = "function", content = "params")] pub enum PostProcessor { TickConvertor(tick::TickPostProcessor), diff --git a/bothan-core/src/registry/post_processor/tick.rs b/bothan-core/src/registry/post_processor/tick.rs index 072485f9..fbbe0797 100644 --- a/bothan-core/src/registry/post_processor/tick.rs +++ b/bothan-core/src/registry/post_processor/tick.rs @@ -1,3 +1,4 @@ +use bincode::{Decode, Encode}; use num_traits::FromPrimitive; use rust_decimal::{Decimal, MathematicalOps}; use serde::{Deserialize, Serialize}; @@ -10,7 +11,7 @@ const MAX_TICK: f64 = 524287.0; const MIN_TICK: f64 = 1.0; /// `TickPostProcessor` processes the given data into its tick value. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] pub struct TickPostProcessor {} impl PostProcess for TickPostProcessor { diff --git a/bothan-core/src/registry/processor.rs b/bothan-core/src/registry/processor.rs index 0c573451..89777634 100644 --- a/bothan-core/src/registry/processor.rs +++ b/bothan-core/src/registry/processor.rs @@ -1,3 +1,4 @@ +use bincode::{Decode, Encode}; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -22,7 +23,7 @@ pub trait Process { } /// The Process enum represents the different types of processors that can be used. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] #[serde(rename_all = "snake_case", tag = "function", content = "params")] pub enum Processor { Median(median::MedianProcessor), diff --git a/bothan-core/src/registry/processor/median.rs b/bothan-core/src/registry/processor/median.rs index af5e4609..85be151b 100644 --- a/bothan-core/src/registry/processor/median.rs +++ b/bothan-core/src/registry/processor/median.rs @@ -1,6 +1,7 @@ use std::cmp::max; use std::ops::{Add, Div, Sub}; +use bincode::{Decode, Encode}; use num_traits::FromPrimitive; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; @@ -10,7 +11,7 @@ use crate::registry::processor::{Process, ProcessError}; /// The `MedianProcessor` finds the median of a given data set. It also has a `min_source_count` which /// is the minimum number of sources required to calculate the median. If the given data set has less /// than `min_source_count` sources, it returns an error. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] pub struct MedianProcessor { pub min_source_count: usize, } diff --git a/bothan-core/src/registry/signal.rs b/bothan-core/src/registry/signal.rs index 4b247f8c..9c7d4e29 100644 --- a/bothan-core/src/registry/signal.rs +++ b/bothan-core/src/registry/signal.rs @@ -1,3 +1,4 @@ +use bincode::{Decode, Encode}; use serde::{Deserialize, Serialize}; use crate::registry::post_processor::PostProcessor; @@ -5,7 +6,7 @@ use crate::registry::processor::Processor; use crate::registry::source::SourceQuery; /// `Signal` contains the sources, processor, and post-processors for a signal. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] pub struct Signal { #[serde(rename = "sources")] pub source_queries: Vec, diff --git a/bothan-core/src/registry/source.rs b/bothan-core/src/registry/source.rs index e9702d14..72b12021 100644 --- a/bothan-core/src/registry/source.rs +++ b/bothan-core/src/registry/source.rs @@ -1,8 +1,9 @@ +use bincode::{Decode, Encode}; use num_traits::{CheckedAdd, CheckedDiv, CheckedMul, CheckedSub}; use serde::{Deserialize, Serialize}; /// Enum representing the possible operations that can be performed. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] pub enum Operation { #[serde(rename = "+")] Add, @@ -32,7 +33,7 @@ impl Operation { /// Route is value in a sequence of operations of which the operation is performed on. /// For example, if the sequence is [a, b, c] and the operations are [+, *, -], the result /// would be (((input + a) * b) - c). -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] pub struct OperationRoute { /// The signal id of the value to be used in the operation. pub signal_id: String, @@ -51,7 +52,7 @@ impl OperationRoute { } /// Struct representing a source. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] pub struct SourceQuery { /// The source id. pub source_id: String, diff --git a/bothan-core/src/store.rs b/bothan-core/src/store.rs index b6e5fe14..6dd7a57b 100644 --- a/bothan-core/src/store.rs +++ b/bothan-core/src/store.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; +use bincode::{config, decode_from_slice, encode_to_vec}; use rust_rocksdb::{Options, DB}; use tokio::sync::RwLock; use tracing::debug; @@ -55,15 +56,15 @@ impl SharedStore { pub async fn restore(&mut self) -> Result<(), Error> { let mut inner = self.inner.write().await; - let registry = inner + if let Some(unvalidated_registry) = inner .db .get(Key::Registry.to_prefixed_bytes())? - .map(|b| bincode::deserialize(b.as_slice())) - .transpose()?; - - if let Some(registry) = registry { + .map(|b| decode_from_slice::(b.as_slice(), config::standard())) + .transpose()? + .map(|(r, _)| r) + { + inner.registry = unvalidated_registry.validate()?; debug!("loaded registry"); - inner.registry = registry; } Ok(()) @@ -78,25 +79,26 @@ impl SharedStore { } async fn get_active_signal_ids(&self) -> Result, Error> { - let serialized = self + let encoded = self .inner .read() .await .db .get(Key::ActiveSignalIDs.to_prefixed_bytes())?; - let active_signal_ids = serialized - .map(|b| bincode::deserialize(b.as_slice())) - .transpose()?; + let active_signal_ids = encoded + .map(|b| decode_from_slice(b.as_slice(), config::standard())) + .transpose()? + .map(|(ids, _)| ids); Ok(active_signal_ids) } async fn set_active_signal_ids(&self, signal_ids: HashSet) -> Result<(), Error> { - let serialized = bincode::serialize(&signal_ids)?; + let encoded = encode_to_vec(&signal_ids, config::standard())?; self.inner .write() .await .db - .put(Key::ActiveSignalIDs.to_prefixed_bytes(), serialized)?; + .put(Key::ActiveSignalIDs.to_prefixed_bytes(), encoded)?; Ok(()) } @@ -106,10 +108,8 @@ impl SharedStore { async fn set_registry(&self, registry: Registry) -> Result<(), Error> { let mut inner = self.inner.write().await; - let serialized = bincode::serialize(®istry)?; - inner - .db - .put(Key::Registry.to_prefixed_bytes(), serialized)?; + let encoded = encode_to_vec(®istry, config::standard())?; + inner.db.put(Key::Registry.to_prefixed_bytes(), encoded)?; inner.registry = registry; Ok(()) } @@ -119,10 +119,11 @@ impl SharedStore { source_id: source_id.as_ref(), }; - let serialized = self.inner.read().await.db.get(key.to_prefixed_bytes())?; - let query_ids = serialized - .map(|b| bincode::deserialize(b.as_slice())) - .transpose()?; + let encoded = self.inner.read().await.db.get(key.to_prefixed_bytes())?; + let query_ids = encoded + .map(|b| decode_from_slice(b.as_slice(), config::standard())) + .transpose()? + .map(|(ids, _)| ids); Ok(query_ids) } @@ -147,12 +148,12 @@ impl SharedStore { source_id: source_id.as_ref(), }; - let serialized = bincode::serialize(&query_ids)?; + let encoded = encode_to_vec(&query_ids, config::standard())?; self.inner .write() .await .db - .put(key.to_prefixed_bytes(), serialized)?; + .put(key.to_prefixed_bytes(), encoded)?; Ok(()) } @@ -196,10 +197,11 @@ impl SharedStore { id: id.as_ref(), }; - let serialized = self.inner.read().await.db.get(key.to_prefixed_bytes())?; - let asset_info = serialized - .map(|b| bincode::deserialize(b.as_slice())) - .transpose()?; + let encoded = self.inner.read().await.db.get(key.to_prefixed_bytes())?; + let asset_info = encoded + .map(|b| decode_from_slice(b.as_slice(), config::standard())) + .transpose()? + .map(|(info, _)| info); Ok(asset_info) } @@ -218,12 +220,12 @@ impl SharedStore { id: id.as_ref(), }; - let serialized = bincode::serialize(&asset_info)?; + let encoded = encode_to_vec(&asset_info, config::standard())?; self.inner .write() .await .db - .put(key.to_prefixed_bytes(), serialized)?; + .put(key.to_prefixed_bytes(), encoded)?; Ok(()) } @@ -242,8 +244,8 @@ impl SharedStore { source_id: source_id.as_ref(), id: id.as_ref(), }; - let serialized = bincode::serialize(&asset_info)?; - inner.db.put(key.to_prefixed_bytes(), serialized)?; + let encoded = encode_to_vec(&asset_info, config::standard())?; + inner.db.put(key.to_prefixed_bytes(), encoded)?; } Ok(()) } diff --git a/bothan-core/src/store/error.rs b/bothan-core/src/store/error.rs index ca6bcef4..3bad9193 100644 --- a/bothan-core/src/store/error.rs +++ b/bothan-core/src/store/error.rs @@ -1,4 +1,4 @@ -use bincode::ErrorKind; +use crate::registry::validate::ValidationError; #[derive(Clone, Debug, thiserror::Error, PartialEq)] #[error("An error occurred while storing the data: {message}")] @@ -6,6 +6,14 @@ pub struct Error { message: String, } +impl From for Error { + fn from(error: ValidationError) -> Self { + Self { + message: error.to_string(), + } + } +} + impl From for Error { fn from(error: rust_rocksdb::Error) -> Self { Self { @@ -14,8 +22,16 @@ impl From for Error { } } -impl From> for Error { - fn from(error: Box) -> Self { +impl From for Error { + fn from(error: bincode::error::EncodeError) -> Self { + Self { + message: error.to_string(), + } + } +} + +impl From for Error { + fn from(error: bincode::error::DecodeError) -> Self { Self { message: error.to_string(), } diff --git a/bothan-core/src/types.rs b/bothan-core/src/types.rs index 8193e373..a6b939ac 100644 --- a/bothan-core/src/types.rs +++ b/bothan-core/src/types.rs @@ -1,3 +1,7 @@ +use bincode::de::Decoder; +use bincode::enc::Encoder; +use bincode::error::{DecodeError, EncodeError}; +use bincode::{Decode, Encode}; use derive_more::Display; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; @@ -19,3 +23,25 @@ impl AssetInfo { } } } + +impl Encode for AssetInfo { + fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { + Encode::encode(&self.id, encoder)?; + Encode::encode(&self.price.serialize(), encoder)?; + Encode::encode(&self.timestamp, encoder) + } +} + +impl Decode for AssetInfo { + fn decode(decoder: &mut D) -> Result { + let id: String = Decode::decode(decoder)?; + let price_serialized: [u8; 16] = Decode::decode(decoder)?; + let timestamp: i64 = Decode::decode(decoder)?; + + Ok(AssetInfo { + id, + price: Decimal::deserialize(price_serialized), + timestamp, + }) + } +} diff --git a/bothan-kraken/src/api/websocket.rs b/bothan-kraken/src/api/websocket.rs index 2173f915..8844a062 100644 --- a/bothan-kraken/src/api/websocket.rs +++ b/bothan-kraken/src/api/websocket.rs @@ -30,7 +30,7 @@ impl KrakenWebSocketConnector { let status = resp.status(); if StatusCode::is_server_error(&status) || StatusCode::is_client_error(&status) { - warn!("Failed to connect with response code {}", resp.status()); + warn!("failed to connect with response code {}", resp.status()); return Err(ConnectionError::UnsuccessfulHttpResponse(resp.status())); } diff --git a/bothan-okx/src/api/websocket.rs b/bothan-okx/src/api/websocket.rs index f6b80a12..a2d385ff 100644 --- a/bothan-okx/src/api/websocket.rs +++ b/bothan-okx/src/api/websocket.rs @@ -28,7 +28,7 @@ impl OkxWebSocketConnector { let status = resp.status(); if StatusCode::is_server_error(&status) || StatusCode::is_client_error(&status) { - warn!("Failed to connect with response code {}", resp.status()); + warn!("failed to connect with response code {}", resp.status()); return Err(Error::ConnectionFailure(resp.status())); } diff --git a/bothan-okx/src/service.rs b/bothan-okx/src/service.rs index 836b6467..da29082e 100644 --- a/bothan-okx/src/service.rs +++ b/bothan-okx/src/service.rs @@ -76,7 +76,7 @@ impl Service for OkxService { .collect(); if !sub_ids.is_empty() && self.cmd_tx.send(Command::Subscribe(sub_ids)).await.is_err() { - warn!("Failed to send subscribe command"); + warn!("failed to send subscribe command"); } result @@ -138,7 +138,7 @@ async fn process_command( let vec_ids = ids.iter().map(|x| x.as_str()).collect::>(); let mut locked = ws.lock().await; if locked.subscribe_ticker(vec_ids.as_slice()).await.is_err() { - warn!("Failed to subscribe to ids: {:?}", ids); + warn!("failed to subscribe to ids: {:?}", ids); } } }