Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deferred credits #3229

Merged
merged 8 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions massa-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ massa_ledger_worker = { path = "../massa-ledger-worker", features = [
] }
massa_pos_worker = { path = "../massa-pos-worker", features = ["testing"] }
massa_pos_exports = { path = "../massa-pos-exports", features = ["testing"] }
massa_consensus_exports = { path = "../massa-consensus-exports", features = [
"testing",
] }
lazy_static = "1.4"
tempfile = "3.3"

Expand Down
19 changes: 11 additions & 8 deletions massa-execution-worker/src/active_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,18 @@ impl ActiveHistory {
}

/// Gets the deferred credits for a given address that will be credited at a given slot
pub(crate) fn get_adress_deferred_credit_for(&self, addr: &Address, slot: &Slot) -> Option<Amount> {
for hist_item in self.0
.iter()
.rev() {
pub(crate) fn get_adress_deferred_credit_for(
&self,
addr: &Address,
slot: &Slot,
) -> Option<Amount> {
for hist_item in self.0.iter().rev() {
if let Some(v) = hist_item
.state_changes
.pos_changes
.deferred_credits
.get_address_deferred_credit_for_slot(addr, slot) {
.state_changes
.pos_changes
.deferred_credits
.get_address_deferred_credit_for_slot(addr, slot)
{
return Some(v);
}
}
Expand Down
18 changes: 14 additions & 4 deletions massa-execution-worker/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,22 @@ impl ExecutionContext {
/// * `slot`: associated slot of the deferred credits to be executed
/// * `credits`: deferred to be executed
pub fn execute_deferred_credits(&mut self, slot: &Slot) {
let credits = self.speculative_roll_state.get_deferred_credits(slot);
for (addr, amount) in credits {
if let Err(e) = self.transfer_coins(None, Some(addr), amount, false) {
let executed_credits = self.speculative_roll_state.get_deferred_credits(slot);

for (address, amount) in executed_credits {
self.speculative_roll_state
AurelienFT marked this conversation as resolved.
Show resolved Hide resolved
.added_changes
.deferred_credits
.0
.entry(*slot)
.or_default()
.entry(address)
.and_modify(|credit_amount| *credit_amount = Amount::default())
.or_default();
if let Err(e) = self.transfer_coins(None, Some(address), amount, false) {
debug!(
"could not credit {} deferred coins to {} at slot {}: {}",
amount, addr, slot, e
amount, address, slot, e
);
}
}
Expand Down
35 changes: 20 additions & 15 deletions massa-execution-worker/src/speculative_roll_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) struct SpeculativeRollState {
active_history: Arc<RwLock<ActiveHistory>>,

/// List of changes to the state after settling roll sell/buy
added_changes: PoSChanges,
pub(crate) added_changes: PoSChanges,
}

impl SpeculativeRollState {
Expand Down Expand Up @@ -136,15 +136,14 @@ impl SpeculativeRollState {
.saturating_add(roll_price.saturating_mul_u64(roll_count));

// Remove the rolls
self
.added_changes
self.added_changes
.roll_changes
.insert(*seller_addr, owned_count.saturating_sub(roll_count));

// Add deferred credits (reimbursement) corresponding to the sold rolls value
self.added_changes
.deferred_credits
.insert(*seller_addr, target_slot, new_deferred_credits);
.deferred_credits
.insert(*seller_addr, target_slot, new_deferred_credits);

Ok(())
}
Expand Down Expand Up @@ -225,7 +224,7 @@ impl SpeculativeRollState {
if !target_credits.is_empty() {
let mut credits = DeferredCredits::default();
credits.0.insert(target_slot, target_credits);
self.added_changes.deferred_credits.nested_extend(credits);
self.added_changes.deferred_credits.nested_replace(credits);
}
}

Expand Down Expand Up @@ -277,26 +276,32 @@ impl SpeculativeRollState {

/// Gets the deferred credits for a given address that will be credited at a given slot
fn get_address_deferred_credit_for_slot(&self, addr: &Address, slot: &Slot) -> Option<Amount> {

// search in the added changes
if let Some(v) = self.added_changes
if let Some(v) = self
.added_changes
.deferred_credits
.get_address_deferred_credit_for_slot(addr, slot) {
.get_address_deferred_credit_for_slot(addr, slot)
{
return Some(v);
}

// search in the history
if let Some(v) = self.active_history
if let Some(v) = self
.active_history
.read()
.get_adress_deferred_credit_for(addr, slot) {
.get_adress_deferred_credit_for(addr, slot)
{
return Some(v);
}

// search in the final state
if let Some(v) = self.final_state
if let Some(v) = self
.final_state
.read()
.pos_state.deferred_credits
.get_address_deferred_credit_for_slot(addr, slot) {
.pos_state
.deferred_credits
.get_address_deferred_credit_for_slot(addr, slot)
{
return Some(v);
}

Expand Down Expand Up @@ -490,7 +495,7 @@ impl SpeculativeRollState {
credits.extend(
self.active_history
.read()
.get_all_deferred_credits_for(slot)
.get_all_deferred_credits_for(slot),
);

// added deferred credits
Expand Down
46 changes: 39 additions & 7 deletions massa-final-state/src/final_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use massa_async_pool::{AsyncMessageId, AsyncPool, AsyncPoolChanges, Change};
use massa_executed_ops::ExecutedOps;
use massa_ledger_exports::{get_address_from_key, LedgerChanges, LedgerController};
use massa_models::{slot::Slot, streaming_step::StreamingStep};
use massa_pos_exports::{PoSFinalState, SelectorController};
use massa_pos_exports::{DeferredCredits, PoSFinalState, SelectorController};
use std::collections::VecDeque;
use tracing::debug;

Expand Down Expand Up @@ -134,8 +134,10 @@ impl FinalState {
/// * ledger change that is after `slot` and before or equal to `ledger_step` key
/// * ledger change if main bootstrap process is finished
/// * async pool change that is after `slot` and before or equal to `pool_step` message id
/// * proof-of-stake change if main bootstrap process is finished
/// * proof-of-stake change if main bootstrap process is finished
/// * async pool change if main bootstrap process is finished
/// * proof-of-stake deferred credits change if main bootstrap process is finished
/// * proof-of-stake deferred credits change that is after `slot` and before or equal to `credits_step` slot
/// * proof-of-stake cycle history change if main bootstrap process is finished
/// * executed ops change if main bootstrap process is finished
///
/// Produces an error when the `slot` is too old for `self.changes_history`
Expand Down Expand Up @@ -227,12 +229,42 @@ impl FinalState {
_ => (),
}

// Get Proof of Stake state changes if current bootstrap cycle is incomplete (so last)
if cycle_step.finished() && credits_step.finished() {
slot_changes.pos_changes = changes.pos_changes.clone();
// Get PoS deferred credits changes that concern credits <= credits_step
match credits_step {
StreamingStep::Ongoing(cursor_slot) => {
let deferred_credits = DeferredCredits(
changes
.pos_changes
.deferred_credits
.0
.iter()
.filter_map(|(credits_slot, credits)| {
if *credits_slot <= cursor_slot {
Some((*credits_slot, credits.clone()))
} else {
None
}
})
.collect(),
);
slot_changes.pos_changes.deferred_credits = deferred_credits;
}
StreamingStep::Finished(_) => {
slot_changes.pos_changes.deferred_credits =
changes.pos_changes.deferred_credits.clone();
}
_ => (),
}

// Get PoS cycle changes if cycle history main bootstrap finished
if cycle_step.finished() {
slot_changes.pos_changes.seed_bits = changes.pos_changes.seed_bits.clone();
slot_changes.pos_changes.roll_changes = changes.pos_changes.roll_changes.clone();
slot_changes.pos_changes.production_stats =
changes.pos_changes.production_stats.clone();
}

// Get executed operations changes if classic bootstrap finished
// Get executed operations changes if executed ops main bootstrap finished
if ops_step.finished() {
slot_changes.executed_ops_changes = changes.executed_ops_changes.clone();
}
Expand Down
33 changes: 16 additions & 17 deletions massa-pos-exports/src/deferred_credits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@ pub struct DeferredCredits(pub BTreeMap<Slot, PreHashMap<Address, Amount>>);

impl DeferredCredits {
/// Extends the current `DeferredCredits` with another but accumulates the addresses and amounts
pub fn nested_extend(&mut self, other: Self) {
for (slot, new_credits) in other.0 {
pub fn nested_replace(&mut self, other: Self) {
for (slot, other_credits) in other.0 {
self.0
.entry(slot)
.and_modify(|current_credits| {
for (address, new_amount) in new_credits.iter() {
for (address, other_amount) in other_credits.iter() {
current_credits
.entry(*address)
.and_modify(|current_amount| {
*current_amount = current_amount.saturating_add(*new_amount);
})
.or_insert(*new_amount);
.and_modify(|current_amount| *current_amount = *other_amount)
.or_insert(*other_amount);
}
})
.or_insert(new_credits);
.or_insert(other_credits);
}
}

Expand All @@ -55,25 +53,26 @@ impl DeferredCredits {
}

/// Gets the deferred credits for a given address that will be credited at a given slot
pub fn get_address_deferred_credit_for_slot(&self, addr: &Address, slot: &Slot) -> Option<Amount> {
if let Some(v) = self.0
pub fn get_address_deferred_credit_for_slot(
&self,
addr: &Address,
slot: &Slot,
) -> Option<Amount> {
if let Some(v) = self
.0
.get(slot)
.and_then(|slot_credits|
slot_credits.get(addr)
) {
.and_then(|slot_credits| slot_credits.get(addr))
{
return Some(*v);
}
None
}

/// Insert/overwrite a deferred credit
pub fn insert(&mut self, addr: Address, slot: Slot, amount: Amount) {
let entry = self.0
.entry(slot)
.or_default();
let entry = self.0.entry(slot).or_default();
entry.insert(addr, amount);
}

}

/// Serializer for `DeferredCredits`
Expand Down
2 changes: 1 addition & 1 deletion massa-pos-exports/src/pos_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl PoSChanges {
}

// extend deferred credits
self.deferred_credits.nested_extend(other.deferred_credits);
self.deferred_credits.nested_replace(other.deferred_credits);
}
}

Expand Down
4 changes: 2 additions & 2 deletions massa-pos-exports/src/pos_final_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl PoSFinalState {
// extent deferred_credits with changes.deferred_credits
// remove zero-valued credits
self.deferred_credits
.nested_extend(changes.deferred_credits);
.nested_replace(changes.deferred_credits);
self.deferred_credits.remove_zeros();

// feed the cycle if it is complete
Expand Down Expand Up @@ -480,7 +480,7 @@ impl PoSFinalState {
/// # Arguments
/// `part`: `DeferredCredits` from `get_pos_state_part` and used to update PoS final state
pub fn set_deferred_credits_part(&mut self, part: DeferredCredits) -> StreamingStep<Slot> {
self.deferred_credits.nested_extend(part);
self.deferred_credits.nested_replace(part);
if let Some(slot) = self
.deferred_credits
.0
Expand Down