From 7637e5e0f345fa499bdd87af158cdc7e45c2d5ea Mon Sep 17 00:00:00 2001 From: colmazia Date: Wed, 4 Sep 2024 18:01:58 +0700 Subject: [PATCH] update okx to match new bothan --- Cargo.lock | 20 ++++++++ Cargo.toml | 1 + bothan-kraken/src/worker/asset_worker.rs | 6 ++- bothan-okx/examples/okx_basic.rs | 35 ++++++++++---- bothan-okx/src/worker.rs | 61 ++++++++++-------------- bothan-okx/src/worker/asset_worker.rs | 61 +++++++++++------------- bothan-okx/src/worker/builder.rs | 51 +++++++++----------- bothan-okx/src/worker/error.rs | 9 ++-- bothan-okx/src/worker/opts.rs | 4 +- 9 files changed, 138 insertions(+), 110 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ae42634..4698866a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -477,6 +477,26 @@ dependencies = [ "ws-mock", ] +[[package]] +name = "bothan-okx" +version = "0.1.0" +dependencies = [ + "async-trait", + "bothan-core", + "chrono", + "futures-util", + "rand", + "rust_decimal", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-tungstenite", + "tracing", + "tracing-subscriber", + "ws-mock", +] + [[package]] name = "bumpalo" version = "3.16.0" diff --git a/Cargo.toml b/Cargo.toml index 305b122e..633614db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "bothan-binance", "bothan-coingecko", "bothan-kraken", + "bothan-okx", "bothan-api/server", "bothan-api/server-cli", ] diff --git a/bothan-kraken/src/worker/asset_worker.rs b/bothan-kraken/src/worker/asset_worker.rs index e78b65f0..4d076e85 100644 --- a/bothan-kraken/src/worker/asset_worker.rs +++ b/bothan-kraken/src/worker/asset_worker.rs @@ -133,7 +133,11 @@ fn parse_ticker(ticker: TickerResponse) -> Result { let id = ticker.symbol.clone(); let price_value = Decimal::from_f64(ticker.last).ok_or(WorkerError::InvalidPrice(ticker.last))?; - Ok(AssetInfo::new(id, price_value, 0)) + Ok(AssetInfo::new( + id, + price_value, + chrono::Utc::now().timestamp(), + )) } async fn store_ticker(store: &WorkerStore, ticker: TickerResponse) -> Result<(), WorkerError> { diff --git a/bothan-okx/examples/okx_basic.rs b/bothan-okx/examples/okx_basic.rs index f4121bc9..e131c88f 100644 --- a/bothan-okx/examples/okx_basic.rs +++ b/bothan-okx/examples/okx_basic.rs @@ -1,20 +1,39 @@ +use std::time::Duration; + +use tokio::time::sleep; use tracing_subscriber::fmt::init; -use bothan_core::worker::AssetWorker; -use bothan_okx::OkxWorkerBuilder; +use bothan_core::registry::Registry; +use bothan_core::store::SharedStore; +use bothan_core::worker::{AssetWorker, AssetWorkerBuilder}; +use bothan_okx::{OkxWorkerBuilder, OkxWorkerBuilderOpts}; #[tokio::main] async fn main() { init(); - let worker = OkxWorkerBuilder::default().build().await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + let path = std::env::current_dir().unwrap(); + let registry = Registry::default().validate().unwrap(); + let store = SharedStore::new(registry, path.as_path()).await.unwrap(); + + let worker_store = store.create_worker_store(OkxWorkerBuilder::worker_name()); + let opts = OkxWorkerBuilderOpts::default(); + + let worker = OkxWorkerBuilder::new(worker_store, opts) + .build() + .await + .unwrap(); + worker - .add_query_ids(vec!["BTC-USDT".to_string(), "ETH-USDT".to_string()]) + .set_query_ids(vec!["BTC-USDT".to_string(), "ETH-USDT".to_string()]) .await .unwrap(); + + sleep(Duration::from_secs(2)).await; + loop { - let data = worker.get_assets(&["BTC-USDT", "ETH-USDT"]).await; - println!("{:?}", data); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + let btc_data = worker.get_asset("BTC-USDT").await; + let eth_data = worker.get_asset("ETH-USDT").await; + println!("{:?}, {:?}", btc_data, eth_data); + sleep(Duration::from_secs(5)).await; } } diff --git a/bothan-okx/src/worker.rs b/bothan-okx/src/worker.rs index 40669881..43cce7b7 100644 --- a/bothan-okx/src/worker.rs +++ b/bothan-okx/src/worker.rs @@ -1,10 +1,8 @@ -use std::sync::Arc; - use tokio::sync::mpsc::Sender; -use tracing::error; -use bothan_core::store::Store; -use bothan_core::worker::{AssetStatus, AssetWorker, Error}; +use bothan_core::store::error::Error as StoreError; +use bothan_core::store::WorkerStore; +use bothan_core::worker::{AssetState, AssetWorker, SetQueryIDError}; use crate::api::websocket::OkxWebSocketConnector; @@ -17,7 +15,7 @@ mod types; /// A worker that fetches and stores the asset information from Okx's API. pub struct OkxWorker { connector: OkxWebSocketConnector, - store: Arc, + store: WorkerStore, subscribe_tx: Sender>, unsubscribe_tx: Sender>, } @@ -26,7 +24,7 @@ impl OkxWorker { /// Create a new worker with the specified connector, store and channels. pub fn new( connector: OkxWebSocketConnector, - store: Arc, + store: WorkerStore, subscribe_tx: Sender>, unsubscribe_tx: Sender>, ) -> Self { @@ -41,39 +39,30 @@ impl OkxWorker { #[async_trait::async_trait] impl AssetWorker for OkxWorker { - /// Fetches the AssetStatus for the given cryptocurrency ids. - async fn get_assets(&self, ids: &[&str]) -> Vec { - self.store.get_assets(ids).await + /// Fetches the AssetStatus for the given cryptocurrency id. + async fn get_asset(&self, id: &str) -> Result { + self.store.get_asset(&id).await } - /// Adds the specified cryptocurrency IDs to the query set. - async fn add_query_ids(&self, ids: Vec) -> Result<(), Error> { - let to_sub = self.store.add_query_ids(ids).await; - - if let Err(e) = self.subscribe_tx.send(to_sub.clone()).await { - error!("failed to add query ids: {}", e); - self.store.remove_query_ids(to_sub.as_slice()).await; - Err(Error::ModifyQueryIDsFailed(e.to_string())) - } else { - Ok(()) - } - } + /// Sets the specified cryptocurrency IDs to the query. If the ids are already in the query set, + /// it will not be resubscribed. + async fn set_query_ids(&self, ids: Vec) -> Result<(), SetQueryIDError> { + let (to_sub, to_unsub) = self + .store + .set_query_ids(ids) + .await + .map_err(|e| SetQueryIDError::new(e.to_string()))?; - /// Removes the specified cryptocurrency IDs from the query set. - async fn remove_query_ids(&self, ids: &[&str]) -> Result<(), Error> { - let to_unsub = self.store.remove_query_ids(ids).await; + self.subscribe_tx + .send(to_sub) + .await + .map_err(|e| SetQueryIDError::new(e.to_string()))?; - if let Err(e) = self.unsubscribe_tx.send(to_unsub.clone()).await { - error!("failed to remove query ids: {}", e); - self.store.add_query_ids(to_unsub).await; - Err(Error::ModifyQueryIDsFailed(e.to_string())) - } else { - Ok(()) - } - } + self.unsubscribe_tx + .send(to_unsub) + .await + .map_err(|e| SetQueryIDError::new(e.to_string()))?; - /// Retrieves the current set of queried cryptocurrency IDs. - async fn get_query_ids(&self) -> Vec { - self.store.get_query_ids().await + Ok(()) } } diff --git a/bothan-okx/src/worker/asset_worker.rs b/bothan-okx/src/worker/asset_worker.rs index 9a8fc30b..b29d219a 100644 --- a/bothan-okx/src/worker/asset_worker.rs +++ b/bothan-okx/src/worker/asset_worker.rs @@ -6,13 +6,13 @@ use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, timeout}; use tracing::{debug, error, info, warn}; -use bothan_core::store::Store; +use bothan_core::store::WorkerStore; use bothan_core::types::AssetInfo; use crate::api::error::{MessageError, SendError}; use crate::api::types::{ChannelResponse, OkxResponse, TickerData}; use crate::api::{OkxWebSocketConnection, OkxWebSocketConnector}; -use crate::worker::error::ParseError; +use crate::worker::error::WorkerError; use crate::worker::types::{DEFAULT_TIMEOUT, RECONNECT_BUFFER}; use crate::worker::OkxWorker; @@ -54,9 +54,8 @@ async fn subscribe( connection: &mut OkxWebSocketConnection, ) -> Result<(), SendError> { if !ids.is_empty() { - connection - .subscribe_ticker(&ids.iter().map(|s| s.as_str()).collect::>()) - .await? + let ids_vec = ids.iter().map(|s| s.as_str()).collect::>(); + connection.subscribe_ticker(&ids_vec).await? } Ok(()) @@ -87,14 +86,14 @@ async fn handle_unsubscribe_recv(ids: Vec, connection: &mut OkxWebSocket if let Err(e) = unsubscribe(&ids, connection).await { error!("failed to unsubscribe to ids {:?}: {}", ids, e); } else { - info!("subscribed to ids {:?}", ids); + info!("unsubscribed to ids {:?}", ids); } } async fn handle_reconnect( connector: &OkxWebSocketConnector, connection: &mut OkxWebSocketConnection, - query_ids: &Store, + query_ids: &WorkerStore, ) { let mut retry_count: usize = 1; loop { @@ -104,8 +103,14 @@ async fn handle_reconnect( *connection = new_connection; // Resubscribe to all ids - let ids = query_ids.get_query_ids().await; - match subscribe(&ids, connection).await { + let Ok(ids) = query_ids.get_query_ids().await else { + error!("failed to get query ids from store"); + return; + }; + + let ids_vec = ids.into_iter().collect::>(); + + match subscribe(&ids_vec, connection).await { Ok(_) => { info!("resubscribed to all ids"); return; @@ -123,42 +128,34 @@ async fn handle_reconnect( } } -fn parse_ticker(ticker: TickerData) -> Result { +fn parse_ticker(ticker: TickerData) -> Result { + let id = ticker.inst_id.clone(); let price_value = Decimal::from_str_exact(&ticker.last)?; Ok(AssetInfo::new( - ticker.inst_id, + id, price_value, chrono::Utc::now().timestamp(), )) } -async fn store_tickers(tickers: Vec, store: &Store) -> Result<(), ParseError> { - let to_set = tickers - .into_iter() - .filter_map(|ticker| { - let id = ticker.inst_id.clone(); - match parse_ticker(ticker) { - Ok(asset_info) => Some((id, asset_info)), - Err(e) => { - warn!("failed to parse ticker data for {} with error {}", id, e); - None - } - } - }) - .collect::>(); - - store.set_assets(to_set).await; +async fn store_ticker(store: &WorkerStore, ticker: TickerData) -> Result<(), WorkerError> { + store + .set_asset(ticker.inst_id.clone(), parse_ticker(ticker)?) + .await?; Ok(()) } /// Processes the response from the Okx API. -async fn process_response(resp: OkxResponse, store: &Store) { +async fn process_response(resp: OkxResponse, store: &WorkerStore) { match resp { OkxResponse::ChannelResponse(resp) => match resp { ChannelResponse::Ticker(push_data) => { - match store_tickers(push_data.data, store).await { - Ok(_) => info!("saved data"), - Err(e) => error!("failed to save data: {}", e), + let tickers = push_data.data; + for ticker in tickers { + match store_ticker(store, ticker).await { + Ok(_) => info!("saved data"), + Err(e) => error!("failed to save data: {}", e), + } } } }, @@ -172,7 +169,7 @@ async fn handle_connection_recv( recv_result: Result, connector: &OkxWebSocketConnector, connection: &mut OkxWebSocketConnection, - store: &Store, + store: &WorkerStore, ) { match recv_result { Ok(resp) => { diff --git a/bothan-okx/src/worker/builder.rs b/bothan-okx/src/worker/builder.rs index 45fe3f1d..1076637f 100644 --- a/bothan-okx/src/worker/builder.rs +++ b/bothan-okx/src/worker/builder.rs @@ -2,40 +2,25 @@ use std::sync::Arc; use tokio::sync::mpsc::channel; -use bothan_core::store::Store; - use crate::api::OkxWebSocketConnector; use crate::worker::asset_worker::start_asset_worker; use crate::worker::error::BuildError; use crate::worker::opts::OkxWorkerBuilderOpts; use crate::worker::OkxWorker; +use bothan_core::store::WorkerStore; +use bothan_core::worker::AssetWorkerBuilder; /// Builds a `OkxWorker` with custom options. /// Methods can be chained to set the configuration values and the /// service is constructed by calling the [`build`](OkxWorkerBuilder::build) method. -/// # Example -/// ```no_run -/// use bothan_okx::OkxWorkerBuilder; -/// -/// -/// #[tokio::main] -/// async fn main() { -/// let worker = OkxWorkerBuilder::default() -/// .build() -/// .await -/// .unwrap(); -/// -/// // use worker ... -/// } -/// ``` pub struct OkxWorkerBuilder { - store: Arc, + store: WorkerStore, opts: OkxWorkerBuilderOpts, } impl OkxWorkerBuilder { /// Returns a new `OkxWorkerBuilder` with the given options. - pub fn new(store: Arc, opts: OkxWorkerBuilderOpts) -> Self { + pub fn new(store: WorkerStore, opts: OkxWorkerBuilderOpts) -> Self { Self { store, opts } } @@ -55,13 +40,30 @@ impl OkxWorkerBuilder { /// Sets the store for the `OkxWorker`. /// If not set, the store is created and owned by the worker. - pub fn with_store(mut self, store: Arc) -> Self { + pub fn with_store(mut self, store: WorkerStore) -> Self { self.store = store; self } +} + +#[async_trait::async_trait] +impl<'a> AssetWorkerBuilder<'a> for OkxWorkerBuilder { + type Opts = OkxWorkerBuilderOpts; + type Worker = OkxWorker; + type Error = BuildError; + + /// Returns a new `OkxWorkerBuilder` with the given options. + fn new(store: WorkerStore, opts: Self::Opts) -> Self { + Self { store, opts } + } + + /// Returns the name of the worker. + fn worker_name() -> &'static str { + "okx" + } /// Creates the configured `OkxWorker`. - pub async fn build(self) -> Result, BuildError> { + async fn build(self) -> Result, BuildError> { let url = self.opts.url; let ch_size = self.opts.internal_ch_size; @@ -83,10 +85,3 @@ impl OkxWorkerBuilder { Ok(worker) } } - -impl Default for OkxWorkerBuilder { - /// Create a new `OkxWorkerBuilder` with its default values. - fn default() -> Self { - Self::new(Arc::new(Store::default()), OkxWorkerBuilderOpts::default()) - } -} diff --git a/bothan-okx/src/worker/error.rs b/bothan-okx/src/worker/error.rs index ca927073..86a9e060 100644 --- a/bothan-okx/src/worker/error.rs +++ b/bothan-okx/src/worker/error.rs @@ -1,11 +1,14 @@ -use thiserror::Error; - use crate::api; +use bothan_core::store; +use thiserror::Error; #[derive(Error, Debug)] -pub(crate) enum ParseError { +pub(crate) enum WorkerError { #[error("value is not a valid decimal: {0}")] Underflow(#[from] rust_decimal::Error), + + #[error("failed to set data to the store: {0}")] + SetFailed(#[from] store::error::Error), } #[derive(Error, Debug)] diff --git a/bothan-okx/src/worker/opts.rs b/bothan-okx/src/worker/opts.rs index c78c7777..0142a763 100644 --- a/bothan-okx/src/worker/opts.rs +++ b/bothan-okx/src/worker/opts.rs @@ -1,4 +1,4 @@ -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use crate::api::types::DEFAULT_URL; use crate::worker::types::DEFAULT_CHANNEL_SIZE; @@ -8,7 +8,7 @@ use crate::worker::types::DEFAULT_CHANNEL_SIZE; /// `OkxWorkerBuilderOpts` provides a way to specify custom settings for creating a `OkxWorker`. /// This struct allows users to set optional parameters such as the WebSocket URL and the internal channel size, /// which will be used during the construction of the `OkxWorker`. -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct OkxWorkerBuilderOpts { #[serde(default = "default_url")] pub url: String,