Skip to content

Commit

Permalink
benchmark-data-update: listen to oracle/serum accounts in additions t…
Browse files Browse the repository at this point in the history
…o mango accounts
  • Loading branch information
farnyser committed Mar 28, 2024
1 parent c884ad1 commit 8fcc11e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 36 deletions.
45 changes: 18 additions & 27 deletions bin/benchmark-data-update/src/processors/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@ use crate::processors::data::DataEvent::{AccountUpdate, Other, Snapshot};
use async_channel::Receiver;
use chrono::Utc;
use itertools::Itertools;
use mango_v4_client::account_update_stream::{Message, SnapshotType};
use mango_v4_client::snapshot_source::is_mango_account;
use mango_v4_client::{
account_update_stream, grpc_source, snapshot_source, websocket_source, MangoGroupContext,
};
use mango_v4_client::account_update_stream::Message;
use mango_v4_client::{account_update_stream, grpc_source, websocket_source, MangoGroupContext};
use services_mango_lib::fail_or_retry;
use services_mango_lib::retry_counter::RetryCounter;
use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::fmt::{Display, Pointer};
use std::fmt::Display;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::warn;
Expand Down Expand Up @@ -68,7 +65,6 @@ impl DataProcessor {
exit: Arc<AtomicBool>,
) -> anyhow::Result<DataProcessor> {
let mut retry_counter = RetryCounter::new(2);
let mango_group = Pubkey::from_str(&configuration.mango_group)?;
let mango_stream = fail_or_retry!(
retry_counter,
Self::init_mango_source(configuration, source, exit.clone()).await
Expand All @@ -89,7 +85,7 @@ impl DataProcessor {
continue;
}

let event = Self::parse_message(msg, source, received_at, mango_group);
let event = Self::parse_message(msg, source, received_at);

if event.is_none() {
continue;
Expand Down Expand Up @@ -129,27 +125,22 @@ impl DataProcessor {
message: Message,
source: DataEventSource,
received_at: chrono::DateTime<Utc>,
mango_group: Pubkey,
) -> Option<DataEvent> {
match message {
Message::Account(account_write) => {
if is_mango_account(&account_write.account, &mango_group).is_some() {
return Some(AccountUpdate(AccountUpdateEvent {
account: account_write.pubkey,
received_at,
source,
slot: account_write.slot,
}));
}
return Some(AccountUpdate(AccountUpdateEvent {
account: account_write.pubkey,
received_at,
source,
slot: account_write.slot,
}));
}
Message::Snapshot(snapshot, snapshot_type) => {
Message::Snapshot(snapshot, _) => {
let slot = snapshot[0].slot;
let mut result = Vec::new();
for update in snapshot.iter() {
if is_mango_account(&update.account, &mango_group).is_some() {
result.push(update.pubkey);
assert!(slot == update.slot);
}
result.push(update.pubkey);
assert!(slot == update.slot);
}

return Some(Snapshot(SnapshotEvent {
Expand Down Expand Up @@ -215,8 +206,8 @@ impl DataProcessor {
open_orders_authority: mango_group,
grpc_sources: sources,
},
mango_oracles.clone(),
account_update_sender.clone(),
mango_oracles,
account_update_sender,
metrics,
exit,
);
Expand All @@ -228,8 +219,8 @@ impl DataProcessor {
serum_programs,
open_orders_authority: mango_group,
},
mango_oracles.clone(),
account_update_sender.clone(),
mango_oracles,
account_update_sender,
);
}

Expand Down
15 changes: 6 additions & 9 deletions bin/benchmark-data-update/src/processors/logger.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
use crate::configuration::Configuration;
use chrono::Utc;
use hdrhistogram::Histogram;
use solana_sdk::blake3::Hash;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::{info, warn};

use super::data::{AccountUpdateEvent, DataEvent, DataEventSource, SnapshotEvent};
use super::data::{AccountUpdateEvent, DataEvent, DataEventSource};

pub struct LoggerProcessor {
pub job: JoinHandle<()>,
}

impl LoggerProcessor {
/// TODO FAS
/// Enlever slot de la key, et comparer en mode "min slot" -> il faut un update avec upd.slot >= existing.slot pour match
pub async fn init(
data_sender_1: &tokio::sync::broadcast::Sender<DataEvent>,
data_sender_2: &tokio::sync::broadcast::Sender<DataEvent>,
Expand Down Expand Up @@ -130,7 +124,10 @@ impl LoggerProcessor {
for x in events {
tracing::debug!(
"{} => {} {} (got from {})",
x.0, x.1.slot, x.1.received_at, x.1.source
x.0,
x.1.slot,
x.1.received_at,
x.1.source
)
}

Expand Down

0 comments on commit 8fcc11e

Please sign in to comment.