Expose ClaimQueue via a runtime api and use it in collation-generation #3580

Merged
merged 13 commits into from
Mar 20, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.
@@ -14,7 +14,10 @@
 // You should have received a copy of the GNU General Public License
 // along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

-use std::pin::Pin;
+use std::{
+	collections::{BTreeMap, VecDeque},
+	pin::Pin,
+};

 use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
 use cumulus_relay_chain_rpc_interface::RelayChainRpcClient;
@@ -25,6 +28,7 @@ use polkadot_primitives::{
 	async_backing::{AsyncBackingParams, BackingState},
 	slashing,
 	vstaging::{ApprovalVotingParams, NodeFeatures},
+	CoreIndex,
 };
 use sc_authority_discovery::{AuthorityDiscovery, Error as AuthorityDiscoveryError};
 use sc_client_api::AuxStore;
@@ -442,6 +446,13 @@ impl RuntimeApiSubsystemClient for BlockChainRpcClient {
 	async fn node_features(&self, at: Hash) -> Result<NodeFeatures, ApiError> {
 		Ok(self.rpc_client.parachain_host_node_features(at).await?)
 	}
+
+	async fn claim_queue(
+		&self,
+		at: Hash,
+	) -> Result<BTreeMap<CoreIndex, VecDeque<cumulus_primitives_core::ParaId>>, ApiError> {
+		Ok(self.rpc_client.parachain_host_claim_queue(at).await?)
+	}
 }

 #[async_trait::async_trait]
17 changes: 13 additions & 4 deletions cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs
@@ -24,6 +24,7 @@ use jsonrpsee::{
 };
 use serde::de::DeserializeOwned;
 use serde_json::Value as JsonValue;
+use std::collections::VecDeque;
 use tokio::sync::mpsc::Sender as TokioSender;

 use parity_scale_codec::{Decode, Encode};
@@ -34,10 +35,10 @@ use cumulus_primitives_core::{
 		slashing,
 		vstaging::{ApprovalVotingParams, NodeFeatures},
 		BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
-		CommittedCandidateReceipt, CoreState, DisputeState, ExecutorParams, GroupRotationInfo,
-		Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption,
-		PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
-		ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
+		CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams,
+		GroupRotationInfo, Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage,
+		OccupiedCoreAssumption, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo,
+		ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
 	},
 	InboundDownwardMessage, ParaId, PersistedValidationData,
 };
@@ -647,6 +648,14 @@ impl RelayChainRpcClient {
 			.await
 	}

+	pub async fn parachain_host_claim_queue(
+		&self,
+		at: RelayHash,
+	) -> Result<BTreeMap<CoreIndex, VecDeque<ParaId>>, RelayChainError> {
+		self.call_remote_runtime_function("ParachainHost_claim_queue", at, None::<()>)
+			.await
+	}
+
 	pub async fn validation_code_hash(
 		&self,
 		at: RelayHash,
1 change: 1 addition & 0 deletions polkadot/node/collation-generation/Cargo.toml
@@ -26,4 +26,5 @@ parity-scale-codec = { version = "3.6.1", default-features = false, features = [
 polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
 test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../primitives/test-helpers" }
 assert_matches = "1.4.0"
+rstest = "0.18.2"
 sp-keyring = { path = "../../../substrate/primitives/keyring" }
74 changes: 64 additions & 10 deletions polkadot/node/collation-generation/src/lib.rs
@@ -38,21 +38,25 @@ use polkadot_node_primitives::{
 	SubmitCollationParams,
 };
 use polkadot_node_subsystem::{
-	messages::{CollationGenerationMessage, CollatorProtocolMessage},
+	messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
 	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
 	SubsystemContext, SubsystemError, SubsystemResult,
 };
 use polkadot_node_subsystem_util::{
-	request_async_backing_params, request_availability_cores, request_persisted_validation_data,
-	request_validation_code, request_validation_code_hash, request_validators,
+	has_required_runtime, request_async_backing_params, request_availability_cores,
+	request_claim_queue, request_persisted_validation_data, request_validation_code,
+	request_validation_code_hash, request_validators,
 };
 use polkadot_primitives::{
 	collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
-	CollatorPair, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData,
-	ValidationCodeHash,
+	CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
+	PersistedValidationData, ScheduledCore, ValidationCodeHash,
 };
 use sp_core::crypto::Pair;
-use std::sync::Arc;
+use std::{
+	collections::{BTreeMap, VecDeque},
+	sync::Arc,
+};

 mod error;

@@ -223,6 +227,7 @@ async fn handle_new_activations<Context>(
 		let availability_cores = availability_cores??;
 		let n_validators = validators??.len();
 		let async_backing_params = async_backing_params?.ok();
+		let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;

 		for (core_idx, core) in availability_cores.into_iter().enumerate() {
 			let _availability_core_timer = metrics.time_new_activations_availability_core();
@@ -239,10 +244,25 @@ async fn handle_new_activations<Context>(
 						// TODO [now]: this assumes that next up == current.
 						// in practice we should only set `OccupiedCoreAssumption::Included`
 						// when the candidate occupying the core is also of the same para.
Comment on lines 244 to 246

Contributor

I think this PR resolves this TODO, doesn't it? There's also a tracking issue here: #3327

Contributor Author

I guess you are right. But is it fixed thanks to the updated implementation of next_up_on_available?
Or am I missing something?

Contributor

Oh, I think I was wrong, actually. I think it should be fixed while implementing #3582.

-						if let Some(scheduled) = occupied_core.next_up_on_available {
-							(scheduled, OccupiedCoreAssumption::Included)
-						} else {
-							continue
+						let res = match maybe_claim_queue {
+							Some(ref claim_queue) => {
+								// read what's in the claim queue for this core
+								fetch_next_scheduled_on_core(
+									claim_queue,
+									CoreIndex(core_idx as u32),
+								)
+							},
+							None => {
+								// Runtime doesn't support claim queue runtime api. Fallback to
+								// `next_up_on_available`
+								occupied_core.next_up_on_available
+							},
+						}
+						.map(|scheduled| (scheduled, OccupiedCoreAssumption::Included));
+
+						match res {
+							Some(res) => res,
+							None => continue,
 						}
 					},
 					_ => {
@@ -598,3 +618,37 @@ fn erasure_root(
 	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
 	Ok(polkadot_erasure_coding::branches(&chunks).root())
 }
+
+// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)`
+// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller.
+async fn fetch_claim_queue(
+	sender: &mut impl overseer::CollationGenerationSenderTrait,
+	relay_parent: Hash,
+) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
+	if has_required_runtime(
+		sender,
+		relay_parent,
+		RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
+	)
+	.await
Comment on lines +630 to +635

Contributor

nit: instead of making two requests (one for the version and another for the actual runtime API), we could call the runtime API directly and handle RuntimeApiError::NotSupported.

Contributor Author

I see your point, but I think it is more or less the same.

Each runtime call is preceded by a version check (done by the runtime api subsystem). So in the current case we have:
Version check (by has_required_runtime) -> Version check (by runtime api subsystem) -> Runtime call

If we call the new api directly on an old runtime and handle the error, we get the same sequence:
Version check (by runtime api subsystem for the 'old call') -> Version check (by runtime api subsystem for the 'new call') -> Runtime call

So in terms of runtime calls, it's the same on an old runtime.

On a new runtime we do save one version check, but (1) we will remove the version check once everything is released on Polkadot and (2) the runtime calls are cached and never hit the runtime.

Considering these, I'd stick with the current version because it is more explicit.

Another drawback of relying on NotSupported as a version check is that this error also occurs in other odd cases, e.g. a runtime call being executed on pruned blocks. Having NotSupported errors polluting the logs isn't ideal, especially when we are chasing bugs.

Contributor

"On a new runtime we do save one version check, but (1) we will remove the version check once everything is released on Polkadot and (2) the runtime calls are cached and never hit the runtime."

Yes, that's exactly my point: saving one version check for a new runtime. We'd also save one round-trip to the runtime API subsystem, regardless of the runtime version.

Anyway, I don't have very strong feelings about this.

Contributor Author

Your point is valid. I've created an issue for this - #3756
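
The two approaches discussed in this thread can be contrasted with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: `MockRuntime`, its request counter, the `Vec<u32>` stand-in for the claim queue, and the value given to `CLAIM_QUEUE_RUNTIME_REQUIREMENT` are hypothetical and are not the subsystem types used in the PR. The mock only counts requests issued by the caller; it does not model the subsystem's internal version checks or caching mentioned above.

```rust
// Hypothetical placeholder value; the real constant lives on `RuntimeApiRequest`.
pub const CLAIM_QUEUE_RUNTIME_REQUIREMENT: u32 = 11;

// Simplified stand-in for the subsystem's `RuntimeApiError`.
#[derive(Debug, PartialEq, Eq)]
pub enum RuntimeApiError {
    NotSupported,
    Execution(String),
}

// Hypothetical mock of a runtime API endpoint that counts incoming requests.
pub struct MockRuntime {
    pub version: u32,
    pub requests: u32,
}

impl MockRuntime {
    pub fn new(version: u32) -> Self {
        Self { version, requests: 0 }
    }

    // Counts as one request and returns the runtime API version.
    pub fn api_version(&mut self) -> u32 {
        self.requests += 1;
        self.version
    }

    // Counts as one request; fails with `NotSupported` on an old runtime.
    pub fn claim_queue(&mut self) -> Result<Vec<u32>, RuntimeApiError> {
        self.requests += 1;
        if self.version >= CLAIM_QUEUE_RUNTIME_REQUIREMENT {
            Ok(vec![1000, 2000])
        } else {
            Err(RuntimeApiError::NotSupported)
        }
    }
}

// Approach taken in the PR: explicit version check, then the call.
pub fn fetch_with_version_check(
    rt: &mut MockRuntime,
) -> Result<Option<Vec<u32>>, RuntimeApiError> {
    if rt.api_version() >= CLAIM_QUEUE_RUNTIME_REQUIREMENT {
        rt.claim_queue().map(Some)
    } else {
        Ok(None)
    }
}

// Alternative from the review: call directly and map `NotSupported` to `None`.
pub fn fetch_with_not_supported(
    rt: &mut MockRuntime,
) -> Result<Option<Vec<u32>>, RuntimeApiError> {
    match rt.claim_queue() {
        Ok(queue) => Ok(Some(queue)),
        Err(RuntimeApiError::NotSupported) => Ok(None),
        // Any other error is still bubbled up to the caller.
        Err(e) => Err(e),
    }
}
```

In this simplified model both functions return the same values. On a runtime that supports the API, the explicit check issues two requests and the direct call issues one. The sketch does not capture the logging concern raised above, where `NotSupported` can also appear for unrelated reasons.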

+	{
+		let res = request_claim_queue(relay_parent, sender).await.await??;
+		Ok(Some(res))
+	} else {
+		gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
+		Ok(None)
+	}
+}
+
+// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
+// This function is supposed to be used in `handle_new_activations` hence the return type.
+fn fetch_next_scheduled_on_core(
+	claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
+	core_idx: CoreIndex,
+) -> Option<ScheduledCore> {
+	claim_queue
+		.get(&core_idx)?
+		.front()
+		.cloned()
+		.map(|para_id| ScheduledCore { para_id, collator: None })
+}
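
To make the lookup and the fallback easier to follow outside the node, here is a minimal standalone sketch. The types `CoreIndex`, `ParaId` and `ScheduledCore` are simplified stand-ins defined locally (the real ones come from `polkadot_primitives`), and `next_for_occupied_core` and `sample_claim_queue` are hypothetical helpers that only mirror the selection logic from `handle_new_activations`; they are not part of the PR.

```rust
use std::collections::{BTreeMap, VecDeque};

// Simplified stand-ins for the `polkadot_primitives` types (assumptions).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CoreIndex(pub u32);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ParaId(pub u32);

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledCore {
    pub para_id: ParaId,
    // Simplified: the real field holds an optional collator id.
    pub collator: Option<()>,
}

pub type ClaimQueue = BTreeMap<CoreIndex, VecDeque<ParaId>>;

// Same shape as `fetch_next_scheduled_on_core`: take the front of the
// queue for the given core, if the core and an entry exist.
pub fn next_scheduled_on_core(
    claim_queue: &ClaimQueue,
    core_idx: CoreIndex,
) -> Option<ScheduledCore> {
    claim_queue
        .get(&core_idx)?
        .front()
        .copied()
        .map(|para_id| ScheduledCore { para_id, collator: None })
}

// Hypothetical helper mirroring the `match maybe_claim_queue` in the diff:
// use the claim queue when the runtime exposes it, otherwise fall back to
// `next_up_on_available`.
pub fn next_for_occupied_core(
    maybe_claim_queue: Option<&ClaimQueue>,
    core_idx: CoreIndex,
    next_up_on_available: Option<ScheduledCore>,
) -> Option<ScheduledCore> {
    match maybe_claim_queue {
        Some(claim_queue) => next_scheduled_on_core(claim_queue, core_idx),
        None => next_up_on_available,
    }
}

// Hypothetical sample data: core 0 has two claims, core 1 has an empty queue.
pub fn sample_claim_queue() -> ClaimQueue {
    let mut claim_queue = ClaimQueue::new();
    claim_queue.insert(CoreIndex(0), VecDeque::from([ParaId(1000), ParaId(2000)]));
    claim_queue.insert(CoreIndex(1), VecDeque::new());
    claim_queue
}
```

Note that, as in the diff, the fallback applies only when no claim queue is available at all. If the claim queue exists but has no entry for the core, the result is `None` and the core is skipped, even if `next_up_on_available` is set.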