Skip to content

Commit

Permalink
fixup after review (first round)
Browse files Browse the repository at this point in the history
  • Loading branch information
farnyser committed Apr 3, 2024
1 parent 5b1b658 commit 8386402
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 121 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bin/service-mango-crank/src/transaction_builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use mango_feeds_connector::{
account_write_filter::{self, AccountWriteRoute},
metrics::Metrics,
FeedWrite, SlotUpdate,
AccountWrite, SlotUpdate,
};

use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
Expand All @@ -18,7 +18,7 @@ pub fn init(
group_pk: Pubkey,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<FeedWrite>,
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<Vec<Instruction>>,
)> {
Expand Down
30 changes: 4 additions & 26 deletions bin/service-mango-fills/src/fill_event_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use log::*;
use mango_feeds_connector::{
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
metrics::{MetricType, Metrics},
FeedWrite, SlotUpdate,
AccountWrite, SlotUpdate,
};
use mango_feeds_lib::serum::SerumEventQueueHeader;
use mango_feeds_lib::MarketConfig;
Expand Down Expand Up @@ -400,7 +400,7 @@ pub async fn init(
metrics_sender: Metrics,
exit: Arc<AtomicBool>,
) -> anyhow::Result<(
async_channel::Sender<FeedWrite>,
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<FillEventFilterMessage>,
)> {
Expand All @@ -423,7 +423,7 @@ pub async fn init(

// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<FeedWrite>();
async_channel::unbounded::<AccountWrite>();

// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
Expand Down Expand Up @@ -463,7 +463,7 @@ pub async fn init(
break;
}
tokio::select! {
Ok(FeedWrite::Account(account_write)) = account_write_queue_receiver_c.recv() => {
Ok(account_write) = account_write_queue_receiver_c.recv() => {
if !all_queue_pks.contains(&account_write.pubkey) {
continue;
}
Expand All @@ -483,28 +483,6 @@ pub async fn init(
},
);
}
Ok(FeedWrite::Snapshot(snapshot_write)) = account_write_queue_receiver_c.recv() => {
for account_write in snapshot_write.accounts {
if !all_queue_pks.contains(&account_write.pubkey) {
continue;
}

chain_cache.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
}
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_cache.update_slot(SlotData {
slot: slot_update.slot,
Expand Down
30 changes: 4 additions & 26 deletions bin/service-mango-orderbook/src/orderbook_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use fixed::types::I80F48;
use itertools::Itertools;
use log::*;
use mango_feeds_connector::metrics::MetricU64;
use mango_feeds_connector::FeedWrite;
use mango_feeds_connector::AccountWrite;
use mango_feeds_connector::{
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
metrics::{MetricType, Metrics},
Expand Down Expand Up @@ -244,7 +244,7 @@ pub async fn init(
metrics_sender: Metrics,
exit: Arc<AtomicBool>,
) -> anyhow::Result<(
async_channel::Sender<FeedWrite>,
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<OrderbookFilterMessage>,
)> {
Expand All @@ -255,7 +255,7 @@ pub async fn init(

// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<FeedWrite>();
async_channel::unbounded::<AccountWrite>();

// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
Expand Down Expand Up @@ -288,7 +288,7 @@ pub async fn init(
break;
}
tokio::select! {
Ok(FeedWrite::Account(account_write)) = account_write_queue_receiver.recv() => {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {
continue;
}
Expand All @@ -307,35 +307,13 @@ pub async fn init(
},
);
},
Ok(FeedWrite::Snapshot(snapshot_write)) = account_write_queue_receiver.recv() => {
for account_write in snapshot_write.accounts {
if !relevant_pubkeys.contains(&account_write.pubkey) {
continue;
}
chain_cache.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
},
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_cache.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});

}
}

Expand Down
25 changes: 3 additions & 22 deletions bin/service-mango-pnl/src/memory_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ use std::sync::{Arc, RwLock};
pub async fn init(
chain_data: Arc<RwLock<ChainData>>,
) -> anyhow::Result<(
async_channel::Sender<FeedWrite>,
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
)> {
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<FeedWrite>();
async_channel::unbounded::<AccountWrite>();

let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();

// update handling thread, reads both slots and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(FeedWrite::Account(account_write)) = account_write_queue_receiver.recv() => {
Ok(account_write) = account_write_queue_receiver.recv() => {
let mut chain = chain_data.write().unwrap();
chain.update_account(
account_write.pubkey,
Expand All @@ -35,25 +35,6 @@ pub async fn init(
},
);
}
Ok(FeedWrite::Snapshot(snapshot_write)) = account_write_queue_receiver.recv() => {
let mut chain = chain_data.write().unwrap();
for account_write in snapshot_write.accounts {
chain.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
}
Ok(slot_update) = slot_queue_receiver.recv() => {
let mut chain = chain_data.write().unwrap();
chain.update_slot(SlotData {
Expand Down
31 changes: 9 additions & 22 deletions lib/client/src/grpc_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use solana_sdk::pubkey::Pubkey;
use anyhow::Context;
use async_channel::{RecvError, Sender};
use mango_feeds_connector::{
EntityFilter, FeedFilterType, FeedWrite, FilterConfig, GrpcSourceConfig, Memcmp,
AccountWrite, EntityFilter, FeedFilterType, FilterConfig, GrpcSourceConfig, Memcmp,
SnapshotSourceConfig, SourceConfig,
};
use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn feed_data(
let (serum3_oo_sender, serum3_oo_receiver) = async_channel::unbounded();
let (serum3_oo_slot_sender, serum3_oo_slot_receiver) = async_channel::unbounded();
let filters = FilterConfig {
entity_filter: EntityFilter::FilterByProgramIdAndCustomCriteria(
entity_filter: EntityFilter::FilterByProgramIdSelective(
*serum_program,
serum3_oo_custom_filters.clone(),
),
Expand All @@ -122,7 +122,7 @@ async fn feed_data(
// Make sure the serum3_oo_sub_map does not exit when there's no serum_programs
let _unused_serum_sender;
if config.serum_programs.is_empty() {
let (sender, receiver) = async_channel::unbounded::<FeedWrite>();
let (sender, receiver) = async_channel::unbounded::<AccountWrite>();
_unused_serum_sender = sender;
serum3_oo_sub_map.insert(Pubkey::default(), receiver);
}
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn feed_data(

async fn handle_message(
name: &str,
message: Result<FeedWrite, RecvError>,
message: Result<AccountWrite, RecvError>,
sender: &Sender<Message>,
) -> bool {
if let Ok(data) = message {
Expand All @@ -180,24 +180,11 @@ async fn handle_message(
}
}

async fn handle_feed_write(sender: &Sender<Message>, data: FeedWrite) {
match data {
FeedWrite::Account(account) => sender
.send(Message::Account(AccountUpdate::from_feed(account)))
.await
.expect("sending must succeed"),
FeedWrite::Snapshot(mut snapshot) => sender
.send(Message::Snapshot(
snapshot
.accounts
.drain(0..)
.map(|a| AccountUpdate::from_feed(a))
.collect(),
crate::account_update_stream::SnapshotType::Partial,
))
.await
.expect("sending must succeed"),
}
async fn handle_feed_write(sender: &Sender<Message>, account: AccountWrite) {
sender
.send(Message::Account(AccountUpdate::from_feed(account)))
.await
.expect("sending must succeed");
}

pub fn start(
Expand Down
31 changes: 9 additions & 22 deletions lib/client/src/websocket_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use solana_sdk::pubkey::Pubkey;
use anyhow::Context;
use async_channel::{RecvError, Sender};
use mango_feeds_connector::{
EntityFilter, FeedFilterType, FeedWrite, FilterConfig, Memcmp, SnapshotSourceConfig,
AccountWrite, EntityFilter, FeedFilterType, FilterConfig, Memcmp, SnapshotSourceConfig,
SourceConfig,
};
use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
Expand Down Expand Up @@ -89,7 +89,7 @@ async fn feed_data(
let (serum3_oo_sender, serum3_oo_receiver) = async_channel::unbounded();
let (serum3_oo_slot_sender, serum3_oo_slot_receiver) = async_channel::unbounded();
let filters = FilterConfig {
entity_filter: EntityFilter::FilterByProgramIdAndCustomCriteria(
entity_filter: EntityFilter::FilterByProgramIdSelective(
*serum_program,
serum3_oo_custom_filters.clone(),
),
Expand All @@ -110,7 +110,7 @@ async fn feed_data(
// Make sure the serum3_oo_sub_map does not exit when there's no serum_programs
let _unused_serum_sender;
if config.serum_programs.is_empty() {
let (sender, receiver) = async_channel::unbounded::<FeedWrite>();
let (sender, receiver) = async_channel::unbounded::<AccountWrite>();
_unused_serum_sender = sender;
serum3_oo_sub_map.insert(Pubkey::default(), receiver);
}
Expand Down Expand Up @@ -156,7 +156,7 @@ async fn feed_data(

async fn handle_message(
name: &str,
message: Result<FeedWrite, RecvError>,
message: Result<AccountWrite, RecvError>,
sender: &Sender<Message>,
) -> bool {
if let Ok(data) = message {
Expand All @@ -168,24 +168,11 @@ async fn handle_message(
}
}

async fn handle_feed_write(sender: &Sender<Message>, data: FeedWrite) {
match data {
FeedWrite::Account(account) => sender
.send(Message::Account(AccountUpdate::from_feed(account)))
.await
.expect("sending must succeed"),
FeedWrite::Snapshot(mut snapshot) => sender
.send(Message::Snapshot(
snapshot
.accounts
.drain(0..)
.map(|a| AccountUpdate::from_feed(a))
.collect(),
crate::account_update_stream::SnapshotType::Partial,
))
.await
.expect("sending must succeed"),
}
async fn handle_feed_write(sender: &Sender<Message>, account: AccountWrite) {
sender
.send(Message::Account(AccountUpdate::from_feed(account)))
.await
.expect("sending must succeed");
}

pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::Sender<Message>) {
Expand Down

0 comments on commit 8386402

Please sign in to comment.