Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[!feat] Update Binance #58

Merged
merged 30 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bothan-binance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async-trait = { workspace = true }
bothan-core = { workspace = true }
humantime-serde = { workspace = true }
rand = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
Expand All @@ -20,6 +21,7 @@ tracing-subscriber = { workspace = true }

futures-channel = "0.3.30"
futures-util = { version = "0.3.29", features = ["sink", "std"] }
anyhow = "1.0.86"
warittornc marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
ws-mock = { git = "https://github.com/bandprotocol/ws-mock.git", branch = "master" }
19 changes: 10 additions & 9 deletions bothan-binance/examples/binance_basic.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use tracing_subscriber::fmt::init;

use bothan_binance::BinanceServiceBuilder;
use bothan_core::service::Service;
use bothan_binance::BinanceStoreBuilder;
use bothan_core::worker::AssetWorker;

#[tokio::main]
async fn main() {
init();
let mut store = BinanceStoreBuilder::default().build().await.unwrap();

let service = BinanceServiceBuilder::default().build().await;
if let Ok(mut service) = service {
loop {
let data = service.get_price_data(&["btcusdt", "ethusdt"]).await;
println!("{:?}", data);
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
store.start().await;
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
store.add_query_ids(&["btcusdt", "ethusdt"]).await.unwrap();
loop {
let data = store.get_assets(&["btcusdt", "ethusdt"]).await;
println!("{:?}", data);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
3 changes: 3 additions & 0 deletions bothan-binance/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub use error::{ConnectionError, Error};
pub use websocket::{BinanceWebSocketConnection, BinanceWebSocketConnector};

pub mod error;
pub mod types;
pub mod websocket;
24 changes: 16 additions & 8 deletions bothan-binance/src/api/error.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
use tokio_tungstenite::tungstenite::{self, http::StatusCode};
use tokio_tungstenite::tungstenite;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed to connect with response code {0}")]
ConnectionFailure(StatusCode),
pub enum ConnectionError {
#[error("failed to connect to endpoint {source:?}")]
ConnectionFailure {
#[from]
source: tungstenite::Error,
},

#[error("failed to parse")]
Parse(#[from] serde_json::Error),
#[error("received unsuccessful HTTP response: {status:?}")]
UnsuccessfulHttpResponse {
status: tungstenite::http::StatusCode,
},
}

#[error("tungstenite error")]
Tungstenite(#[from] tungstenite::Error),
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed to parse message")]
warittornc marked this conversation as resolved.
Show resolved Hide resolved
Parse(#[from] serde_json::Error),

#[error("channel closed")]
ChannelClosed,
Expand Down
114 changes: 83 additions & 31 deletions bothan-binance/src/api/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,93 +2,138 @@ use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::http::StatusCode;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tracing::warn;
use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};

use crate::api::error::Error;
use crate::api::types::BinanceResponse;
use crate::api::{types::BinanceResponse, ConnectionError, Error};

/// Binance WebSocket Connector
/// A connector for establishing WebSocket connections to Binance.
pub struct BinanceWebSocketConnector {
url: String,
}

impl BinanceWebSocketConnector {
/// Create a new BinanceWebSocketConnector
/// Creates a new `BinanceWebSocketConnector` with the given URL.
///
/// # Examples
///
/// ```no_test
/// let connector = BinanceWebSocketConnector::new("wss://example.com/socket");
/// ```
pub fn new(url: impl Into<String>) -> Self {
Self { url: url.into() }
}

/// Creates a new connection to the Binance WebSocket while returning a `BinanceWebSocketConnection`.
/// If the connection fails, an `Error` is returned.
pub async fn connect(&self) -> Result<BinanceWebSocketConnection, Error> {
/// Establishes a WebSocket connection to the Binance server.
///
/// # Examples
///
/// ```no_test
/// let connector = BinanceWebSocketConnector::new("wss://example.com/socket");
/// let connection = connector.connect().await?;
/// ```
pub async fn connect(&self) -> Result<BinanceWebSocketConnection, ConnectionError> {
// Attempt to establish a WebSocket connection.
let (wss, resp) = connect_async(self.url.clone()).await?;

// Check the HTTP response status.
let status = resp.status();
if StatusCode::is_server_error(&status) || StatusCode::is_client_error(&status) {
warn!("Failed to connect with response code {}", resp.status());
return Err(Error::ConnectionFailure(resp.status()));
if status.as_u16() >= 400 {
warittornc marked this conversation as resolved.
Show resolved Hide resolved
return Err(ConnectionError::UnsuccessfulHttpResponse { status });
}

// Return the WebSocket connection.
Ok(BinanceWebSocketConnection::new(wss))
}
}

/// Binance WebSocket Connection
/// Represents an active WebSocket connection to Binance.
pub struct BinanceWebSocketConnection {
sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}

impl BinanceWebSocketConnection {
/// Create a new BinanceWebSocketConnection
/// Creates a new `BinanceWebSocketConnection` by splitting the WebSocket stream into sender and receiver.
pub fn new(web_socket_stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
let (sender, receiver) = web_socket_stream.split();
Self { sender, receiver }
}

/// Subscribes to a list of symbols. If the subscription fails, an `Error` is returned.
/// Equivalent to subscribing to the `miniTicker` stream.
pub async fn subscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
/// Subscribes to the mini ticker stream for the specified symbol IDs.
///
/// # Examples
///
/// ```no_test
/// let mut connection = connector.connect().await?;
/// connection.subscribe_mini_ticker_stream(&["btcusdt"]).await?;
/// ```
pub async fn subscribe_mini_ticker_stream(
&mut self,
ids: &[&str],
warittornc marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(), tungstenite::Error> {
// Format the stream IDs for subscription.
let stream_ids = ids
.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();

// Create the subscription payload.
let payload = json!({
"method": "SUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});

// Send the subscription message.
let message = Message::Text(payload.to_string());
Ok(self.sender.send(message).await?)
self.sender.send(message).await
}

/// Unsubscribes from a list of symbols. If unable to subscribe, an `Error` is returned.
/// Equivalent to unsubscribing from the `miniTicker` stream.
pub async fn unsubscribe(&mut self, ids: &[&str]) -> Result<(), Error> {
/// Unsubscribes from the mini ticker stream for the specified symbol IDs.
///
/// # Examples
///
/// ```no_test
/// let mut connection = connector.connect().await?;
/// connection.unsubscribe_mini_ticker_stream(&["btcusdt"]).await?;
/// ```
pub async fn unsubscribe_mini_ticker_stream(
&mut self,
ids: &[&str],
) -> Result<(), tungstenite::Error> {
// Format the stream IDs for unsubscription.
let stream_ids = ids
.iter()
.map(|id| format!("{}@miniTicker", id))
.collect::<Vec<_>>();

// Create the unsubscription payload.
let payload = json!({
"method": "UNSUBSCRIBE",
"params": stream_ids,
"id": rand::random::<u32>()
});

// Send the unsubscription message.
let message = Message::Text(payload.to_string());
Ok(self.sender.send(message).await?)
self.sender.send(message).await
}

/// Awaits and returns the next message from the WebSocket connection. If the message is
/// successfully received, a `BinanceResponse` is returned.
/// Retrieves the next message from the WebSocket stream.
///
/// # Examples
///
/// ```no_test
/// let mut connection = connector.connect().await?;
/// if let Ok(response) = connection.next().await {
/// println!("Received response: {:?}", response);
/// }
/// ```
pub async fn next(&mut self) -> Result<BinanceResponse, Error> {
// Wait for the next message.
if let Some(result_msg) = self.receiver.next().await {
// Handle the received message.
return match result_msg {
Ok(Message::Text(msg)) => Ok(serde_json::from_str::<BinanceResponse>(&msg)?),
Ok(Message::Ping(_)) => Ok(BinanceResponse::Ping),
Expand All @@ -103,10 +148,11 @@ impl BinanceWebSocketConnection {

#[cfg(test)]
pub(crate) mod test {
use crate::api::types::{Data, MiniTickerInfo, StreamResponse};
use tokio::sync::mpsc;
use ws_mock::ws_mock_server::{WsMock, WsMockServer};

use crate::api::types::{Data, MiniTickerInfo, StreamResponse};

use super::*;

pub(crate) async fn setup_mock_server() -> WsMockServer {
Expand All @@ -115,9 +161,12 @@ pub(crate) mod test {

#[tokio::test]
async fn test_recv_ticker() {
// Se up the mock server and the WebSocket connector.
warittornc marked this conversation as resolved.
Show resolved Hide resolved
let server = setup_mock_server().await;
let connector = BinanceWebSocketConnector::new(server.uri().await);
let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);

// Create a mock mini ticker response.
let mock_ticker = MiniTickerInfo {
event_time: 10000,
symbol: "BTC".to_string(),
Expand All @@ -133,6 +182,7 @@ pub(crate) mod test {
data: Data::MiniTicker(mock_ticker),
};

// Mount the mock WebSocket server and send the mock response.
WsMock::new()
.forward_from_channel(mpsc_recv)
.mount(&server)
Expand All @@ -142,46 +192,48 @@ pub(crate) mod test {
.await
.unwrap();

// Connect to the mock WebSocket server and retrieve the response.
let mut connection = connector.connect().await.unwrap();

let resp = connection.next().await.unwrap();
assert_eq!(resp, BinanceResponse::Stream(mock_resp));
}

#[tokio::test]
async fn test_recv_ping() {
// Set up the mock server and the WebSocket connector.
let server = setup_mock_server().await;
let connector = BinanceWebSocketConnector::new(server.uri().await);
let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);

// Mount the mock WebSocket server and send a ping message.
WsMock::new()
.forward_from_channel(mpsc_recv)
.mount(&server)
.await;

mpsc_send.send(Message::Ping(vec![])).await.unwrap();

// Connect to the mock WebSocket server and retrieve the ping response.
let mut connection = connector.connect().await.unwrap();

let resp = connection.next().await.unwrap();
assert_eq!(resp, BinanceResponse::Ping);
}

#[tokio::test]
async fn test_recv_close() {
// Set up the mock server and the WebSocket connector.
let server = setup_mock_server().await;
let connector = BinanceWebSocketConnector::new(server.uri().await);
let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);

// Mount the mock WebSocket server and send a close message.
WsMock::new()
.forward_from_channel(mpsc_recv)
.mount(&server)
.await;

mpsc_send.send(Message::Close(None)).await.unwrap();

// Connect to the mock WebSocket server and verify the connection closure.
let mut connection = connector.connect().await.unwrap();

let resp = connection.next().await;
assert!(resp.is_err());
}
Expand Down
18 changes: 0 additions & 18 deletions bothan-binance/src/error.rs

This file was deleted.

8 changes: 3 additions & 5 deletions bothan-binance/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
pub use api::websocket::{BinanceWebSocketConnection, BinanceWebSocketConnector};
pub use service::builder::{BinanceServiceBuilder, BinanceServiceBuilderOpts};
pub use service::BinanceService;
pub use store::builder::{BinanceWorkerBuilder, BinanceWorkerBuilderOpts};
pub use store::BinanceWorker;

pub mod api;
pub mod error;
pub mod service;
pub mod types;
pub mod store;
Loading
Loading