Skip to content

Commit

Permalink
feat: weighted price from order book cache (#181)
Browse files Browse the repository at this point in the history
* feat: volume-based weighted price from order book cache

* fix(test): fix price app test

* chore(check_code): allow dead code in exchange price cache

* test: simplify weighted price test

* test: fix failing unit and integration test

* chore(clippy): remove needless lifetime

* fix: replace side from asks to bids when buying usd

* chore: rename price_cache to snapshot_cache

Co-authored-by: bodymindarts <justin@galoy.io>
  • Loading branch information
enigbe and bodymindarts authored Nov 10, 2022
1 parent 8d963ba commit 952850c
Show file tree
Hide file tree
Showing 11 changed files with 474 additions and 67 deletions.
2 changes: 1 addition & 1 deletion cli/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn health_check_error(
*n_errors += 1;
let span = tracing::Span::current();
span.record("component_name", name);
span.record("n_errors", &*n_errors);
span.record("n_errors", *n_errors);
span.record("error.message", tracing::field::display(&err));
if *n_errors > 4 {
span.record(
Expand Down
8 changes: 5 additions & 3 deletions price-server/src/app/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use thiserror::Error;

use crate::{currency::CurrencyError, exchange_price_cache::ExchangePriceCacheError};
use crate::{currency::CurrencyError, order_book_cache::OrderBookCacheError};
use shared::pubsub::SubscriberError;

#[allow(clippy::large_enum_variant)]
Expand All @@ -10,6 +10,8 @@ pub enum PriceAppError {
CurrencyError(#[from] CurrencyError),
#[error("PriceAppError - SubscriberError: {0}")]
SubscriberError(#[from] SubscriberError),
#[error("PriceAppError - ExchangePriceCacheError: {0}")]
ExchangePriceCacheError(#[from] ExchangePriceCacheError),
#[error("PriceAppError - SnapshotCacheError: {0}")]
OrderBookCacheError(#[from] OrderBookCacheError),
#[error("PriceAppError - FloatingPointConversion: {0}")]
FloatingPointConversion(#[from] rust_decimal::Error),
}
55 changes: 30 additions & 25 deletions price-server/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use chrono::Duration;
use futures::stream::StreamExt;
use tracing::{info_span, instrument, Instrument};

use shared::{health::HealthCheckTrigger, payload::OkexBtcUsdSwapPricePayload, pubsub::*};

use super::exchange_price_cache::ExchangePriceCache;
use shared::{health::HealthCheckTrigger, payload::OkexBtcUsdSwapOrderBookPayload, pubsub::*};

use crate::OrderBookCache;
pub use crate::{currency::*, fee_calculator::*};
pub use error::*;

pub struct PriceApp {
price_cache: ExchangePriceCache,
snapshot_cache: OrderBookCache,
fee_calculator: FeeCalculator,
}

Expand All @@ -23,7 +22,9 @@ impl PriceApp {
pubsub_cfg: PubSubConfig,
) -> Result<Self, PriceAppError> {
let subscriber = Subscriber::new(pubsub_cfg).await?;
let mut stream = subscriber.subscribe::<OkexBtcUsdSwapPricePayload>().await?;
let mut stream = subscriber
.subscribe::<OkexBtcUsdSwapOrderBookPayload>()
.await?;
tokio::spawn(async move {
while let Some(check) = health_check_trigger.next().await {
check
Expand All @@ -32,10 +33,10 @@ impl PriceApp {
}
});

let price_cache = ExchangePriceCache::new(Duration::seconds(30));
let order_book_cache = OrderBookCache::new(Duration::seconds(30));
let fee_calculator = FeeCalculator::new(fee_calc_cfg);
let app = Self {
price_cache: price_cache.clone(),
snapshot_cache: order_book_cache.clone(),
fee_calculator,
};

Expand All @@ -49,7 +50,7 @@ impl PriceApp {
shared::tracing::inject_tracing_data(&span, &msg.meta.tracing_data);

async {
price_cache.apply_update(msg).await;
order_book_cache.apply_update(msg).await;
}
.instrument(span)
.await;
Expand All @@ -64,8 +65,8 @@ impl PriceApp {
sats: Sats,
) -> Result<UsdCents, PriceAppError> {
let cents = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.buy_usd()
.cents_from_sats(sats);
Expand All @@ -78,8 +79,8 @@ impl PriceApp {
sats: Sats,
) -> Result<UsdCents, PriceAppError> {
let cents = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.sell_usd()
.cents_from_sats(sats);
Expand All @@ -92,8 +93,8 @@ impl PriceApp {
sats: Sats,
) -> Result<UsdCents, PriceAppError> {
let cents = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.buy_usd()
.cents_from_sats(sats);
Expand All @@ -106,8 +107,8 @@ impl PriceApp {
sats: Sats,
) -> Result<UsdCents, PriceAppError> {
let cents = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.sell_usd()
.cents_from_sats(sats);
Expand All @@ -120,8 +121,8 @@ impl PriceApp {
cents: UsdCents,
) -> Result<Sats, PriceAppError> {
let sats = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.buy_usd()
.sats_from_cents(cents);
Expand All @@ -134,8 +135,8 @@ impl PriceApp {
cents: UsdCents,
) -> Result<Sats, PriceAppError> {
let sats = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.sell_usd()
.sats_from_cents(cents);
Expand All @@ -148,8 +149,8 @@ impl PriceApp {
cents: UsdCents,
) -> Result<Sats, PriceAppError> {
let sats = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.buy_usd()
.sats_from_cents(cents);
Expand All @@ -162,8 +163,8 @@ impl PriceApp {
cents: UsdCents,
) -> Result<Sats, PriceAppError> {
let sats = self
.price_cache
.latest_tick()
.snapshot_cache
.latest_snapshot()
.await?
.sell_usd()
.sats_from_cents(cents);
Expand All @@ -172,7 +173,11 @@ impl PriceApp {

#[instrument(skip_all, fields(correlation_id), ret, err)]
pub async fn get_cents_per_sat_exchange_mid_rate(&self) -> Result<f64, PriceAppError> {
let cents_per_sat = self.price_cache.latest_tick().await?.mid_price_of_one_sat();
let cents_per_sat = self
.snapshot_cache
.latest_snapshot()
.await?
.mid_price_of_one_sat()?;
Ok(f64::try_from(cents_per_sat)?)
}
}
2 changes: 1 addition & 1 deletion price-server/src/currency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum CurrencyError {

macro_rules! currency {
($name:ident, $code:ident) => {
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct $name {
inner: Money<'static, inner::stablesats::Currency>,
}
Expand Down
4 changes: 4 additions & 0 deletions price-server/src/exchange_price_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ pub enum ExchangePriceCacheError {
}

#[derive(Clone)]
#[allow(dead_code)]
pub struct ExchangePriceCache {
inner: Arc<RwLock<ExchangePriceCacheInner>>,
}

#[allow(dead_code)]
impl ExchangePriceCache {
pub fn new(stale_after: Duration) -> Self {
Self {
Expand All @@ -51,6 +53,7 @@ impl ExchangePriceCache {
}

#[derive(Clone)]
#[allow(dead_code)]
pub struct BtcSatTick {
timestamp: TimeStamp,
correlation_id: CorrelationId,
Expand All @@ -59,6 +62,7 @@ pub struct BtcSatTick {
bid_price_of_one_sat: UsdCents,
}

#[allow(dead_code)]
impl BtcSatTick {
pub fn mid_price_of_one_sat(&self) -> UsdCents {
(&self.bid_price_of_one_sat + &self.ask_price_of_one_sat) / 2
Expand Down
4 changes: 4 additions & 0 deletions price-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ pub mod app;
pub mod currency;
mod exchange_price_cache;
mod fee_calculator;
mod order_book_cache;
mod price_converter;
mod server;

use shared::{health::HealthCheckTrigger, pubsub::PubSubConfig};

use app::PriceApp;
pub use exchange_price_cache::ExchangePriceCacheError;
pub use fee_calculator::FeeCalculatorConfig;
pub use order_book_cache::*;
pub use price_converter::*;
pub use server::*;

pub async fn run(
Expand Down
Loading

0 comments on commit 952850c

Please sign in to comment.