Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

RollingSessionWindow cleanup #7204

Merged
merged 46 commits into from
May 30, 2023
Merged
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
5eb24c6
Replace `RollingSessionWindow` with `RuntimeInfo` - initial commit
tdimitrov Apr 13, 2023
b0da049
Fix tests in import
tdimitrov Apr 20, 2023
93e6863
Fix the rest of the tests
tdimitrov Apr 20, 2023
a7b74cc
Remove dead code
tdimitrov Apr 20, 2023
0f84b42
Fix todos
tdimitrov Apr 21, 2023
0e486d4
Simplify session caching
tdimitrov Apr 21, 2023
f6a9d98
Comments for `SessionInfoProvider`
tdimitrov Apr 21, 2023
2918b42
Separate `SessionInfoProvider` from `State`
tdimitrov Apr 21, 2023
837f952
`cache_session_info_for_head` becomes freestanding function
tdimitrov Apr 21, 2023
6f609f3
Remove unneeded `mut` usage
tdimitrov Apr 21, 2023
82a0ebc
fn session_info -> fn get_session_info() to avoid name clashes. The f…
tdimitrov Apr 24, 2023
dca7287
Fix SessionInfo retrieval
tdimitrov Apr 24, 2023
360b96d
Code cleanup
tdimitrov Apr 24, 2023
caa2529
Don't wrap `SessionInfoProvider` in an `Option`
tdimitrov Apr 25, 2023
7665ecd
Remove `earliest_session()`
tdimitrov Apr 27, 2023
3f2b85c
Remove pre-caching -> wip
tdimitrov Apr 28, 2023
19e9588
Fix some tests and code cleanup
tdimitrov May 2, 2023
70d136f
Fix all tests
tdimitrov May 2, 2023
6e348c9
Fixes in tests
tdimitrov May 2, 2023
4fa4403
Fix comments, variable names and small style changes
tdimitrov May 2, 2023
dc0542a
Fix a warning
tdimitrov May 2, 2023
233478e
impl From<SessionWindowSize> for NonZeroUsize
tdimitrov May 5, 2023
d27220a
Fix logging for `get_session_info` - remove redundant logs and decrea…
tdimitrov May 6, 2023
407eb27
Merge branch 'master' into tsv-rolling-session-approval-voting
tdimitrov May 6, 2023
2ed7dfd
Code review feedback
tdimitrov May 9, 2023
dcac680
Merge branch 'master' into tsv-rolling-session-approval-voting
tdimitrov May 10, 2023
96ffefe
Storage migration removing `COL_SESSION_WINDOW_DATA` from parachains db
tdimitrov May 9, 2023
81acb35
Remove `col_session_data` usages
tdimitrov May 9, 2023
14eb540
Storage migration clearing columns w/o removing them
tdimitrov May 11, 2023
0606eee
Remove session data column usages from `approval-voting` and `dispute…
tdimitrov May 11, 2023
6a2a9e2
Add some test cases from `RollingSessionWindow` to `dispute-coordinat…
tdimitrov May 12, 2023
0f94664
Fix formatting in initialized.rs
tdimitrov May 12, 2023
db120f6
Fix a corner case in `SessionInfo` caching for `dispute-coordinator`
tdimitrov May 12, 2023
0ffd17f
Remove `RollingSessionWindow` ;(
tdimitrov May 12, 2023
7d83f95
Revert "Fix formatting in initialized.rs"
tdimitrov May 12, 2023
b938b92
Merge branch 'master' into tsv-rolling-session-window-cleanup
tdimitrov May 16, 2023
39ae191
v2 to v3 migration drops `COL_DISPUTE_COORDINATOR_DATA` instead of cl…
tdimitrov May 16, 2023
84c134a
Fix `NUM_COLUMNS` in `approval-voting`
tdimitrov May 16, 2023
c7439c5
Use `columns::v3::NUM_COLUMNS` when opening db
tdimitrov May 18, 2023
2f6b33b
Update node/service/src/parachains_db/upgrade.rs
tdimitrov May 25, 2023
0ccfe2c
Don't write in `COL_DISPUTE_COORDINATOR_DATA` for `test_rocksdb_migra…
tdimitrov May 25, 2023
ab7197d
Fix `NUM+COLUMNS` in approval_voting
tdimitrov May 25, 2023
40daf94
Merge branch 'master' into tsv-rolling-session-window-cleanup
tdimitrov May 29, 2023
44fe75a
Fix formatting
tdimitrov May 29, 2023
bbbd5ee
Fix columns usage
tdimitrov May 29, 2023
4b45afb
Clarification comments about the different db versions
tdimitrov May 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Separate SessionInfoProvider from State
tdimitrov committed Apr 21, 2023
commit 2918b42b69376a6af3139e865b055f2db2493595
45 changes: 27 additions & 18 deletions node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
@@ -330,6 +330,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
ctx: &mut Context,
state: &mut State,
db: &mut OverlayedBackend<'_, B>,
session_info_provider: &mut Option<SessionInfoProvider>,
head: Hash,
finalized_number: &Option<BlockNumber>,
) -> SubsystemResult<Vec<BlockImportedCandidates>> {
@@ -365,7 +366,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
};

