Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

parachain-system: update RelevantMessagingState according to the unincluded segment #2948

Merged
merged 11 commits into from
Aug 2, 2023
2 changes: 1 addition & 1 deletion pallets/parachain-system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ runtime-benchmarks = [

try-runtime = ["frame-support/try-runtime"]

parameterized-consensus-hook = []
parameterized-consensus-hook = []
95 changes: 74 additions & 21 deletions pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,12 @@ pub mod pallet {
return
},
};
let relevant_messaging_state = match Self::relevant_messaging_state() {
Some(ok) => ok,

// Before updating the relevant messaging state, we need to extract
// the total bandwidth limits for the purpose of updating the unincluded
// segment.
let total_bandwidth_out = match Self::relevant_messaging_state() {
Some(s) => OutboundBandwidthLimits::from_relay_chain_state(&s),
None => {
debug_assert!(
false,
Expand All @@ -252,22 +256,28 @@ pub mod pallet {
},
};

let total_bandwidth_out =
OutboundBandwidthLimits::from_relay_chain_state(&relevant_messaging_state);
let bandwidth_out = AggregatedUnincludedSegment::<T>::get().map(|segment| {
let mut bandwidth_out = total_bandwidth_out.clone();
bandwidth_out.subtract(segment.used_bandwidth());
bandwidth_out
});
// After this point, the `RelevantMessagingState` in storage reflects the
// unincluded segment.
Self::adjust_egress_bandwidth_limits();

let (ump_msg_count, ump_total_bytes) = <PendingUpwardMessages<T>>::mutate(|up| {
let bandwidth_out = bandwidth_out.as_ref().unwrap_or(&total_bandwidth_out);
let available_capacity = cmp::min(
bandwidth_out.ump_messages_remaining,
host_config.max_upward_message_num_per_candidate,
);

let available_size = bandwidth_out.ump_bytes_remaining;
let (available_capacity, available_size) = match Self::relevant_messaging_state() {
Some(limits) => (
limits.relay_dispatch_queue_remaining_capacity.remaining_count,
limits.relay_dispatch_queue_remaining_capacity.remaining_size,
),
None => {
debug_assert!(
false,
"relevant messaging state is promised to be set until `on_finalize`; \
qed",
);
rphmeier marked this conversation as resolved.
Show resolved Hide resolved
return (0, 0)
},
};

let available_capacity =
cmp::min(available_capacity, host_config.max_upward_message_num_per_candidate);

// Count the number of messages we can possibly fit in the given constraints, i.e.
// available_capacity and available_size.
Expand Down Expand Up @@ -312,8 +322,9 @@ pub mod pallet {
.hrmp_max_message_num_per_candidate
.min(<AnnouncedHrmpMessagesPerCandidate<T>>::take()) as usize;

// TODO [now]: the `ChannelInfo` implementation for this pallet is what's
// important here for proper limiting.
// Note: this internally calls the `GetChannelInfo` implementation for this
// pallet, which draws on the `RelevantMessagingState`. That in turn has
// been adjusted above to reflect the correct limits in all channels.
let outbound_messages =
T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels)
.into_iter()
Expand Down Expand Up @@ -351,9 +362,7 @@ pub mod pallet {
let watermark = HrmpWatermark::<T>::get();
let watermark_update =
HrmpWatermarkUpdate::new(watermark, LastRelayChainBlockNumber::<T>::get());
// TODO: In order of this panic to be correct, outbound message source should
// respect bandwidth limits as well.
// <https://github.com/paritytech/cumulus/issues/2471>

aggregated_segment
.append(&ancestor, watermark_update, &total_bandwidth_out)
.expect("unincluded segment limits exceeded");
Expand Down Expand Up @@ -431,6 +440,9 @@ pub mod pallet {
4 + hrmp_max_message_num_per_candidate as u64,
);

// Weight for adjusting the unincluded segment in `on_finalize`.
weight += T::DbWeight::get().reads_writes(6, 3);

// Always try to read `UpgradeGoAhead` in `on_finalize`.
weight += T::DbWeight::get().reads(1);

Expand Down Expand Up @@ -557,6 +569,7 @@ pub mod pallet {
let host_config = relay_state_proof
.read_abridged_host_configuration()
.expect("Invalid host configuration in relay chain state proof");

let relevant_messaging_state = relay_state_proof
.read_messaging_state_snapshot(&host_config)
.expect("Invalid messaging state in relay chain state proof");
Expand Down Expand Up @@ -1227,6 +1240,46 @@ impl<T: Config> Pallet<T> {
weight_used
}

/// This adjusts the `RelevantMessagingState` according to the bandwidth limits in the
/// unincluded segment.
//
// Reads: 2
// Writes: 1
fn adjust_egress_bandwidth_limits() {
let unincluded_segment = match AggregatedUnincludedSegment::<T>::get() {
None => return,
Some(s) => s,
};

<RelevantMessagingState<T>>::mutate(|messaging_state| {
let messaging_state = match messaging_state {
None => return,
Some(s) => s,
};

let used_bandwidth = unincluded_segment.used_bandwidth();

let channels = &mut messaging_state.egress_channels;
for (para_id, used) in used_bandwidth.hrmp_outgoing.iter() {
let i = match channels.binary_search_by_key(para_id, |item| item.0) {
Ok(i) => i,
Err(_) => continue, // indicates channel closed.
};

let c = &mut channels[i].1;

c.total_size = (c.total_size + used.total_bytes).min(c.max_total_size);
c.msg_count = (c.msg_count + used.msg_count).min(c.max_capacity);
}

let upward_capacity = &mut messaging_state.relay_dispatch_queue_remaining_capacity;
upward_capacity.remaining_count =
upward_capacity.remaining_count.saturating_sub(used_bandwidth.ump_msg_count);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sanity checking, do we want to be subtracting the entire used_bandwidth of the segment here each time?

Copy link
Contributor Author

@rphmeier rphmeier Jul 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. What we read from the relay-chain state is the total capacity as-of the most recently included block. We then subtract the used bandwidth in the unincluded segment from it to get the actual remaining channel capacity.

upward_capacity.remaining_size =
upward_capacity.remaining_size.saturating_sub(used_bandwidth.ump_total_bytes);
});
}

/// Put a new validation function into a particular location where polkadot
/// monitors for updates. Calling this function notifies polkadot that a new
/// upgrade has been scheduled.
Expand Down
198 changes: 194 additions & 4 deletions pallets/parachain-system/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,28 @@ fn send_message(dest: ParaId, message: Vec<u8>) {
impl XcmpMessageSource for FromThreadLocal {
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut ids = std::collections::BTreeSet::<ParaId>::new();
let mut taken = 0;
let mut taken_messages = 0;
let mut taken_bytes = 0;
let mut result = Vec::new();
SENT_MESSAGES.with(|ms| {
ms.borrow_mut().retain(|m| {
let status = <Pallet<Test> as GetChannelInfo>::get_channel_status(m.0);
let ready = matches!(status, ChannelStatus::Ready(..));
if ready && !ids.contains(&m.0) && taken < maximum_channels {
let (max_size_now, max_size_ever) = match status {
ChannelStatus::Ready(now, ever) => (now, ever),
ChannelStatus::Closed => return false, // drop message
ChannelStatus::Full => return true, // keep message queued.
};

let msg_len = m.1.len();

if !ids.contains(&m.0) &&
taken_messages < maximum_channels &&
msg_len <= max_size_ever &&
taken_bytes + msg_len <= max_size_now
{
ids.insert(m.0);
taken += 1;
taken_messages += 1;
taken_bytes += msg_len;
result.push(m.clone());
false
} else {
Expand Down Expand Up @@ -435,6 +448,47 @@ fn block_tests_run_on_drop() {
BlockTests::new().add(123, || panic!("if this test passes, block tests run properly"));
}

#[test]
fn test_xcmp_source_keeps_messages() {
let recipient = ParaId::from(400);

CONSENSUS_HOOK.with(|c| {
*c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(3).unwrap().into()))
});

BlockTests::new()
.with_inclusion_delay(2)
.with_relay_sproof_builder(move |_, block_number, sproof| {
sproof.host_config.hrmp_max_message_num_per_candidate = 10;
let channel = sproof.upsert_outbound_channel(recipient);
channel.max_total_size = 10;
channel.max_message_size = 10;

// Only fit messages starting from 3rd block.
channel.max_capacity = if block_number < 3 { 0 } else { 1 };
})
.add(1, || {})
.add_with_post_test(
2,
move || {
send_message(recipient, b"22".to_vec());
},
move || {
let v = HrmpOutboundMessages::<Test>::get();
assert!(v.is_empty());
},
)
.add_with_post_test(
3,
move || {},
move || {
// Not discarded.
let v = HrmpOutboundMessages::<Test>::get();
assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"22".to_vec() }]);
},
);
}

#[test]
fn unincluded_segment_works() {
CONSENSUS_HOOK.with(|c| {
Expand Down Expand Up @@ -679,6 +733,142 @@ fn inherent_processed_messages_are_ignored() {
});
}

#[test]
fn hrmp_outbound_respects_used_bandwidth() {
let recipient = ParaId::from(400);

CONSENSUS_HOOK.with(|c| {
*c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(3).unwrap().into()))
});

BlockTests::new()
.with_inclusion_delay(2)
.with_relay_sproof_builder(move |_, block_number, sproof| {
sproof.host_config.hrmp_max_message_num_per_candidate = 10;
let channel = sproof.upsert_outbound_channel(recipient);
channel.max_capacity = 2;
channel.max_total_size = 4;

channel.max_message_size = 10;

// states:
// [relay_chain][unincluded_segment] + [message_queue]
// 2: []["2"] + ["2222"]
// 3: []["2", "3"] + ["2222"]
// 4: []["2", "3"] + ["2222", "444", "4"]
// 5: ["2"]["3"] + ["2222", "444", "4"]
// 6: ["2", "3"][] + ["2222", "444", "4"]
// 7: ["3"]["444"] + ["2222", "4"]
// 8: []["444", "4"] + ["2222"]
//
// 2 tests max bytes - there is message space but no byte space.
// 4 tests max capacity - there is byte space but no message space

match block_number {
5 => {
// 2 included.
// one message added
channel.msg_count = 1;
channel.total_size = 1;
},
6 => {
// 3 included.
// one message added
channel.msg_count = 2;
channel.total_size = 2;
},
7 => {
// 4 included.
// one message drained.
channel.msg_count = 1;
channel.total_size = 1;
},
8 => {
// 5 included. no messages added, one drained.
channel.msg_count = 0;
channel.total_size = 0;
},
_ => {
channel.msg_count = 0;
channel.total_size = 0;
},
}
})
.add(1, || {})
.add_with_post_test(
2,
move || {
send_message(recipient, b"2".to_vec());
send_message(recipient, b"2222".to_vec());
},
move || {
let v = HrmpOutboundMessages::<Test>::get();
assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"2".to_vec() }]);
},
)
.add_with_post_test(
3,
move || {
send_message(recipient, b"3".to_vec());
},
move || {
let v = HrmpOutboundMessages::<Test>::get();
assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"3".to_vec() }]);
},
)
.add_with_post_test(
4,
move || {
send_message(recipient, b"444".to_vec());
send_message(recipient, b"4".to_vec());
},
move || {
// Queue has byte capacity but not message capacity.
let v = HrmpOutboundMessages::<Test>::get();
assert!(v.is_empty());
},
)
.add_with_post_test(
5,
|| {},
move || {
// 1 is included here, channel not drained yet. nothing fits.
let v = HrmpOutboundMessages::<Test>::get();
assert!(v.is_empty());
},
)
.add_with_post_test(
6,
|| {},
move || {
// 2 is included here. channel is totally full.
let v = HrmpOutboundMessages::<Test>::get();
assert!(v.is_empty());
},
)
.add_with_post_test(
7,
|| {},
move || {
// 3 is included here. One message was drained out. The 3-byte message
// finally fits
let v = HrmpOutboundMessages::<Test>::get();
// This line relies on test implementation of [`XcmpMessageSource`].
assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"444".to_vec() }]);
},
)
.add_with_post_test(
8,
|| {},
move || {
// 4 is included here. Relay-chain side of the queue is empty,
let v = HrmpOutboundMessages::<Test>::get();
// This line relies on test implementation of [`XcmpMessageSource`].
assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"4".to_vec() }]);
},
);
}

#[test]
fn events() {
BlockTests::new()
Expand Down
Loading