Skip to content

Commit

Permalink
Add Disable Mempool Flag (solana-labs#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedleggett authored Feb 21, 2024
1 parent 0e2a067 commit 856df68
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 75 deletions.
10 changes: 5 additions & 5 deletions .github/actions/setup-rust/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ runs:

- name: Install cargo dependencies
run: |
rustup install nightly-2023-10-05
rustup component add rustfmt --toolchain nightly-2023-10-05-x86_64-unknown-linux-gnu
rustup component add clippy --toolchain nightly-2023-10-05-x86_64-unknown-linux-gnu
rustup install nightly-2024-02-01
rustup component add rustfmt --toolchain nightly-2024-02-01-x86_64-unknown-linux-gnu
rustup component add clippy --toolchain nightly-2024-02-01-x86_64-unknown-linux-gnu
cargo install cargo-udeps --locked
cargo install cargo-sort
shell: bash
Expand All @@ -63,6 +63,6 @@ runs:
run: |
rustc --version;
cargo --version;
cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu clippy --version;
cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu fmt --version;
cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu clippy --version;
cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu fmt --version;
shell: bash
4 changes: 2 additions & 2 deletions .github/workflows/clean_code.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
caller-workflow-name: clippy_and_udeps_check

- name: cargo clippy
run: cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu clippy --all-targets
run: cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu clippy --all-targets

- name: cargo udeps
run: cargo +nightly-2023-10-05-x86_64-unknown-linux-gnu udeps --locked
run: cargo +nightly-2024-02-01-x86_64-unknown-linux-gnu udeps --locked
83 changes: 45 additions & 38 deletions block_engine/src/block_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread,
thread::{Builder, JoinHandle},
time::{Duration, Instant, SystemTime},
};
Expand Down Expand Up @@ -53,6 +52,11 @@ use tonic::{

use crate::block_engine_stats::BlockEngineStats;

pub struct BlockEngineConfig {
pub block_engine_url: String,
pub auth_service_url: String,
}

#[derive(Clone)]
struct AuthInterceptor {
access_token: Arc<Mutex<Token>>,
Expand Down Expand Up @@ -95,16 +99,15 @@ pub type BlockEngineResult<T> = Result<T, BlockEngineError>;

/// Attempts to maintain a connection to a Block Engine and forward packets to it
pub struct BlockEngineRelayerHandler {
block_engine_forwarder: JoinHandle<()>,
block_engine_forwarder: Option<JoinHandle<()>>,
}

impl BlockEngineRelayerHandler {
const BLOCK_ENGINE_PACKET_QUEUE_CAPACITY: usize = 1_000;

#[allow(clippy::too_many_arguments)]
pub fn new(
block_engine_url: String,
auth_service_url: String,
block_engine_config: Option<BlockEngineConfig>,
mut block_engine_receiver: Receiver<BlockEnginePackets>,
keypair: Arc<Keypair>,
exit: Arc<AtomicBool>,
Expand All @@ -114,46 +117,50 @@ impl BlockEngineRelayerHandler {
ofac_addresses: HashSet<Pubkey>,
) -> BlockEngineRelayerHandler {
let is_connected_to_block_engine = is_connected_to_block_engine.clone();
let block_engine_forwarder = Builder::new()
.name("block_engine_relayer_handler_thread".into())
.spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
while !exit.load(Ordering::Relaxed) {
let result = Self::auth_and_connect(
&block_engine_url,
&auth_service_url,
&mut block_engine_receiver,
&keypair,
&exit,
aoi_cache_ttl_s,
&address_lookup_table_cache,
&is_connected_to_block_engine,
&ofac_addresses,
)
.await;
is_connected_to_block_engine.store(false, Ordering::Relaxed);

if let Err(e) = result {
error!("error authenticating and connecting: {:?}", e);
datapoint_error!("block_engine_relayer-error",
"block_engine_url" => block_engine_url,
"auth_service_url" => auth_service_url,
("error", e.to_string(), String)
);
sleep(Duration::from_secs(2)).await;
let block_engine_forwarder = block_engine_config.map(|config| {
Builder::new()
.name("block_engine_relayer_handler_thread".into())
.spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
while !exit.load(Ordering::Relaxed) {
let result = Self::auth_and_connect(
&config.block_engine_url,
&config.auth_service_url,
&mut block_engine_receiver,
&keypair,
&exit,
aoi_cache_ttl_s,
&address_lookup_table_cache,
&is_connected_to_block_engine,
&ofac_addresses,
)
.await;
is_connected_to_block_engine.store(false, Ordering::Relaxed);

if let Err(e) = result {
error!("error authenticating and connecting: {:?}", e);
datapoint_error!("block_engine_relayer-error",
"block_engine_url" => &config.block_engine_url,
"auth_service_url" => &config.auth_service_url,
("error", e.to_string(), String)
);
sleep(Duration::from_secs(2)).await;
}
}
}
});
})
.unwrap();
});
})
.unwrap()
});
BlockEngineRelayerHandler {
block_engine_forwarder,
}
}

