Feature/prospective parachains multi candidates (#1996)
* select children

Signed-off-by: iceseer <iceseer@gmail.com>

* start collation/validation protocols

Signed-off-by: iceseer <iceseer@gmail.com>

* ancestry fix

Signed-off-by: iceseer <iceseer@gmail.com>

* view update version

Signed-off-by: iceseer <iceseer@gmail.com>

---------

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: turuslan <turuslan.devbox@gmail.com>
Co-authored-by: turuslan <turuslan.devbox@gmail.com>
iceseer and turuslan authored Apr 7, 2024
1 parent a1dec68 commit d0de7d0
Showing 10 changed files with 674 additions and 128 deletions.
5 changes: 3 additions & 2 deletions core/log/configurator.cpp
@@ -101,14 +101,15 @@ namespace kagome::log {
- name: warp_sync_protocol
- name: parachain_protocols
children:
- name: collation_protocol
- name: validation_protocol
- name: collation_protocol_vstaging
- name: validation_protocol_vstaging
- name: req_collation_protocol
- name: req_chunk_protocol
- name: req_available_data_protocol
- name: req_statement_protocol
- name: req_pov_protocol
- name: dispute_protocol
- name: req_attested_candidate_protocol
- name: changes_trie
- name: storage
children:
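
A minimal sketch (not part of this commit) of how one of the new logger groups might be used from code, assuming kagome's log::createLogger(tag, group) helper; the tag name below is hypothetical, while the group name is one of the entries added above:

  // Loggers registered under a child group inherit the level configured for
  // the "parachain_protocols" parent unless the child overrides it.
  auto logger = kagome::log::createLogger("ReqAttestedCandidateProtocol",
                                          "req_attested_candidate_protocol");
  SL_TRACE(logger, "attested-candidate request protocol enabled");
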
78 changes: 39 additions & 39 deletions core/network/impl/peer_manager_impl.cpp
@@ -581,6 +581,15 @@ namespace kagome::network {
return it->second;
}

std::optional<std::reference_wrapper<const PeerState>>
PeerManagerImpl::getPeerState(const PeerId &peer_id) const {
auto it = peer_states_.find(peer_id);
if (it == peer_states_.end()) {
return std::nullopt;
}
return it->second;
}

void PeerManagerImpl::processDiscoveredPeer(const PeerId &peer_id) {
// Ignore ourselves
if (isSelfPeer(peer_id)) {
@@ -726,41 +735,29 @@

log_->trace("Try to open outgoing validation protocol.(peer={})",
peer_info.id);
openOutgoing(
stream_engine_,
validation_protocol,
peer_info,
[validation_protocol, peer_info, wptr{weak_from_this()}](
outcome::result<std::shared_ptr<Stream>> stream_result) {
auto self = wptr.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;
if (!stream_result.has_value()) {
SL_TRACE(self->log_,
"Unable to create stream {} with {}: {}",
validation_protocol->protocolName(),
peer_id,
stream_result.error().message());
auto ps = self->getPeerState(peer_info.id);
if (ps) {
self->tryOpenValidationProtocol(
peer_info, ps->get(), network::CollationVersion::V1);
} else {
SL_TRACE(
self->log_,
"No peer state to open V1 validation protocol {} with {}",
validation_protocol->protocolName(),
peer_id);
}
return;
}

self->stream_engine_->addOutgoing(stream_result.value(),
validation_protocol);
});
openOutgoing(stream_engine_,
validation_protocol,
peer_info,
[validation_protocol, peer_info, wptr{weak_from_this()}](
outcome::result<std::shared_ptr<Stream>> stream_result) {
auto self = wptr.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;
if (!stream_result.has_value()) {
SL_TRACE(self->log_,
"Unable to create stream {} with {}: {}",
validation_protocol->protocolName(),
peer_id,
stream_result.error().message());
return;
}

self->stream_engine_->addOutgoing(stream_result.value(),
validation_protocol);
});
}
}

@@ -839,11 +836,14 @@
}

void PeerManagerImpl::reserveStatusStreams(const PeerId &peer_id) const {
auto proto_val_vstaging = router_->getValidationProtocolVStaging();
BOOST_ASSERT_MSG(proto_val_vstaging,
"Router did not provide validation protocol vstaging");
if (auto ps = getPeerState(peer_id);
ps && ps->get().roles.flags.authority) {
auto proto_val_vstaging = router_->getValidationProtocolVStaging();
BOOST_ASSERT_MSG(proto_val_vstaging,
"Router did not provide validation protocol vstaging");

stream_engine_->reserveStreams(peer_id, proto_val_vstaging);
stream_engine_->reserveStreams(peer_id, proto_val_vstaging);
}
}

void PeerManagerImpl::reserveStreams(const PeerId &peer_id) const {
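
A minimal usage sketch of the new const getPeerState() overload (hypothetical free function; the roles check mirrors reserveStatusStreams() above), showing how const callers can now inspect peer state without mutating it:

  bool isAuthorityPeer(const kagome::network::PeerManager &pm,
                       const libp2p::peer::PeerId &peer_id) {
    // read-only lookup through the new const overload
    auto ps = pm.getPeerState(peer_id);
    return ps && ps->get().roles.flags.authority;
  }
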
2 changes: 2 additions & 0 deletions core/network/impl/peer_manager_impl.hpp
@@ -139,6 +139,8 @@ namespace kagome::network {
/** @see PeerManager::getPeerState */
std::optional<std::reference_wrapper<PeerState>> getPeerState(
const PeerId &peer_id) override;
std::optional<std::reference_wrapper<const PeerState>> getPeerState(
const PeerId &peer_id) const override;

private:
/// Right way to check self peer as it takes into account dev mode
4 changes: 2 additions & 2 deletions core/network/impl/router_libp2p.cpp
@@ -128,8 +128,8 @@ namespace kagome::network {
// lazyStart(collation_protocol_);
// lazyStart(validation_protocol_);

lazyStart(collation_protocol_);
lazyStart(validation_protocol_);
lazyStart(collation_protocol_vstaging_);
lazyStart(validation_protocol_vstaging_);
lazyStart(req_collation_protocol_);
lazyStart(req_pov_protocol_);
lazyStart(fetch_chunk_protocol_);
2 changes: 2 additions & 0 deletions core/network/peer_manager.hpp
@@ -193,6 +193,8 @@ namespace kagome::network {
*/
virtual std::optional<std::reference_wrapper<PeerState>> getPeerState(
const PeerId &peer_id) = 0;
virtual std::optional<std::reference_wrapper<const PeerState>> getPeerState(
const PeerId &peer_id) const = 0;

/**
* @returns number of active peers
116 changes: 97 additions & 19 deletions core/parachain/validator/fragment_tree.hpp
@@ -533,47 +533,125 @@ namespace kagome::parachain::fragment {
return res;
}

/**
* @brief Select up to `count` candidates after the given `required_path`
* which pass the predicate and have not already been backed on chain.
* Does an exhaustive search of the tree starting after `required_path`.
* If there are multiple possibilities of size `count`, this will select
* the first one. If there is no chain of size `count` that matches the
* criteria, this will return the largest chain it could find that does.
* If no candidates meet the criteria, returns an empty vector.
* Cycles are accepted; see the module docs for the `Cycles` section.
* The intention of the `required_path` is to allow queries on the basis of
* one or more candidates which were previously pending availability
* becoming available and opening up more room on the core.
*/

template <typename Func>
std::optional<CandidateHash> selectChild(
const std::vector<CandidateHash> &required_path, Func &&pred) const {
std::vector<CandidateHash> selectChildren(
const std::vector<CandidateHash> &required_path,
uint32_t count,
Func &&pred) const {
NodePointer base_node{NodePointerRoot{}};
for (const CandidateHash &required_step : required_path) {
if (auto node = nodeCandidateChild(base_node, required_step)) {
base_node = *node;
} else {
return std::nullopt;
return {};
}
}

return visit_in_place(
std::vector<CandidateHash> accum;
return selectChildrenInner(
std::move(base_node), count, count, std::forward<Func>(pred), accum);
}
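
// A usage sketch, not part of this commit: `tree`, `backed_hash` and
// `locally_backable` are hypothetical. Ask for a chain of up to three
// backable candidates extending a candidate that is already backed.
std::vector<CandidateHash> chain = tree.selectChildren(
    /*required_path=*/{backed_hash},
    /*count=*/3,
    [&](const CandidateHash &hash) {
      return locally_backable.count(hash) != 0;
    });
// `chain` is the first full-length chain found, otherwise the longest
// partial chain, or empty if no candidate passes the predicate.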

/**
* @brief Try finding a candidate chain of length `expected_count` starting
* from `base_node`. If that is not possible, return the longest chain we
* could find. Does a depth-first search, since we're optimistic that there
* won't be more than one such chain (parachains shouldn't usually have
* forks), so in the usual case this concludes in `O(expected_count)`.
* Cycles are accepted, but they cannot lead to unbounded execution time,
* because the maximum depth we'll reach is `expected_count`.
* Worst-case performance is `O(num_forks ^ expected_count)`. Although an
* exponential function, this is effectively a constant that can only be
* altered via sudo/governance, because:
* 1. `num_forks` at a given level is at most
*    `max_candidate_depth * max_validators_per_core` (each validator in the
*    assigned group can second `max_candidate_depth` candidates). The
*    prospective-parachains subsystem assumes the number of para forks is
*    limited by the collator-protocol and backing subsystems; in practice
*    this is a constant which can only be altered by sudo or governance.
* 2. `expected_count` is equal to the number of cores a para is scheduled
*    on (in an elastic scaling scenario). For non-elastic scaling this is
*    just 1. In practice it should be a small number (1-3), capped by the
*    total number of available cores (a constant alterable only via
*    governance/sudo).
*/
template <typename Func>
std::vector<CandidateHash> selectChildrenInner(
NodePointer base_node,
uint32_t expected_count,
uint32_t remaining_count,
const Func &pred,
std::vector<CandidateHash> &accumulator) const {
if (remaining_count == 0) {
return accumulator;
}

auto children = visit_in_place(
base_node,
[&](const NodePointerRoot &) -> std::optional<CandidateHash> {
for (const FragmentNode &n : nodes) {
[&](const NodePointerRoot &)
-> std::vector<std::pair<NodePointer, CandidateHash>> {
std::vector<std::pair<NodePointer, CandidateHash>> tmp;
for (size_t ptr = 0; ptr < nodes.size(); ++ptr) {
const FragmentNode &n = nodes[ptr];
if (!is_type<NodePointerRoot>(n.parent)) {
return std::nullopt;
continue;
}
if (scope.getPendingAvailability(n.candidate_hash)) {
return std::nullopt;
continue;
}
if (!pred(n.candidate_hash)) {
return std::nullopt;
continue;
}
return n.candidate_hash;
tmp.emplace_back(NodePointerStorage{ptr}, n.candidate_hash);
}
return std::nullopt;
return tmp;
},
[&](const NodePointerStorage &ptr) -> std::optional<CandidateHash> {
for (const auto &[_, h] : nodes[ptr].children) {
if (scope.getPendingAvailability(h)) {
return std::nullopt;
[&](const NodePointerStorage &base_node_ptr)
-> std::vector<std::pair<NodePointer, CandidateHash>> {
std::vector<std::pair<NodePointer, CandidateHash>> tmp;
const auto &bn = nodes[base_node_ptr];
for (const auto &[ptr, hash] : bn.children) {
if (scope.getPendingAvailability(hash)) {
continue;
}
if (!pred(h)) {
return std::nullopt;
if (!pred(hash)) {
continue;
}
return h;
tmp.emplace_back(ptr, hash);
}
return std::nullopt;
return tmp;
});

auto best_result = accumulator;
for (const auto &[child_ptr, child_hash] : children) {
accumulator.emplace_back(child_hash);
auto result = selectChildrenInner(
child_ptr, expected_count, remaining_count - 1, pred, accumulator);
accumulator.pop_back();

if (result.size() == size_t(expected_count)) {
return result;
} else if (best_result.size() < result.size()) {
best_result = result;
}
}

return best_result;
}
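
// Worked illustration of the bound above, with hypothetical figures that are
// not taken from any runtime configuration: with num_forks = 4 and
// expected_count = 3, the depth-first search visits at most 4^3 = 64 chains,
// each at most 3 candidates deep, so even the worst case is a small constant.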

static FragmentTree populate(const std::shared_ptr<crypto::Hasher> &hasher,
50 changes: 35 additions & 15 deletions core/parachain/validator/impl/parachain_processor.cpp
@@ -387,11 +387,19 @@ namespace kagome::parachain {
return;
}

[[maybe_unused]] const auto _ =
prospective_parachains_->onActiveLeavesUpdate(network::ExViewRef{
.new_head = {event.new_head},
.lost = event.lost,
});
if (const auto r =
prospective_parachains_->onActiveLeavesUpdate(network::ExViewRef{
.new_head = {event.new_head},
.lost = event.lost,
});
r.has_error()) {
SL_WARN(
logger_,
"Prospective parachains leaf update failed. (relay_parent={}, error={})",
relay_parent,
r.error().message());
}

backing_store_->onActivateLeaf(relay_parent);
createBackingTask(relay_parent);
SL_TRACE(logger_,
@@ -521,7 +529,7 @@
void ParachainProcessorImpl::broadcastViewExcept(
const libp2p::peer::PeerId &peer_id, const network::View &view) const {
auto msg = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = view});
pm_->getStreamEngine()->broadcast(
router_->getValidationProtocolVStaging(),
@@ -562,7 +570,8 @@
BOOST_ASSERT(se);

auto message = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(msg);
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
msg);
SL_TRACE(
logger_,
"Broadcasting view update to group.(relay_parent={}, group_size={})",
@@ -581,7 +590,7 @@

void ParachainProcessorImpl::broadcastView(const network::View &view) const {
auto msg = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = view});
pm_->getStreamEngine()->broadcast(router_->getCollationProtocolVStaging(),
msg);
@@ -2255,18 +2264,28 @@
core,
[&](const network::ScheduledCore &scheduled_core)
-> std::optional<std::pair<CandidateHash, Hash>> {
return prospective_parachains_->answerGetBackableCandidate(
relay_parent, scheduled_core.para_id, {});
if (auto i = prospective_parachains_->answerGetBackableCandidates(
relay_parent, scheduled_core.para_id, 1, {});
!i.empty()) {
return i[0];
}
return std::nullopt;
},
[&](const runtime::OccupiedCore &occupied_core)
-> std::optional<std::pair<CandidateHash, Hash>> {
/// TODO(iceseer): do https://github.com/qdrvm/kagome/issues/1888
/// `bitfields_indicate_availability` check
if (occupied_core.next_up_on_available) {
return prospective_parachains_->answerGetBackableCandidate(
relay_parent,
occupied_core.next_up_on_available->para_id,
{occupied_core.candidate_hash});
if (auto i =
prospective_parachains_->answerGetBackableCandidates(
relay_parent,
occupied_core.next_up_on_available->para_id,
1,
{occupied_core.candidate_hash});
!i.empty()) {
return i[0];
}
return std::nullopt;
}
return std::nullopt;
},
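
// Sketch of the plural API introduced here (hypothetical count and para_id;
// the code above always requests a single candidate): with elastic scaling a
// caller may ask for several backable candidates for one para at once.
auto backable = prospective_parachains_->answerGetBackableCandidates(
    relay_parent, para_id, /*count=*/3, {});
for (const auto &[candidate_hash, hash] : backable) {
  // each element matches the std::pair<CandidateHash, Hash> used above
}
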
@@ -2810,7 +2829,7 @@
peer_id,
protocol,
std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = my_view->get().view}));
}

@@ -3553,6 +3572,7 @@
*our_current_state_.implicit_view,
our_current_state_.active_leaves,
peer_data.collator_state->para_id)) {
SL_TRACE(logger_, "Out of view. (relay_parent={})", on_relay_parent);
return Error::OUT_OF_VIEW;
}
