Skip to content

Commit

Permalink
check is_forwarded packet earlier (solana-labs#28159)
Browse files Browse the repository at this point in the history
* check and filter is_forwarded packet earlier

* review fix: renaming; and rebase
  • Loading branch information
tao-stones authored and gnapoli23 committed Dec 16, 2022
1 parent 0e32c9f commit 5ef5dac
Showing 1 changed file with 245 additions and 92 deletions.
337 changes: 245 additions & 92 deletions core/src/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,97 +440,99 @@ impl ThreadLocalUnprocessedPackets {
.chunks(batch_size)
.into_iter()
.flat_map(|packets_to_process| {
let packets_to_process = packets_to_process.into_iter().collect_vec();

// Vec<bool> of same size of `packets_to_process`, each indicates
// corresponding packet is tracer packet.
let tracer_packet_indexes = packets_to_process
.iter()
.map(|deserialized_packet| {
deserialized_packet
.original_packet()
.meta
.is_tracer_packet()
})
.collect::<Vec<_>>();
saturating_add_assign!(
total_tracer_packets_in_buffer,
tracer_packet_indexes
.iter()
.filter(|is_tracer| **is_tracer)
.count()
);

if accepting_packets {
let (
(sanitized_transactions, transaction_to_packet_indexes),
packet_conversion_time,
): ((Vec<SanitizedTransaction>, Vec<usize>), _) = measure!(
self.sanitize_unforwarded_packets(&packets_to_process, &bank,),
"sanitize_packet",
);
saturating_add_assign!(
total_packet_conversion_us,
packet_conversion_time.as_us()
// Only prcoess packets not yet forwarded
let (forwarded_packets, packets_to_forward, is_tracer_packet) = self
.prepare_packets_to_forward(
packets_to_process,
&mut total_tracer_packets_in_buffer,
);

let (forwardable_transaction_indexes, filter_packets_time) = measure!(
Self::filter_invalid_transactions(&sanitized_transactions, &bank,),
"filter_packets",
);
saturating_add_assign!(
total_filter_packets_us,
filter_packets_time.as_us()
);
[
forwarded_packets,
if accepting_packets {
let (
(sanitized_transactions, transaction_to_packet_indexes),
packet_conversion_time,
): (
(Vec<SanitizedTransaction>, Vec<usize>),
_,
) = measure!(
self.sanitize_unforwarded_packets(&packets_to_forward, &bank),
"sanitize_packet",
);
saturating_add_assign!(
total_packet_conversion_us,
packet_conversion_time.as_us()
);

for forwardable_transaction_index in &forwardable_transaction_indexes {
saturating_add_assign!(total_forwardable_packets, 1);
let forwardable_packet_index =
transaction_to_packet_indexes[*forwardable_transaction_index];
if tracer_packet_indexes[forwardable_packet_index] {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
}
let (forwardable_transaction_indexes, filter_packets_time) = measure!(
Self::filter_invalid_transactions(&sanitized_transactions, &bank),
"filter_packets",
);
saturating_add_assign!(
total_filter_packets_us,
filter_packets_time.as_us()
);

let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer(
forward_buffer,
&packets_to_process,
&sanitized_transactions,
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
&mut dropped_tx_before_forwarding_count,
);
accepting_packets =
accepted_packet_indexes.len() == forwardable_transaction_indexes.len();
for forwardable_transaction_index in &forwardable_transaction_indexes {
saturating_add_assign!(total_forwardable_packets, 1);
let forwardable_packet_index =
transaction_to_packet_indexes[*forwardable_transaction_index];
if is_tracer_packet[forwardable_packet_index] {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
}

self.unprocessed_packet_batches
.mark_accepted_packets_as_forwarded(
&packets_to_process,
&accepted_packet_indexes,
let accepted_packet_indexes =
Self::add_filtered_packets_to_forward_buffer(
forward_buffer,
&packets_to_forward,
&sanitized_transactions,
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
&mut dropped_tx_before_forwarding_count,
);
accepting_packets = accepted_packet_indexes.len()
== forwardable_transaction_indexes.len();

self.unprocessed_packet_batches
.mark_accepted_packets_as_forwarded(
&packets_to_forward,
&accepted_packet_indexes,
);

self.collect_retained_packets(
&packets_to_forward,
&Self::prepare_filtered_packet_indexes(
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
),
)
} else {
// skip sanitizing and filtering if not longer able to add more packets for forwarding
saturating_add_assign!(
dropped_tx_before_forwarding_count,
packets_to_forward.len()
);

self.collect_retained_packets(
&packets_to_process,
&Self::prepare_filtered_packet_indexes(
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
),
)
} else {
// skip sanitizing and filtering if not longer able to add more packets for forwarding
saturating_add_assign!(
dropped_tx_before_forwarding_count,
packets_to_process.len()
);
packets_to_process
}
packets_to_forward
},
]
.concat()
}),
);

// replace packet priority queue
self.unprocessed_packet_batches.packet_priority_queue = new_priority_queue;
self.verify_priority_queue(original_capacity);

// Assert unprocessed queue is still consistent
assert_eq!(
self.unprocessed_packet_batches.packet_priority_queue.len(),
self.unprocessed_packet_batches
.message_hash_to_transaction
.len()
);

inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
dropped_tx_before_forwarding_count
Expand Down Expand Up @@ -582,20 +584,13 @@ impl ThreadLocalUnprocessedPackets {
deserialized_packets
.enumerate()
.filter_map(|(packet_index, deserialized_packet)| {
if !self
.unprocessed_packet_batches
.is_forwarded(deserialized_packet)
{
deserialized_packet
.build_sanitized_transaction(
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
.map(|transaction| (transaction, packet_index))
} else {
None
}
deserialized_packet
.build_sanitized_transaction(
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
.map(|transaction| (transaction, packet_index))
})
.unzip();

Expand Down Expand Up @@ -746,6 +741,45 @@ impl ThreadLocalUnprocessedPackets {
self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets;
self.verify_priority_queue(original_capacity);
}

/// Prepare a chunk of packets for forwarding, filter out already forwarded packets while
/// counting tracers.
/// Returns Vec of unforwarded packets, and Vec<bool> of same size each indicates corresponding
/// packet is tracer packet.
fn prepare_packets_to_forward(
&self,
packets_to_forward: impl Iterator<Item = Arc<ImmutableDeserializedPacket>>,
total_tracer_packets_in_buffer: &mut usize,
) -> (
Vec<Arc<ImmutableDeserializedPacket>>,
Vec<Arc<ImmutableDeserializedPacket>>,
Vec<bool>,
) {
let mut forwarded_packets: Vec<Arc<ImmutableDeserializedPacket>> = vec![];
let (forwardable_packets, is_tracer_packet) = packets_to_forward
.into_iter()
.filter_map(|immutable_deserialized_packet| {
let is_tracer_packet = immutable_deserialized_packet
.original_packet()
.meta
.is_tracer_packet();
if is_tracer_packet {
saturating_add_assign!(*total_tracer_packets_in_buffer, 1);
}
if !self
.unprocessed_packet_batches
.is_forwarded(&immutable_deserialized_packet)
{
Some((immutable_deserialized_packet, is_tracer_packet))
} else {
forwarded_packets.push(immutable_deserialized_packet);
None
}
})
.unzip();

(forwarded_packets, forwardable_packets, is_tracer_packet)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1026,4 +1060,123 @@ mod tests {
}
Ok(())
}

#[test]
fn test_prepare_packets_to_forward() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10);

let simple_transactions: Vec<Transaction> = (0..256)
.map(|_id| {
// packets are deserialized upon receiving, failed packets will not be
// forwarded; Therefore we need to create real packets here.
let key1 = Keypair::new();
system_transaction::transfer(
&mint_keypair,
&key1.pubkey(),
genesis_config.rent.minimum_balance(0),
genesis_config.hash(),
)
})
.collect_vec();

let mut packets: Vec<DeserializedPacket> = simple_transactions
.iter()
.enumerate()
.map(|(packets_id, transaction)| {
let mut p = Packet::from_data(None, transaction).unwrap();
p.meta.port = packets_id as u16;
p.meta.set_tracer(true);
DeserializedPacket::new(p).unwrap()
})
.collect_vec();

// test preparing buffered packets for forwarding
let test_prepareing_buffered_packets_for_forwarding =
|buffered_packet_batches: UnprocessedPacketBatches| -> (usize, usize, usize) {
let mut total_tracer_packets_in_buffer: usize = 0;
let mut total_packets_to_forward: usize = 0;
let mut total_tracer_packets_to_forward: usize = 0;

let mut unprocessed_transactions = ThreadLocalUnprocessedPackets {
unprocessed_packet_batches: buffered_packet_batches,
thread_type: ThreadType::Transactions,
};

let mut original_priority_queue = unprocessed_transactions.take_priority_queue();
let _ = original_priority_queue
.drain_desc()
.chunks(128usize)
.into_iter()
.flat_map(|packets_to_process| {
let (_, packets_to_forward, is_tracer_packet) = unprocessed_transactions
.prepare_packets_to_forward(
packets_to_process,
&mut total_tracer_packets_in_buffer,
);
total_packets_to_forward += packets_to_forward.len();
total_tracer_packets_to_forward += is_tracer_packet.len();
packets_to_forward
})
.collect::<MinMaxHeap<Arc<ImmutableDeserializedPacket>>>();
(
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
)
};

// all tracer packets are forwardable
{
let buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let (
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_tracer_packets_in_buffer, 256);
assert_eq!(total_packets_to_forward, 256);
assert_eq!(total_tracer_packets_to_forward, 256);
}

// some packets are forwarded
{
let num_already_forwarded = 16;
for packet in &mut packets[0..num_already_forwarded] {
packet.forwarded = true;
}
let buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let (
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_tracer_packets_in_buffer, 256);
assert_eq!(total_packets_to_forward, 256 - num_already_forwarded);
assert_eq!(total_tracer_packets_to_forward, 256 - num_already_forwarded);
}

// all packets are forwarded
{
for packet in &mut packets {
packet.forwarded = true;
}
let buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let (
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_tracer_packets_in_buffer, 256);
assert_eq!(total_packets_to_forward, 0);
assert_eq!(total_tracer_packets_to_forward, 0);
}
}
}

0 comments on commit 5ef5dac

Please sign in to comment.