Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use mango feeds in liquidator #923

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 81 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fixed = { git = "https://github.com/blockworks-foundation/fixed.git", branch = "
pyth-sdk-solana = "0.8.0"
# commit c85e56d (0.5.10 plus dependency updates)
serum_dex = { git = "https://github.com/openbook-dex/program.git", default-features=false }
mango-feeds-connector = "0.2.1"
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "serge/custom_filtering"}

# 1.16.7+ is required due to this: https://github.com/blockworks-foundation/mango-v4/issues/712
solana-address-lookup-table-program = "~1.16.7"
Expand Down
55 changes: 55 additions & 0 deletions bin/benchmark-data-update/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
[package]
name = "benchmark-data-update"
version = "0.1.0"
authors = ["Serge Farny <serge.farny@gmail.com>"]
edition = "2021"
license = "AGPL-3.0-or-later"

[dependencies]
mango-feeds-connector = { workspace = true }
mango-feeds-lib = { path = "../../lib/mango-feeds-lib" }
services-mango-lib = { path = "../../lib/services-mango-lib" }

solana-client = { workspace = true }
solana-logger = { workspace = true }
solana-sdk = { workspace = true }

anchor-lang = { workspace = true }
anchor-client = { workspace = true }

fixed = { workspace = true, features = ["serde", "borsh"] }

mango-v4 = { path = "../../programs/mango-v4", features = ["client"] }
mango-v4-client = { path = "../../lib/client" }

serum_dex = { workspace = true }

bs58 = "0.3.1"
log = "0.4"
anyhow = "1.0"
toml = "0.5"
serde = "1.0.130"
serde_derive = "1.0.130"
serde_json = "1.0.68"
futures = "0.3.17"
futures-core = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
ws = "^0.9.2"
async-channel = "1.6"
async-trait = "0.1"
bytemuck = "^1.7.2"
itertools = "0.10.3"
jemallocator = "0.3.2"
chrono = "0.4.23"
base64 = "0.21"

tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"

native-tls = "0.2"
rustls = "0.20.8"
tracing = { version = "0.1", features = ["log"] }

hdrhistogram = "7.5.4"
csv = "1.0"
4 changes: 4 additions & 0 deletions bin/benchmark-data-update/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# benchmark data update

Compare websocket and grpc connection performance

14 changes: 14 additions & 0 deletions bin/benchmark-data-update/conf/example-config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"

[source_configuration]
rpc_ws_url = "wss://mango.rpcpool.com/<TOKEN>"
rpc_http_url = "http://mango.rpcpool.com/<TOKEN>"
snapshot_interval_secs = 900
use_grpc = false
dedup_queue_size = 50000

[[source_configuration.grpc_sources]]
name = "benchmark-data-update"
connection_string = "http://tyo64.rpcpool.com/"
#token = "<TOKEN>"
retry_connection_sleep_secs = 30
25 changes: 25 additions & 0 deletions bin/benchmark-data-update/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use mango_feeds_connector::GrpcSourceConfig;
use serde_derive::Deserialize;
use services_mango_lib::env_helper::string_or_env;

#[derive(Clone, Debug, Deserialize)]
pub struct Configuration {
#[serde(deserialize_with = "string_or_env")]
pub mango_group: String,
#[serde(deserialize_with = "string_or_env")]
pub export_csv_path: String,
pub source_configuration: SourceConfiguration,
}

#[derive(Clone, Debug, Deserialize)]
pub struct SourceConfiguration {
#[serde(deserialize_with = "string_or_env")]
pub rpc_http_url: String,
#[serde(deserialize_with = "string_or_env")]
pub rpc_ws_url: String,

pub snapshot_interval_secs: u64,

pub dedup_queue_size: usize,
pub grpc_sources: Vec<GrpcSourceConfig>,
}
82 changes: 82 additions & 0 deletions bin/benchmark-data-update/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
mod configuration;
mod processors;

use futures_util::StreamExt;
// use mango_feeds_connector::metrics;
use mango_v4_client::tracing_subscriber_init;
use std::fs::File;
use std::io::Read;
use std::sync::atomic::Ordering;

use crate::configuration::Configuration;
use crate::processors::data::{DataEventSource, DataProcessor};
use crate::processors::exit::ExitProcessor;
use crate::processors::exporter::ExporterProcessor;
use crate::processors::logger::LoggerProcessor;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();

if args.len() < 2 {
eprintln!("Please enter a config file path argument.");
return Ok(());
}

let configuration: Configuration = {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
toml::from_str(&contents).unwrap()
};

tracing_subscriber_init();

let exit_processor = ExitProcessor::init().await?;

let ws_processor: DataProcessor = DataProcessor::init(
&configuration,
DataEventSource::Websocket,
exit_processor.exit.clone(),
)
.await?;
let grpc_processor: DataProcessor = DataProcessor::init(
&configuration,
DataEventSource::Grpc,
exit_processor.exit.clone(),
)
.await?;

let logger_processor = LoggerProcessor::init(
&ws_processor.channel,
&grpc_processor.channel,
exit_processor.exit.clone(),
)
.await?;

let exporter_processor = ExporterProcessor::init(
&configuration,
&ws_processor.channel,
&grpc_processor.channel,
exit_processor.exit.clone(),
)
.await?;

let jobs = vec![
exit_processor.job,
ws_processor.job,
grpc_processor.job,
logger_processor.job,
exporter_processor.job,
];
let mut jobs: futures::stream::FuturesUnordered<_> = jobs.into_iter().collect();

while let Some(_) = jobs.next().await {
// if any job exit, stop the others threads & wait
exit_processor.exit.store(true, Ordering::Relaxed);
}

// for now, we force exit here because websocket connection to RPC is not properly closed on exit
tracing::warn!("killing process");
std::process::exit(0x0100);
}
Loading
Loading