Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Less transport panics #31

Merged
merged 10 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
- Updated `wtransport` to `0.5.0`, `xwt-wtransport` and `xwt-web`
- Made integer/usize casts in `aeronet_transport` more safe and explicit
- Replace `LaneIndex(n)` with `LaneIndex::new(n)` (works in `const`)
- Made errors more explicit when using `TransportSend::push`
- `TransportSend::push` returns `Result<.., TransportSendError>`, but you can ignore the error

# 0.11.0

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/aeronet_replicon/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ fn flush(
for (channel_id, msg) in replicon_client.drain_sent() {
let lane_index = convert::to_lane_index(channel_id);
for mut transport in &mut clients {
transport.send.push(lane_index, msg.clone(), now);
_ = transport.send.push(lane_index, msg.clone(), now);
}
}
}
2 changes: 1 addition & 1 deletion crates/aeronet_replicon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,6 @@ fn flush(mut replicon_server: ResMut<RepliconServer>, mut clients: Query<&mut Tr
};
let lane_index = convert::to_lane_index(channel_id);

transport.send.push(lane_index, msg, now);
_ = transport.send.push(lane_index, msg, now);
}
}
1 change: 1 addition & 0 deletions crates/aeronet_transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ document-features = [
aeronet_io = { workspace = true }

ahash = { workspace = true }
anyhow = { workspace = true }
arbitrary = { workspace = true }
bitvec = { workspace = true }
derive_more = { workspace = true }
Expand Down
48 changes: 34 additions & 14 deletions crates/aeronet_transport/src/frag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ use {
/// efficient as the receiver will have to resize its buffer when re-receiving
/// `C` later, but the logic will still behave correctly.
///
/// # Errors
///
/// Returns [`MessageTooBig`] if the message is so large that it would be split
/// into more than [`MinSize::MAX`] fragments. Realistically, you should never
/// run into this as long as your messages are of reasonable length.
///
/// # Panics
///
/// Panics if `max_frag_len == 0`.
Expand All @@ -50,20 +56,25 @@ use {
pub fn split(
max_frag_len: usize,
msg: Bytes,
) -> impl ExactSizeIterator<Item = (FragmentPosition, Bytes)> + DoubleEndedIterator + FusedIterator
{
) -> Result<
impl ExactSizeIterator<Item = (FragmentPosition, Bytes)> + DoubleEndedIterator + FusedIterator,
MessageTooBig,
> {
assert!(max_frag_len > 0);

let msg_len = msg.len();
let byte_len = msg.len();
let iter = msg.byte_chunks(max_frag_len);
let num_frags = iter.len();
iter.enumerate().rev().map(move |(index, payload)| {
// do this inside the iterator, since we now know
// that we have at least at least 1 item in this iterator.
// if we did this outside the iterator, and `num_frags` was 0,
// `num_frags - 1` would underflow.
let last_index = num_frags - 1;

let last_index = num_frags.saturating_sub(1);
if MinSize::try_from(last_index).is_err() {
return Err(MessageTooBig {
byte_len,
num_frags,
});
}

Ok(iter.enumerate().rev().map(move |(index, payload)| {
let position = if index == last_index {
MinSize::try_from(index)
.ok()
Expand All @@ -73,12 +84,21 @@ pub fn split(
.ok()
.and_then(FragmentPosition::non_last)
}
.unwrap_or_else(|| {
panic!("too many fragments - msg length: {msg_len}, num frags: {num_frags}");
});
.expect("we check above that there should not be more than `MinSize::MAX` fragments");

(position, payload)
})
}))
}

/// Passed a message to [`split`] which was too long in length, and cannot be
/// represented in [`MinSize::MAX`] number of fragments.
///
/// [`split`] returns this instead of panicking, so callers can decide how to
/// handle an oversized message. The display output reports both fields
/// alongside the value of `MinSize::MAX.0`.
#[derive(Debug, Clone, Display, Error, TypeSize)]
#[display("message too big - byte length: {byte_len}, num frags: {num_frags} / {}", MinSize::MAX.0)]
pub struct MessageTooBig {
/// Length of the rejected message, in bytes.
pub byte_len: usize,
/// Number of fragments the message would have been split into.
pub num_frags: usize,
}

/// Receives fragments created by [`split`] and reassembles them into full
Expand Down Expand Up @@ -319,7 +339,7 @@ mod tests {
let max_frag_len = 8;
let msg = Bytes::from_static(b"hello world! goodbye woorld!");

let mut iter = split(max_frag_len, msg);
let mut iter = split(max_frag_len, msg).unwrap();

let mut recv = FragmentReceiver::default();
let mem_left = 30;
Expand Down
1 change: 1 addition & 0 deletions crates/aeronet_transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl Plugin for AeronetTransportPlugin {
recv::clear_buffers.before(TransportSet::Poll),
(
recv::poll,
send::disconnect_errored,
send::update_send_bytes_config,
send::refill_send_bytes,
check_memory_limit,
Expand Down
122 changes: 84 additions & 38 deletions crates/aeronet_transport/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

use {
crate::{
FlushedPacket, FragmentPath, MessageKey, Transport, TransportConfig, frag,
FlushedPacket, FragmentPath, MessageKey, Transport, TransportConfig,
frag::{self, MessageTooBig},
lane::{LaneIndex, LaneKind, LaneReliability},
limit::{Limit, TokenBucket},
min_size::MinSize,
Expand All @@ -12,11 +13,15 @@ use {
},
rtt::RttEstimator,
},
aeronet_io::Session,
aeronet_io::{
Session,
connection::{DisconnectReason, Disconnected},
},
ahash::HashMap,
bevy_ecs::prelude::*,
bevy_time::{Real, Time},
core::iter,
derive_more::derive::{Display, Error, From},
octs::{Bytes, EncodeLen, Write},
std::collections::hash_map::Entry,
tracing::{trace, trace_span},
Expand All @@ -31,7 +36,7 @@ pub struct TransportSend {
pub(crate) lanes: Box<[SendLane]>,
bytes_bucket: TokenBucket,
next_packet_seq: PacketSeq,
too_many_msgs: bool,
error: Option<TransportSendError>,
}

/// State of a lane used for sending outgoing messages on a [`Transport`].
Expand Down Expand Up @@ -74,7 +79,7 @@ impl TransportSend {
.collect(),
bytes_bucket: TokenBucket::new(0),
next_packet_seq: PacketSeq::default(),
too_many_msgs: false,
error: None,
}
}

Expand All @@ -101,14 +106,21 @@ impl TransportSend {
/// [`TransportRecv::acks`], you can compare message keys to tell if the
/// message you are pushing right now was the one that was acknowledged.
///
/// [^1]: See [`MessageKey`] for uniqueness guarantees.
///
/// # Errors
///
/// If the message could not be enqueued (if e.g. there are already too many
/// messages buffered for sending), this returns [`None`], and the transport
/// messages buffered for sending), this returns [`Err`], and the transport
/// will be forcibly disconnected on the next update. This is considered a
/// fatal connection condition, because you may have sent a message along a
/// reliable lane, and those [`LaneKind`]s provide strong guarantees that
/// messages will be received by the peer.
///
/// [^1]: See [`MessageKey`] for uniqueness guarantees.
/// Normally, errors should not happen when pushing messages, so if an error
/// does occur, it should be treated as fatal. Feel free to ignore the error
/// if you don't want to handle it in any special way - the session will
/// automatically disconnect anyway.
///
/// # Panics
///
Expand Down Expand Up @@ -147,36 +159,58 @@ impl TransportSend {
/// ```
///
/// [`TransportRecv::acks`]: crate::recv::TransportRecv::acks
pub fn push(&mut self, lane_index: LaneIndex, msg: Bytes, now: Instant) -> Option<MessageKey> {
let lane = &mut self.lanes[usize::from(lane_index.0)];
let msg_seq = lane.next_msg_seq;
let Entry::Vacant(entry) = lane.sent_msgs.entry(msg_seq) else {
self.too_many_msgs = true;
return None;
};
pub fn push(
&mut self,
lane_index: LaneIndex,
msg: Bytes,
now: Instant,
) -> Result<MessageKey, TransportSendError> {
let result = (|| {
let lane = &mut self.lanes[usize::from(lane_index.0)];
let msg_seq = lane.next_msg_seq;
let Entry::Vacant(entry) = lane.sent_msgs.entry(msg_seq) else {
return Err(TransportSendError::TooManyMessages);
};

let frags = frag::split(self.max_frag_len, msg);
entry.insert(SentMessage {
frags: frags
.map(|(position, payload)| {
Some(SentFragment {
position,
payload,
sent_at: now,
next_flush_at: now,
let frags = frag::split(self.max_frag_len, msg)?;
entry.insert(SentMessage {
frags: frags
.map(|(position, payload)| {
Some(SentFragment {
position,
payload,
sent_at: now,
next_flush_at: now,
})
})
})
.collect(),
});
.collect(),
});

lane.next_msg_seq += MessageSeq::new(1);
Ok(MessageKey {
lane: lane_index,
seq: msg_seq,
})
})();

lane.next_msg_seq += MessageSeq::new(1);
Some(MessageKey {
lane: lane_index,
seq: msg_seq,
})
if let Err(err) = &result {
self.error = Some(err.clone());
}
result
}
}

/// Failed to enqueue a message for sending using [`TransportSend::push`].
///
/// When `push` fails, it also stores a clone of this error on the
/// [`TransportSend`], so the error is recorded even if the caller discards the
/// returned [`Result`].
#[derive(Debug, Clone, Display, Error, From, TypeSize)]
pub enum TransportSendError {
/// Too many messages were already buffered for sending, and we would be
/// overwriting the sequence number of an existing message.
///
/// Returned when the lane's next message sequence number is still occupied
/// by a previously sent message.
#[display("too many buffered messages")]
TooManyMessages,
/// Message was too big to enqueue for sending.
///
/// Wraps the error returned by [`frag::split`]; the derived `From` impl
/// lets `push` propagate it with `?`.
MessageTooBig(MessageTooBig),
}

impl SendLane {
/// Gets what kind of lane this state represents.
#[must_use]
Expand Down Expand Up @@ -206,23 +240,35 @@ pub(crate) fn update_send_bytes_config(
}
}

pub(crate) fn refill_send_bytes(time: Res<Time<Real>>, mut sessions: Query<&mut Transport>) {
pub(crate) fn disconnect_errored(mut sessions: Query<&mut Transport>, mut commands: Commands) {
for mut transport in &mut sessions {
if let Some(err) = transport.send.error.take() {
commands.trigger(Disconnected {
reason: DisconnectReason::Error(anyhow::Error::new(err)),
});
}
}
}

pub(crate) fn refill_send_bytes(time: Res<Time<Real>>, mut sessions: Query<&mut Transport>) {
sessions.par_iter_mut().for_each(|mut transport| {
transport
.send
.bytes_bucket
.refill_portion(time.delta_secs_f64());
}
});
}

pub(crate) fn flush(mut sessions: Query<(&mut Session, &mut Transport)>) {
let now = Instant::now();
for (mut session, mut transport) in &mut sessions {
let packet_mtu = session.mtu();
session
.send
.extend(flush_on(&mut transport, now, packet_mtu));
}
sessions
.par_iter_mut()
.for_each(|(mut session, mut transport)| {
let packet_mtu = session.mtu();
session
.send
.extend(flush_on(&mut transport, now, packet_mtu));
});
}

/// Exposes `flush_on` for fuzz tests.
Expand Down