pub fn join(self) -> thread::Result<()> {
self.block_engine_forwarder.join()
pub fn join(self) {
if let Some(forwarder) = self.block_engine_forwarder {
forwarder.join().unwrap()
}
}

/// Relayers are whitelisted in the block engine. In order to auth, a challenge-response handshake
Expand Down
10 changes: 4 additions & 6 deletions core/src/ofac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ fn is_ofac_address_in_lookup_table(

#[cfg(test)]
mod tests {
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::collections::HashSet;

use crossbeam_channel::unbounded;
use dashmap::DashMap;
use solana_perf::packet::PacketBatch;
use solana_sdk::{
address_lookup_table_account::AddressLookupTableAccount,
hash::Hash,
Expand Down Expand Up @@ -167,7 +165,7 @@ mod tests {

let lookup_table_pubkey = Pubkey::new_unique();
let lookup_table = AddressLookupTableAccount {
key: lookup_table_pubkey.clone(),
key: lookup_table_pubkey,
addresses: vec![ofac_pubkey, Pubkey::new_unique()],
};

Expand Down Expand Up @@ -281,7 +279,7 @@ mod tests {
Hash::default(),
);
let random_tx = VersionedTransaction::from(random_tx);
let random_packet = Packet::from_data(None, &random_tx).expect("can create packet");
let random_packet = Packet::from_data(None, random_tx).expect("can create packet");

let ofac_tx = Transaction::new_signed_with_payer(
&[Instruction::new_with_bytes(
Expand All @@ -298,7 +296,7 @@ mod tests {
Hash::default(),
);
let ofac_tx = VersionedTransaction::from(ofac_tx);
let ofac_packet = Packet::from_data(None, &ofac_tx).expect("can create packet");
let ofac_packet = Packet::from_data(None, ofac_tx).expect("can create packet");

assert!(!is_tx_ofac_related(
&random_packet.deserialize_slice(..).unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.73.0"
channel = "1.76.0"
38 changes: 21 additions & 17 deletions transaction-relayer/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub fn start_forward_and_delay_thread(
packet_delay_ms: u32,
block_engine_sender: tokio::sync::mpsc::Sender<BlockEnginePackets>,
num_threads: u64,
disable_mempool: bool,
exit: &Arc<AtomicBool>,
) -> Vec<JoinHandle<()>> {
const SLEEP_DURATION: Duration = Duration::from_millis(5);
Expand Down Expand Up @@ -77,23 +78,26 @@ pub fn start_forward_and_delay_thread(

// try_send because the block engine receiver only drains when it's connected
// and we don't want to OOM on packet_receiver
match block_engine_sender.try_send(BlockEnginePackets {
banking_packet_batch: banking_packet_batch.clone(),
stamp: system_time,
expiration: packet_delay_ms,
}) {
Ok(_) => {
forwarder_metrics.num_be_packets_forwarded += num_packets;
}
Err(TrySendError::Closed(_)) => {
panic!(
"error sending packet batch to block engine handler"
);
}
Err(TrySendError::Full(_)) => {
// block engine most likely not connected
forwarder_metrics.num_be_packets_dropped += num_packets;
forwarder_metrics.num_be_sender_full += 1;
if !disable_mempool {
match block_engine_sender.try_send(BlockEnginePackets {
banking_packet_batch: banking_packet_batch.clone(),
stamp: system_time,
expiration: packet_delay_ms,
}) {
Ok(_) => {
forwarder_metrics.num_be_packets_forwarded +=
num_packets;
}
Err(TrySendError::Closed(_)) => {
panic!(
"error sending packet batch to block engine handler"
);
}
Err(TrySendError::Full(_)) => {
// block engine most likely not connected
forwarder_metrics.num_be_packets_dropped += num_packets;
forwarder_metrics.num_be_sender_full += 1;
}
}
}
buffered_packet_batches.push_back(RelayerPacketBatches {
Expand Down
29 changes: 23 additions & 6 deletions transaction-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use clap::Parser;
use crossbeam_channel::tick;
use dashmap::DashMap;
use env_logger::Env;
use jito_block_engine::block_engine::BlockEngineRelayerHandler;
use jito_block_engine::block_engine::{BlockEngineConfig, BlockEngineRelayerHandler};
use jito_core::{
graceful_panic,
tpu::{Tpu, TpuSockets},
Expand Down Expand Up @@ -50,6 +50,8 @@ use tikv_jemallocator::Jemalloc;
use tokio::{runtime::Builder, signal, sync::mpsc::channel};
use tonic::transport::Server;

// no-op change to test ci

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

Expand Down Expand Up @@ -117,7 +119,7 @@ struct Args {
/// Address for Jito Block Engine.
/// See https://jito-labs.gitbook.io/mev/searcher-resources/block-engine#connection-details
#[arg(long, env)]
block_engine_url: String,
block_engine_url: Option<String>,

/// Manual override for authentication service address of the block-engine.
/// Defaults to `--block-engine-url`
Expand Down Expand Up @@ -194,6 +196,10 @@ struct Args {
/// Number of packets to send in each packet batch to the validator
#[arg(long, env, default_value_t = 4)]
validator_packet_batch_size: usize,

/// Disable Mempool forwarding
#[arg(long, env, default_value_t = false)]
disable_mempool: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -356,14 +362,25 @@ fn main() {
args.packet_delay_ms,
block_engine_sender,
1,
args.disable_mempool,
&exit,
);

let is_connected_to_block_engine = Arc::new(AtomicBool::new(false));
let block_engine_config = if !args.disable_mempool && args.block_engine_url.is_some() {
let block_engine_url = args.block_engine_url.unwrap();
let auth_service_url = args
.block_engine_auth_service_url
.unwrap_or(block_engine_url.clone());
Some(BlockEngineConfig {
block_engine_url,
auth_service_url,
})
} else {
None
};
let block_engine_forwarder = BlockEngineRelayerHandler::new(
args.block_engine_url.clone(),
args.block_engine_auth_service_url
.unwrap_or(args.block_engine_url),
block_engine_config,
block_engine_receiver,
keypair,
exit.clone(),
Expand Down Expand Up @@ -479,7 +496,7 @@ fn main() {
t.join().unwrap();
}
lookup_table_refresher.join().unwrap();
block_engine_forwarder.join().unwrap();
block_engine_forwarder.join();
}

pub async fn shutdown_signal(exit: Arc<AtomicBool>) {
Expand Down

0 comments on commit 856df68

Please sign in to comment.