Feature: move partition (#1326)
* init

* avoid recompute current_deadline

* treat empty bitfield as all

* rm useless quote

* add verify

* combine option1 & option2

* fix

* fix

* nit

* mod error

* nit

* fmt

* fix ci

* fix bug

* add test

* add more test

* partial fix for review

* adjust test

* use .context_code

* fix for test

* disallow empty partitions

* refactor deadline_available_for_move

* fix for clippy

* minor opt

* only verify_windowed_post once

* mod error msg

* 1. verify_window_post batch by batch
2. move partitions intact, the `expirations_epochs` and `early_terminated` may have a gap of 24H

* fix ci

* mod check for epoch

* partial review fix

* adjust test

* refactor with Partition::adjust_for_move

* share the language with FIP

* deadline_available_for_move => ensure_deadline_available_for_move

* add some doc comment

* more renaming

* more renaming

* rename + merge master

* mod wording

* fix test

* renaming in test

* apply alex's idea of not re-quantizing at all.

* 1. forbid moving when there're early terminations
2. remove `adjust_for_move`

* rm anyhow::Ok

* minor optimization by observing that partition `faulty_power` should be zero when `faults` is empty

* adjust find_sectors_by_expiration for not re-quantizing

* add test

* fix for review

* add a comment about not re-quantizing when moving expirations_epochs

* minor optimization

* avoid scanning the same range twice

* 1. review fix
2. add check for `max_partitions_per_deadline`

* fix comment

* use with_context_code
zhiqiangxu authored Sep 29, 2023
1 parent e1d0eef commit 5ec2a6b
Showing 11 changed files with 1,332 additions and 61 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -120,7 +120,7 @@ fvm_ipld_encoding = "0.4.0"
fvm_ipld_blockstore = "0.2.0"
fvm_ipld_hamt = "0.7.0"
fvm_ipld_kamt = "0.3.0"
fvm_ipld_amt = { version = "0.6.1" }
fvm_ipld_amt = { version = "0.6.2" }
fvm_ipld_bitfield = "0.6.0"

# workspace
97 changes: 97 additions & 0 deletions actors/miner/src/deadline_state.rs
@@ -23,6 +23,7 @@ use super::{
BitFieldQueue, ExpirationSet, Partition, PartitionSectorMap, PoStPartition, PowerPair,
SectorOnChainInfo, Sectors, TerminationResult,
};

use crate::SECTORS_AMT_BITWIDTH;

// Bitwidth of AMTs determined empirically from mutation patterns and projections of mainnet data.
@@ -102,6 +103,102 @@ impl Deadlines {
self.due[deadline_idx as usize] = store.put_cbor(deadline, Code::Blake2b256)?;
Ok(())
}

pub fn move_partitions<BS: Blockstore>(
policy: &Policy,
store: &BS,
orig_deadline: &mut Deadline,
dest_deadline: &mut Deadline,
partitions: &BitField,
) -> anyhow::Result<()> {
let mut orig_partitions = orig_deadline.partitions_amt(store)?;
let mut dest_partitions = dest_deadline.partitions_amt(store)?;

        // Even though we're moving partitions intact, we still need to update the orig/dest `Deadline` accordingly.

if dest_partitions.count() + partitions.len() > policy.max_partitions_per_deadline {
return Err(actor_error!(
forbidden,
"partitions in dest_deadline will exceed max_partitions_per_deadline"
))?;
}

let first_dest_partition_idx = dest_partitions.count();
for (i, orig_partition_idx) in partitions.iter().enumerate() {
let moving_partition = orig_partitions
.get(orig_partition_idx)?
.ok_or_else(|| actor_error!(not_found, "no partition {}", orig_partition_idx))?
.clone();
if !moving_partition.faults.is_empty() || !moving_partition.unproven.is_empty() {
return Err(actor_error!(forbidden, "partition with faults or unproven sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
}
if orig_deadline.early_terminations.get(orig_partition_idx) {
return Err(actor_error!(forbidden, "partition with early terminated sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
}
if !moving_partition.faulty_power.is_zero() {
return Err(actor_error!(
illegal_state,
"partition faulty_power should be zero when faults is empty, partition_idx {}",
orig_partition_idx
))?;
}

let dest_partition_idx = first_dest_partition_idx + i as u64;

// sector_count is both total sector count and total live sector count, since no sector is faulty here.
let sector_count = moving_partition.sectors.len();

// start updating orig/dest `Deadline` here

orig_deadline.total_sectors -= sector_count;
orig_deadline.live_sectors -= sector_count;

dest_deadline.total_sectors += sector_count;
dest_deadline.live_sectors += sector_count;

orig_partitions.set(orig_partition_idx, Partition::new(store)?)?;
dest_partitions.set(dest_partition_idx, moving_partition)?;
}

        // Update the `expirations_epochs` Cid of each Deadline.
        // Note that when moving a partition from `orig_expirations_epochs` to `dest_expirations_epochs`,
        // we explicitly keep `dest_epoch` the same as `orig_epoch`; this is by design, as we do not re-quantize.
{
let mut epochs_to_remove = Vec::<u64>::new();
let mut orig_expirations_epochs: Array<BitField, _> =
Array::load(&orig_deadline.expirations_epochs, store)?;
let mut dest_expirations_epochs: Array<BitField, _> =
Array::load(&dest_deadline.expirations_epochs, store)?;
orig_expirations_epochs.for_each_mut(|orig_epoch, orig_bitfield| {
let dest_epoch = orig_epoch;
let mut to_bitfield =
dest_expirations_epochs.get(dest_epoch)?.cloned().unwrap_or_default();
for (i, partition_id) in partitions.iter().enumerate() {
if orig_bitfield.get(partition_id) {
orig_bitfield.unset(partition_id);
to_bitfield.set(first_dest_partition_idx + i as u64);
}
}
dest_expirations_epochs.set(dest_epoch, to_bitfield)?;

if orig_bitfield.is_empty() {
epochs_to_remove.push(orig_epoch);
}

Ok(())
})?;
if !epochs_to_remove.is_empty() {
orig_expirations_epochs.batch_delete(epochs_to_remove, true)?;
}
orig_deadline.expirations_epochs = orig_expirations_epochs.flush()?;
dest_deadline.expirations_epochs = dest_expirations_epochs.flush()?;
}

orig_deadline.partitions = orig_partitions.flush()?;
dest_deadline.partitions = dest_partitions.flush()?;

Ok(())
}
}

/// Deadline holds the state for all sectors due at a specific deadline.
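A quick sketch of how `Deadlines::move_partitions` might be driven. The caller below is illustrative only (the commit's actual entry point is not shown in this excerpt); it assumes `Deadline` and `Deadlines` are the public types from the `fil_actor_miner` crate, that the two deadlines are already loaded from miner state, and it elides persistence plus the `ensure_deadline_available_for_move` precondition check:

use fil_actor_miner::{Deadline, Deadlines};
use fil_actors_runtime::runtime::Policy;
use fvm_ipld_bitfield::BitField;
use fvm_ipld_blockstore::Blockstore;

// Illustrative driver, not part of this commit: move partitions 0 and 2 of
// `orig` into `dest`.
fn move_two_partitions<BS: Blockstore>(
    policy: &Policy,
    store: &BS,
    orig: &mut Deadline,
    dest: &mut Deadline,
) -> anyhow::Result<()> {
    let mut to_move = BitField::new();
    to_move.set(0); // partition indices within `orig`
    to_move.set(2);
    Deadlines::move_partitions(policy, store, orig, dest, &to_move)?;
    // Both deadlines now carry updated live/total sector counts, and the moved
    // partitions keep their original (non-re-quantized) expiration epochs.
    Ok(())
}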
67 changes: 67 additions & 0 deletions actors/miner/src/deadlines.rs
@@ -127,6 +127,73 @@ pub fn deadline_available_for_compaction(
)
}

/// The distance between `from_deadline` and `to_deadline`, measured clockwise in deadline units.
fn deadline_distance(policy: &Policy, from_deadline: u64, to_deadline: u64) -> u64 {
if to_deadline >= from_deadline {
to_deadline - from_deadline
} else {
policy.wpost_period_deadlines - from_deadline + to_deadline
}
}

/// Only allow moving to a deadline that is nearer (clockwise) to the current one.
pub fn ensure_deadline_available_for_move(
policy: &Policy,
orig_deadline: u64,
dest_deadline: u64,
current_deadline: &DeadlineInfo,
) -> Result<(), String> {
if !deadline_is_mutable(
policy,
current_deadline.period_start,
orig_deadline,
current_deadline.current_epoch,
) {
return Err(format!(
"cannot move from a deadline {}, immutable at epoch {}",
orig_deadline, current_deadline.current_epoch
));
}

if !deadline_is_mutable(
policy,
current_deadline.period_start,
dest_deadline,
current_deadline.current_epoch,
) {
return Err(format!(
"cannot move to a deadline {}, immutable at epoch {}",
dest_deadline, current_deadline.current_epoch
));
}

if deadline_distance(policy, current_deadline.index, dest_deadline)
>= deadline_distance(policy, current_deadline.index, orig_deadline)
{
return Err(format!(
"can only move to a deadline which is nearer from current deadline {}, dest_deadline {} is not nearer than orig_deadline {}",
current_deadline.index, dest_deadline, orig_deadline
));
}

Ok(())
}
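
To make the clockwise-distance rule concrete, here is a small self-contained sketch; the helper mirrors the private `deadline_distance` above and assumes the mainnet value `wpost_period_deadlines = 48`:

// Editor's illustration, not part of the diff.
fn distance(period_deadlines: u64, from: u64, to: u64) -> u64 {
    // Same arithmetic as `deadline_distance` above.
    if to >= from { to - from } else { period_deadlines - from + to }
}

fn main() {
    let n = 48; // wpost_period_deadlines (mainnet)

    assert_eq!(distance(n, 10, 12), 2); // two deadlines ahead
    assert_eq!(distance(n, 46, 1), 3); // wraps around: 46 -> 47 -> 0 -> 1
    assert_eq!(distance(n, 5, 5), 0); // same deadline

    // With the current deadline at index 10, moving a partition from
    // orig_deadline 20 to dest_deadline 15 passes the distance check
    // (15 is strictly nearer clockwise); the reverse move is rejected.
    let current = 10;
    assert!(distance(n, current, 15) < distance(n, current, 20));
}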

// Returns the deadline info with index `target_deadline` that has most recently occurred, from the point of view of the current deadline (including the current deadline itself).
pub fn nearest_occured_deadline_info(
policy: &Policy,
current_deadline: &DeadlineInfo,
target_deadline: u64,
) -> DeadlineInfo {
// Find the proving period start for the deadline in question.
let mut pp_start = current_deadline.period_start;
if current_deadline.index < target_deadline {
pp_start -= policy.wpost_proving_period
}

new_deadline_info(policy, pp_start, target_deadline, current_deadline.current_epoch)
}

// Determine current period start and deadline index directly from current epoch and
// the offset implied by the proving period. This works correctly even for the state
// of a miner actor without an active deadline cron
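The period-stepping in `nearest_occured_deadline_info` reduces to simple arithmetic. A sketch with assumed mainnet-like constants (48 deadlines of 60 epochs each, so a 2880-epoch proving period); `new_deadline_info` itself is not exercised here:

// Editor's illustration, not part of the diff.
fn main() {
    let (window, period) = (60i64, 2880i64); // 48 deadlines * 60 epochs each
    let period_start = 100_000i64; // assumed current proving-period start
    let current_index = 10i64;

    // Open epoch of the most recent occurrence of `target`: step back one
    // proving period when the target deadline hasn't opened yet this period.
    let open = |target: i64| {
        let pp_start =
            if current_index < target { period_start - period } else { period_start };
        pp_start + target * window
    };

    assert_eq!(open(7), 100_420); // already occurred this period
    assert_eq!(open(20), 98_320); // most recent occurrence was last period
    assert_eq!(open(10), 100_600); // the current deadline itself counts
}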
108 changes: 71 additions & 37 deletions actors/miner/src/expiration_queue.rs
@@ -6,6 +6,7 @@ use std::convert::TryInto;

use anyhow::{anyhow, Context};
use cid::Cid;
+use fil_actors_runtime::network::EPOCHS_IN_DAY;
use fil_actors_runtime::runtime::Policy;
use fil_actors_runtime::{ActorDowncast, Array};
use fvm_ipld_amt::{Error as AmtError, ValueMut};
@@ -643,16 +644,16 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
Ok(())
}

+    /// Note that the `epoch` parameter is not quantized; the entry at that epoch is assumed to be non-empty.
fn remove(
&mut self,
-        raw_epoch: ChainEpoch,
+        epoch: ChainEpoch,
on_time_sectors: &BitField,
early_sectors: &BitField,
active_power: &PowerPair,
faulty_power: &PowerPair,
pledge: &TokenAmount,
) -> anyhow::Result<()> {
-        let epoch = self.quant.quantize_up(raw_epoch);
let mut expiration_set = self
.amt
.get(epoch.try_into()?)
Expand Down Expand Up @@ -776,46 +777,67 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
sector_size: SectorSize,
sectors: &[SectorOnChainInfo],
) -> anyhow::Result<Vec<SectorExpirationSet>> {
-        let mut declared_expirations = BTreeMap::<ChainEpoch, bool>::new();
+        if sectors.is_empty() {
+            return Ok(Vec::new());
+        }
+
        let mut sectors_by_number = BTreeMap::<u64, &SectorOnChainInfo>::new();
        let mut all_remaining = BTreeSet::<u64>::new();
+        let mut declared_expirations = BTreeSet::<i64>::new();

        for sector in sectors {
-            let q_expiration = self.quant.quantize_up(sector.expiration);
-            declared_expirations.insert(q_expiration, true);
+            declared_expirations.insert(sector.expiration);
all_remaining.insert(sector.sector_number);
sectors_by_number.insert(sector.sector_number, sector);
}

-        let mut expiration_groups =
-            Vec::<SectorExpirationSet>::with_capacity(declared_expirations.len());
-
-        for (&expiration, _) in declared_expirations.iter() {
-            let es = self.may_get(expiration)?;
-
-            let group = group_expiration_set(
-                sector_size,
-                &sectors_by_number,
-                &mut all_remaining,
-                es,
-                expiration,
-            );
-            if !group.sector_epoch_set.sectors.is_empty() {
-                expiration_groups.push(group);
-            }
-        }
+        let mut expiration_groups = Vec::<SectorExpirationSet>::with_capacity(sectors.len());

+        let mut old_end = 0i64;
+        for expiration in declared_expirations.iter() {
+            // We scan [sector.expiration, sector.expiration + EPOCHS_IN_DAY) for active sectors.
+            // Since a sector may have been moved from another deadline without re-quantizing,
+            // its entry may land anywhere in that range, not only at the quantized epoch.
+            //
+            // We also avoid scanning the same range twice by choosing a proper `start_at`.
+
+            let start_at = if *expiration > old_end {
+                *expiration
+            } else {
+                // +1 since the range is inclusive
+                old_end + 1
+            };
+            let new_end = (expiration + EPOCHS_IN_DAY - 1) as u64;
+
+            // scan range [start_at, new_end] for active sectors of interest
+            self.amt.for_each_while_ranged(Some(start_at as u64), None, |epoch, es| {
+                if epoch > new_end {
+                    // no need to scan any more
+                    return Ok(false);
+                }
+
+                let group = group_expiration_set(
+                    sector_size,
+                    &sectors_by_number,
+                    &mut all_remaining,
+                    es,
+                    epoch as ChainEpoch,
+                );
+
+                if !group.sector_epoch_set.sectors.is_empty() {
+                    expiration_groups.push(group);
+                }
+
+                Ok(epoch < new_end && !all_remaining.is_empty())
+            })?;
+
+            old_end = new_end as i64;
+        }

// If sectors remain, traverse next in epoch order. Remaining sectors should be
// rescheduled to expire soon, so this traversal should exit early.
if !all_remaining.is_empty() {
self.amt.for_each_while(|epoch, es| {
let epoch = epoch as ChainEpoch;
// If this set's epoch is one of our declared epochs, we've already processed it
// in the loop above, so skip processing here. Sectors rescheduled to this epoch
// would have been included in the earlier processing.
-                if declared_expirations.contains_key(&epoch) {
+                if declared_expirations.contains(&epoch) {
return Ok(true);
}

// Sector should not be found in EarlyExpirations which holds faults. An implicit assumption
// of grouping is that it only returns sectors with active power. ExpirationQueue should not
@@ -826,7 +848,7 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
sector_size,
&sectors_by_number,
&mut all_remaining,
-                        es.clone(),
+                        es,
epoch,
);

@@ -911,7 +933,7 @@ fn group_expiration_set(
sector_size: SectorSize,
sectors: &BTreeMap<u64, &SectorOnChainInfo>,
include_set: &mut BTreeSet<u64>,
-    es: ExpirationSet,
+    es: &ExpirationSet,
expiration: ChainEpoch,
) -> SectorExpirationSet {
let mut sector_numbers = Vec::new();
@@ -927,14 +949,26 @@ fn group_expiration_set(
}
}

-    SectorExpirationSet {
-        sector_epoch_set: SectorEpochSet {
-            epoch: expiration,
-            sectors: sector_numbers,
-            power: total_power,
-            pledge: total_pledge,
-        },
-        expiration_set: es,
-    }
+    if sector_numbers.is_empty() {
+        SectorExpirationSet {
+            sector_epoch_set: SectorEpochSet {
+                epoch: expiration,
+                sectors: sector_numbers,
+                power: total_power,
+                pledge: total_pledge,
+            },
+            expiration_set: ExpirationSet::default(),
+        }
+    } else {
+        SectorExpirationSet {
+            sector_epoch_set: SectorEpochSet {
+                epoch: expiration,
+                sectors: sector_numbers,
+                power: total_power,
+                pledge: total_pledge,
+            },
+            expiration_set: es.clone(), // lazy clone
+        }
+    }
}

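The `start_at`/`new_end` bookkeeping in the reworked `find_sectors_by_expiration` is easiest to see with numbers. A standalone sketch, assuming the network constant `EPOCHS_IN_DAY = 2880` and two declared expirations half a day apart:

// Editor's illustration, not part of the diff.
fn main() {
    const EPOCHS_IN_DAY: i64 = 2880; // assumed network constant (30s epochs)

    // Two declared (non-quantized) expirations whose one-day search windows overlap.
    let declared = [10_000i64, 11_440];

    let mut old_end = 0i64;
    for exp in declared {
        // Same range arithmetic as the loop in the diff above.
        let start_at = if exp > old_end { exp } else { old_end + 1 };
        let new_end = exp + EPOCHS_IN_DAY - 1;
        println!("scan [{start_at}, {new_end}]");
        old_end = new_end;
    }
    // Prints:
    //   scan [10000, 12879]
    //   scan [12880, 14319]
    // The second window starts at 12880 rather than 11440, so the overlapping
    // range [11440, 12879] is scanned only once.
}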