Skip to content

Commit

Permalink
update okx to match new bothan
Browse files Browse the repository at this point in the history
  • Loading branch information
colmazia committed Sep 4, 2024
1 parent 291ca2d commit 7637e5e
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 110 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"bothan-binance",
"bothan-coingecko",
"bothan-kraken",
"bothan-okx",
"bothan-api/server",
"bothan-api/server-cli",
]
Expand Down
6 changes: 5 additions & 1 deletion bothan-kraken/src/worker/asset_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ fn parse_ticker(ticker: TickerResponse) -> Result<AssetInfo, WorkerError> {
let id = ticker.symbol.clone();
let price_value =
Decimal::from_f64(ticker.last).ok_or(WorkerError::InvalidPrice(ticker.last))?;
Ok(AssetInfo::new(id, price_value, 0))
Ok(AssetInfo::new(
id,
price_value,
chrono::Utc::now().timestamp(),
))
}

async fn store_ticker(store: &WorkerStore, ticker: TickerResponse) -> Result<(), WorkerError> {
Expand Down
35 changes: 27 additions & 8 deletions bothan-okx/examples/okx_basic.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
use std::time::Duration;

use tokio::time::sleep;
use tracing_subscriber::fmt::init;

use bothan_core::worker::AssetWorker;
use bothan_okx::OkxWorkerBuilder;
use bothan_core::registry::Registry;
use bothan_core::store::SharedStore;
use bothan_core::worker::{AssetWorker, AssetWorkerBuilder};
use bothan_okx::{OkxWorkerBuilder, OkxWorkerBuilderOpts};

#[tokio::main]
async fn main() {
init();
let worker = OkxWorkerBuilder::default().build().await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let path = std::env::current_dir().unwrap();
let registry = Registry::default().validate().unwrap();
let store = SharedStore::new(registry, path.as_path()).await.unwrap();

let worker_store = store.create_worker_store(OkxWorkerBuilder::worker_name());
let opts = OkxWorkerBuilderOpts::default();

let worker = OkxWorkerBuilder::new(worker_store, opts)
.build()
.await
.unwrap();

worker
.add_query_ids(vec!["BTC-USDT".to_string(), "ETH-USDT".to_string()])
.set_query_ids(vec!["BTC-USDT".to_string(), "ETH-USDT".to_string()])
.await
.unwrap();

sleep(Duration::from_secs(2)).await;

loop {
let data = worker.get_assets(&["BTC-USDT", "ETH-USDT"]).await;
println!("{:?}", data);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
let btc_data = worker.get_asset("BTC-USDT").await;
let eth_data = worker.get_asset("ETH-USDT").await;
println!("{:?}, {:?}", btc_data, eth_data);
sleep(Duration::from_secs(5)).await;
}
}
61 changes: 25 additions & 36 deletions bothan-okx/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;

use tokio::sync::mpsc::Sender;
use tracing::error;

use bothan_core::store::Store;
use bothan_core::worker::{AssetStatus, AssetWorker, Error};
use bothan_core::store::error::Error as StoreError;
use bothan_core::store::WorkerStore;
use bothan_core::worker::{AssetState, AssetWorker, SetQueryIDError};

use crate::api::websocket::OkxWebSocketConnector;

Expand All @@ -17,7 +15,7 @@ mod types;
/// A worker that fetches and stores the asset information from Okx's API.
pub struct OkxWorker {
connector: OkxWebSocketConnector,
store: Arc<Store>,
store: WorkerStore,
subscribe_tx: Sender<Vec<String>>,
unsubscribe_tx: Sender<Vec<String>>,
}
Expand All @@ -26,7 +24,7 @@ impl OkxWorker {
/// Create a new worker with the specified connector, store and channels.
pub fn new(
connector: OkxWebSocketConnector,
store: Arc<Store>,
store: WorkerStore,
subscribe_tx: Sender<Vec<String>>,
unsubscribe_tx: Sender<Vec<String>>,
) -> Self {
Expand All @@ -41,39 +39,30 @@ impl OkxWorker {

#[async_trait::async_trait]
impl AssetWorker for OkxWorker {
/// Fetches the AssetStatus for the given cryptocurrency ids.
async fn get_assets(&self, ids: &[&str]) -> Vec<AssetStatus> {
self.store.get_assets(ids).await
/// Fetches the AssetStatus for the given cryptocurrency id.
async fn get_asset(&self, id: &str) -> Result<AssetState, StoreError> {
self.store.get_asset(&id).await
}

/// Adds the specified cryptocurrency IDs to the query set.
async fn add_query_ids(&self, ids: Vec<String>) -> Result<(), Error> {
let to_sub = self.store.add_query_ids(ids).await;

if let Err(e) = self.subscribe_tx.send(to_sub.clone()).await {
error!("failed to add query ids: {}", e);
self.store.remove_query_ids(to_sub.as_slice()).await;
Err(Error::ModifyQueryIDsFailed(e.to_string()))
} else {
Ok(())
}
}
/// Sets the specified cryptocurrency IDs to the query. If the ids are already in the query set,
/// it will not be resubscribed.
async fn set_query_ids(&self, ids: Vec<String>) -> Result<(), SetQueryIDError> {
let (to_sub, to_unsub) = self
.store
.set_query_ids(ids)
.await
.map_err(|e| SetQueryIDError::new(e.to_string()))?;

/// Removes the specified cryptocurrency IDs from the query set.
async fn remove_query_ids(&self, ids: &[&str]) -> Result<(), Error> {
let to_unsub = self.store.remove_query_ids(ids).await;
self.subscribe_tx
.send(to_sub)
.await
.map_err(|e| SetQueryIDError::new(e.to_string()))?;

if let Err(e) = self.unsubscribe_tx.send(to_unsub.clone()).await {
error!("failed to remove query ids: {}", e);
self.store.add_query_ids(to_unsub).await;
Err(Error::ModifyQueryIDsFailed(e.to_string()))
} else {
Ok(())
}
}
self.unsubscribe_tx
.send(to_unsub)
.await
.map_err(|e| SetQueryIDError::new(e.to_string()))?;

/// Retrieves the current set of queried cryptocurrency IDs.
async fn get_query_ids(&self) -> Vec<String> {
self.store.get_query_ids().await
Ok(())
}
}
61 changes: 29 additions & 32 deletions bothan-okx/src/worker/asset_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, timeout};
use tracing::{debug, error, info, warn};

use bothan_core::store::Store;
use bothan_core::store::WorkerStore;
use bothan_core::types::AssetInfo;

use crate::api::error::{MessageError, SendError};
use crate::api::types::{ChannelResponse, OkxResponse, TickerData};
use crate::api::{OkxWebSocketConnection, OkxWebSocketConnector};
use crate::worker::error::ParseError;
use crate::worker::error::WorkerError;
use crate::worker::types::{DEFAULT_TIMEOUT, RECONNECT_BUFFER};
use crate::worker::OkxWorker;

Expand Down Expand Up @@ -54,9 +54,8 @@ async fn subscribe(
connection: &mut OkxWebSocketConnection,
) -> Result<(), SendError> {
if !ids.is_empty() {
connection
.subscribe_ticker(&ids.iter().map(|s| s.as_str()).collect::<Vec<&str>>())
.await?
let ids_vec = ids.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
connection.subscribe_ticker(&ids_vec).await?
}

Ok(())
Expand Down Expand Up @@ -87,14 +86,14 @@ async fn handle_unsubscribe_recv(ids: Vec<String>, connection: &mut OkxWebSocket
if let Err(e) = unsubscribe(&ids, connection).await {
error!("failed to unsubscribe to ids {:?}: {}", ids, e);
} else {
info!("subscribed to ids {:?}", ids);
info!("unsubscribed to ids {:?}", ids);
}
}

async fn handle_reconnect(
connector: &OkxWebSocketConnector,
connection: &mut OkxWebSocketConnection,
query_ids: &Store,
query_ids: &WorkerStore,
) {
let mut retry_count: usize = 1;
loop {
Expand All @@ -104,8 +103,14 @@ async fn handle_reconnect(
*connection = new_connection;

// Resubscribe to all ids
let ids = query_ids.get_query_ids().await;
match subscribe(&ids, connection).await {
let Ok(ids) = query_ids.get_query_ids().await else {
error!("failed to get query ids from store");
return;
};

let ids_vec = ids.into_iter().collect::<Vec<String>>();

match subscribe(&ids_vec, connection).await {
Ok(_) => {
info!("resubscribed to all ids");
return;
Expand All @@ -123,42 +128,34 @@ async fn handle_reconnect(
}
}

fn parse_ticker(ticker: TickerData) -> Result<AssetInfo, ParseError> {
fn parse_ticker(ticker: TickerData) -> Result<AssetInfo, WorkerError> {
let id = ticker.inst_id.clone();
let price_value = Decimal::from_str_exact(&ticker.last)?;
Ok(AssetInfo::new(
ticker.inst_id,
id,
price_value,
chrono::Utc::now().timestamp(),
))
}

async fn store_tickers(tickers: Vec<TickerData>, store: &Store) -> Result<(), ParseError> {
let to_set = tickers
.into_iter()
.filter_map(|ticker| {
let id = ticker.inst_id.clone();
match parse_ticker(ticker) {
Ok(asset_info) => Some((id, asset_info)),
Err(e) => {
warn!("failed to parse ticker data for {} with error {}", id, e);
None
}
}
})
.collect::<Vec<(String, AssetInfo)>>();

store.set_assets(to_set).await;
async fn store_ticker(store: &WorkerStore, ticker: TickerData) -> Result<(), WorkerError> {
store
.set_asset(ticker.inst_id.clone(), parse_ticker(ticker)?)
.await?;
Ok(())
}

/// Processes the response from the Okx API.
async fn process_response(resp: OkxResponse, store: &Store) {
async fn process_response(resp: OkxResponse, store: &WorkerStore) {
match resp {
OkxResponse::ChannelResponse(resp) => match resp {
ChannelResponse::Ticker(push_data) => {
match store_tickers(push_data.data, store).await {
Ok(_) => info!("saved data"),
Err(e) => error!("failed to save data: {}", e),
let tickers = push_data.data;
for ticker in tickers {
match store_ticker(store, ticker).await {
Ok(_) => info!("saved data"),
Err(e) => error!("failed to save data: {}", e),
}
}
}
},
Expand All @@ -172,7 +169,7 @@ async fn handle_connection_recv(
recv_result: Result<OkxResponse, MessageError>,
connector: &OkxWebSocketConnector,
connection: &mut OkxWebSocketConnection,
store: &Store,
store: &WorkerStore,
) {
match recv_result {
Ok(resp) => {
Expand Down
Loading

0 comments on commit 7637e5e

Please sign in to comment.