// Update session info based on most recent head.
state.cache_session_info_for_head(ctx, head).await;
state.cache_session_info_for_head(ctx, head, session_info_provider).await;

// If we've just started the node and are far behind,
// import at most `MAX_HEADS_LOOK_BACK` blocks.
@@ -394,7 +395,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
let mut imported_blocks_and_info = Vec::with_capacity(new_blocks.len());
for (block_hash, block_header) in new_blocks.into_iter().rev() {
let env = ImportedBlockInfoEnv {
runtime_info: &mut state.session_info,
runtime_info: session_info_provider,
assignment_criteria: &*state.assignment_criteria,
keystore: &state.keystore,
};
@@ -449,8 +450,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
} = imported_block_info;

// todo: refactor this
let session_info = &state
.session_info
let session_info = &session_info_provider
.as_mut()
.map(|s| &mut s.runtime_info)
.expect("imported_block_info requires session info to be available; qed")
@@ -648,7 +648,6 @@ pub(crate) mod tests {

fn blank_state() -> State {
State {
session_info: None,
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
@@ -657,7 +656,11 @@ pub(crate) mod tests {
}
}

fn single_session_state(index: SessionIndex, info: SessionInfo, relay_parent: Hash) -> State {
fn single_session_state(
index: SessionIndex,
info: SessionInfo,
relay_parent: Hash,
) -> (State, SessionInfoProvider) {
let runtime_info = RuntimeInfo::new_with_cache(
RuntimeInfoConfig {
keystore: None,
@@ -674,14 +677,12 @@ pub(crate) mod tests {
)],
);

State {
session_info: Some(SessionInfoProvider {
highest_session_seen: index,
gaps_in_cache: false,
runtime_info,
}),
..blank_state()
}
let state = blank_state();

let session_info_provider =
SessionInfoProvider { runtime_info, gaps_in_cache: false, highest_session_seen: index };

(state, session_info_provider)
}

struct MockAssignmentCriteria;
@@ -1284,7 +1285,8 @@ pub(crate) mod tests {
.map(|(r, c, g)| CandidateEvent::CandidateIncluded(r, Vec::new().into(), c, g))
.collect::<Vec<_>>();

let mut state = single_session_state(session, session_info, parent_hash);
let (mut state, session_info_provider) =
single_session_state(session, session_info, parent_hash);
overlay_db.write_block_entry(
v1::BlockEntry {
block_hash: parent_hash,
@@ -1306,9 +1308,16 @@ pub(crate) mod tests {
let test_fut = {
Box::pin(async move {
let mut overlay_db = OverlayedBackend::new(&db);
let result = handle_new_head(&mut ctx, &mut state, &mut overlay_db, hash, &Some(1))
.await
.unwrap();
let result = handle_new_head(
&mut ctx,
&mut state,
&mut overlay_db,
&mut Some(session_info_provider),
hash,
&Some(1),
)
.await
.unwrap();

let write_ops = overlay_db.into_write_ops();
db.write(write_ops).unwrap();
208 changes: 138 additions & 70 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
@@ -655,8 +655,6 @@ impl SessionInfoProvider {
}

struct State {
// `None` on start-up. Gets initialized/updated on leaf update
session_info: Option<SessionInfoProvider>,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
clock: Box<dyn Clock + Send + Sync>,
@@ -666,43 +664,16 @@ struct State {

#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
impl State {
async fn session_info<Sender>(
&mut self,
sender: &mut Sender,
relay_parent: Hash,
session_index: SessionIndex,
) -> Option<&SessionInfo>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
match &mut self.session_info {
Some(session_info_provider) => match session_info_provider
.runtime_info
.get_session_info_by_index(sender, relay_parent, session_index)
.await
{
Ok(extended_info) => Some(&extended_info.session_info),
Err(_) => {
gum::error!(
target: LOG_TARGET,
session = session_index,
?relay_parent,
"Can't get SessionInfo"
);
None
},
},

None => None,
}
}

/// If `head` is in a new session - cache it
pub async fn cache_session_info_for_head<Context>(&mut self, ctx: &mut Context, head: Hash)
where
pub async fn cache_session_info_for_head<Context>(
&mut self,
ctx: &mut Context,
head: Hash,
session_info: &mut Option<SessionInfoProvider>,
) where
<Context as overseer::SubsystemContext>::Sender: Sized + Send,
{
match self.session_info.take() {
match session_info.take() {
None => {
let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig {
keystore: None,
@@ -742,7 +713,7 @@ impl State {
}
}

self.session_info = Some(SessionInfoProvider {
*session_info = Some(SessionInfoProvider {
runtime_info,
gaps_in_cache,
highest_session_seen: head_session_idx,
@@ -800,26 +771,32 @@ impl State {
};
}

self.session_info = Some(session_info_provider);
*session_info = Some(session_info_provider);
},
}
}

// Compute the required tranches for approval for this block and candidate combo.
// Fails if there is no approval entry for the block under the candidate or no candidate entry
// under the block, or if the session is out of bounds.
async fn approval_status<Sender, 'a, 'b>(
&'a mut self,
sender: &mut Sender,
session_info_provider: &'a mut Option<SessionInfoProvider>,
block_entry: &'a BlockEntry,
candidate_entry: &'b CandidateEntry,
) -> Option<(&'b ApprovalEntry, ApprovalStatus)>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
// We can't borrow the session here. Only get copies of what's needed.
let (no_show_slots, needed_approvals) = match self
.session_info(sender, block_entry.parent_hash(), block_entry.session())
.await
let (no_show_slots, needed_approvals) = match session_info(
session_info_provider,
sender,
block_entry.parent_hash(),
block_entry.session(),
)
.await
{
Some(s) => (s.no_show_slots, s.needed_approvals),
None => {
@@ -897,14 +874,16 @@ where
}

let mut state = State {
session_info: None,
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
clock,
assignment_criteria,
spans: HashMap::new(),
};

// `None` on start-up. Gets initialized/updated on leaf update
let mut session_info_provider: Option<SessionInfoProvider> = None;

let mut wakeups = Wakeups::default();
let mut currently_checking_set = CurrentlyCheckingSet::default();
let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE);
@@ -930,6 +909,7 @@ where
&mut ctx,
&mut state,
&mut overlayed_db,
&mut session_info_provider,
woken_block,
woken_candidate,
&subsystem.metrics,
@@ -940,6 +920,7 @@ where
&mut ctx,
&mut state,
&mut overlayed_db,
&mut session_info_provider,
&subsystem.metrics,
next_msg?,
&mut last_finalized_height,
@@ -990,6 +971,7 @@ where
&mut ctx,
&mut state,
&mut overlayed_db,
&mut session_info_provider,
&subsystem.metrics,
&mut wakeups,
&mut currently_checking_set,
@@ -1036,6 +1018,7 @@ async fn handle_actions<Context>(
ctx: &mut Context,
state: &mut State,
overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut Option<SessionInfoProvider>,
metrics: &Metrics,
wakeups: &mut Wakeups,
currently_checking_set: &mut CurrentlyCheckingSet,
@@ -1066,6 +1049,7 @@ async fn handle_actions<Context>(
ctx,
state,
overlayed_db,
session_info_provider,
metrics,
candidate_hash,
approval_request,
@@ -1293,6 +1277,7 @@ async fn handle_from_overseer<Context>(
ctx: &mut Context,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut Option<SessionInfoProvider>,
metrics: &Metrics,
x: FromOrchestra<ApprovalVotingMessage>,
last_finalized_height: &mut Option<BlockNumber>,
@@ -1306,7 +1291,16 @@ async fn handle_from_overseer<Context>(
let approval_voting_span =
jaeger::PerLeafSpan::new(activated.span, "approval-voting");
state.spans.insert(head, approval_voting_span);
match import::handle_new_head(ctx, state, db, head, &*last_finalized_height).await {
match import::handle_new_head(
ctx,
state,
db,
session_info_provider,
head,
&*last_finalized_height,
)
.await
{
Err(e) => return Err(SubsystemError::with_origin("db", e)),
Ok(block_imported_candidates) => {
// Schedule wakeups for all imported candidates.
@@ -1376,16 +1370,31 @@ async fn handle_from_overseer<Context>(
},
FromOrchestra::Communication { msg } => match msg {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => {
let (check_outcome, actions) =
check_and_import_assignment(ctx.sender(), state, db, a, claimed_core).await?;
let (check_outcome, actions) = check_and_import_assignment(
ctx.sender(),
state,
db,
session_info_provider,
a,
claimed_core,
)
.await?;
let _ = res.send(check_outcome);

actions
},
ApprovalVotingMessage::CheckAndImportApproval(a, res) =>
check_and_import_approval(ctx.sender(), state, db, metrics, a, |r| {
let _ = res.send(r);
})
check_and_import_approval(
ctx.sender(),
state,
db,
session_info_provider,
metrics,
a,
|r| {
let _ = res.send(r);
},
)
.await?
.0,
ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
@@ -1860,6 +1869,7 @@ async fn check_and_import_assignment<Sender>(
sender: &mut Sender,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut Option<SessionInfoProvider>,
assignment: IndirectAssignmentCert,
candidate_index: CandidateIndex,
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)>
@@ -1888,9 +1898,13 @@ where
)),
};

let session_info = match state
.session_info(sender, block_entry.parent_hash(), block_entry.session())
.await
let session_info = match session_info(
session_info_provider,
sender,
block_entry.parent_hash(),
block_entry.session(),
)
.await
{
Some(s) => s,
None =>
@@ -2003,8 +2017,9 @@ where
let mut actions = Vec::new();

// We've imported a new approval, so we need to schedule a wake-up for when that might no-show.
if let Some((approval_entry, status)) =
state.approval_status(sender, &block_entry, &candidate_entry).await
if let Some((approval_entry, status)) = state
.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
.await
{
actions.extend(schedule_wakeup_action(
approval_entry,
@@ -2027,6 +2042,7 @@ async fn check_and_import_approval<T, Sender>(
sender: &mut Sender,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut Option<SessionInfoProvider>,
metrics: &Metrics,
approval: IndirectSignedApprovalVote,
with_response: impl FnOnce(ApprovalCheckResult) -> T,
@@ -2059,15 +2075,21 @@ where
},
};

let session_info =
match state.session_info(sender, approval.block_hash, block_entry.session()).await {
Some(s) => s,
None => {
respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownSessionIndex(
block_entry.session()
),))
},
};
let session_info = match session_info(
session_info_provider,
sender,
approval.block_hash,
block_entry.session(),
)
.await
{
Some(s) => s,
None => {
respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownSessionIndex(
block_entry.session()
),))
},
};

let approved_candidate_hash = match block_entry.candidate(approval.candidate_index as usize) {
Some((_, h)) => *h,
@@ -2144,6 +2166,7 @@ where
sender,
state,
db,
session_info_provider,
&metrics,
block_entry,
approved_candidate_hash,
@@ -2190,6 +2213,7 @@ async fn advance_approval_state<Sender>(
sender: &mut Sender,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut Option<SessionInfoProvider>,
metrics: &Metrics,
mut block_entry: BlockEntry,
candidate_hash: CandidateHash,
@@ -2228,8 +2252,9 @@ where

let tick_now = state.clock.tick_now();

let (is_approved, status) = if let Some((approval_entry, status)) =
state.approval_status(sender, &block_entry, &candidate_entry).await
let (is_approved, status) = if let Some((approval_entry, status)) = state
.approval_status(sender, session_info_provider, &block_entry, &candidate_entry)
.await
{
let check = approval_checking::check_approval(
&candidate_entry,
@@ -2364,6 +2389,7 @@ async fn process_wakeup<Context>(
ctx: &mut Context,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut Option<SessionInfoProvider>,
relay_block: Hash,
candidate_hash: CandidateHash,
metrics: &Metrics,
@@ -2389,9 +2415,13 @@ async fn process_wakeup<Context>(

let slot_duration_millis = state.slot_duration_millis;
let tranche_now = state.clock.tranche_now(slot_duration_millis, block_entry.slot());
let session_info = match state
.session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session())
.await
let session_info = match session_info(
session_info_provider,
ctx.sender(),
block_entry.parent_hash(),
block_entry.session(),
)
.await
{
Some(i) => i,
None => {
@@ -2507,6 +2537,7 @@ async fn process_wakeup<Context>(
ctx.sender(),
state,
db,
session_info_provider,
metrics,
block_entry,
candidate_hash,
@@ -2724,6 +2755,7 @@ async fn issue_approval<Context>(
ctx: &mut Context,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut Option<SessionInfoProvider>,
metrics: &Metrics,
candidate_hash: CandidateHash,
ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
@@ -2765,9 +2797,13 @@ async fn issue_approval<Context>(
};
issue_approval_span.add_int_tag("candidate_index", candidate_index as i64);

let session_info = match state
.session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session())
.await
let session_info = match session_info(
session_info_provider,
ctx.sender(),
block_entry.parent_hash(),
block_entry.session(),
)
.await
{
Some(s) => s,
None => {
@@ -2856,6 +2892,7 @@ async fn issue_approval<Context>(
ctx.sender(),
state,
db,
session_info_provider,
metrics,
block_entry,
candidate_hash,
@@ -2918,3 +2955,34 @@ fn issue_local_invalid_statement<Sender>(
false,
));
}

async fn session_info<'a, Sender>(
session_info_provider: &'a mut Option<SessionInfoProvider>,
sender: &mut Sender,
relay_parent: Hash,
session_index: SessionIndex,
) -> Option<&'a SessionInfo>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
match session_info_provider {
Some(session_info_provider) => match session_info_provider
.runtime_info
.get_session_info_by_index(sender, relay_parent, session_index)
.await
{
Ok(extended_info) => Some(&extended_info.session_info),
Err(_) => {
gum::error!(
target: LOG_TARGET,
session = session_index,
?relay_parent,
"Can't get SessionInfo"
);
None
},
},

None => None,
}
}