Skip to content

Commit

Permalink
Revert "Feature: move partition (#1326)" from nv12 (#1481)
Browse files Browse the repository at this point in the history
* Revert "Feature: move partition  (#1326)"

This reverts commit 5ec2a6b.

* ci

* run CI
  • Loading branch information
aarshkshah1992 authored and arajasek committed Nov 14, 2023
1 parent 0faa9b3 commit de366e3
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 1,332 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fvm_ipld_encoding = "0.4.0"
fvm_ipld_blockstore = "0.2.0"
fvm_ipld_hamt = "0.8.0"
fvm_ipld_kamt = "0.3.0"
fvm_ipld_amt = { version = "0.6.2" }
fvm_ipld_amt = { version = "0.6.1" }
fvm_ipld_bitfield = "0.6.0"

# workspace
Expand Down
97 changes: 0 additions & 97 deletions actors/miner/src/deadline_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use super::{
BitFieldQueue, ExpirationSet, Partition, PartitionSectorMap, PoStPartition, PowerPair,
SectorOnChainInfo, Sectors, TerminationResult,
};

use crate::SECTORS_AMT_BITWIDTH;

// Bitwidth of AMTs determined empirically from mutation patterns and projections of mainnet data.
Expand Down Expand Up @@ -103,102 +102,6 @@ impl Deadlines {
self.due[deadline_idx as usize] = store.put_cbor(deadline, Code::Blake2b256)?;
Ok(())
}

pub fn move_partitions<BS: Blockstore>(
policy: &Policy,
store: &BS,
orig_deadline: &mut Deadline,
dest_deadline: &mut Deadline,
partitions: &BitField,
) -> anyhow::Result<()> {
let mut orig_partitions = orig_deadline.partitions_amt(store)?;
let mut dest_partitions = dest_deadline.partitions_amt(store)?;

// even though we're moving partitions intact, we still need to update orig/dest `Deadline` accordingly.

if dest_partitions.count() + partitions.len() > policy.max_partitions_per_deadline {
return Err(actor_error!(
forbidden,
"partitions in dest_deadline will exceed max_partitions_per_deadline"
))?;
}

let first_dest_partition_idx = dest_partitions.count();
for (i, orig_partition_idx) in partitions.iter().enumerate() {
let moving_partition = orig_partitions
.get(orig_partition_idx)?
.ok_or_else(|| actor_error!(not_found, "no partition {}", orig_partition_idx))?
.clone();
if !moving_partition.faults.is_empty() || !moving_partition.unproven.is_empty() {
return Err(actor_error!(forbidden, "partition with faults or unproven sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
}
if orig_deadline.early_terminations.get(orig_partition_idx) {
return Err(actor_error!(forbidden, "partition with early terminated sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
}
if !moving_partition.faulty_power.is_zero() {
return Err(actor_error!(
illegal_state,
"partition faulty_power should be zero when faults is empty, partition_idx {}",
orig_partition_idx
))?;
}

let dest_partition_idx = first_dest_partition_idx + i as u64;

let sector_count = moving_partition.sectors.len();
let live_sector_count = sector_count - moving_partition.terminated.len();

// start updating orig/dest `Deadline` here

orig_deadline.total_sectors -= sector_count;
orig_deadline.live_sectors -= live_sector_count;

dest_deadline.total_sectors += sector_count;
dest_deadline.live_sectors += live_sector_count;

orig_partitions.set(orig_partition_idx, Partition::new(store)?)?;
dest_partitions.set(dest_partition_idx, moving_partition)?;
}

// update expirations_epochs Cid of Deadline.
// Note that when moving a partition from `orig_expirations_epochs` to `dest_expirations_epochs`,
// we explicitly keep the `dest_epoch` the same as `orig_epoch`, this is by design of not re-quantizing.
{
let mut epochs_to_remove = Vec::<u64>::new();
let mut orig_expirations_epochs: Array<BitField, _> =
Array::load(&orig_deadline.expirations_epochs, store)?;
let mut dest_expirations_epochs: Array<BitField, _> =
Array::load(&dest_deadline.expirations_epochs, store)?;
orig_expirations_epochs.for_each_mut(|orig_epoch, orig_bitfield| {
let dest_epoch = orig_epoch;
let mut to_bitfield =
dest_expirations_epochs.get(dest_epoch)?.cloned().unwrap_or_default();
for (i, partition_id) in partitions.iter().enumerate() {
if orig_bitfield.get(partition_id) {
orig_bitfield.unset(partition_id);
to_bitfield.set(first_dest_partition_idx + i as u64);
}
}
dest_expirations_epochs.set(dest_epoch, to_bitfield)?;

if orig_bitfield.is_empty() {
epochs_to_remove.push(orig_epoch);
}

Ok(())
})?;
if !epochs_to_remove.is_empty() {
orig_expirations_epochs.batch_delete(epochs_to_remove, true)?;
}
orig_deadline.expirations_epochs = orig_expirations_epochs.flush()?;
dest_deadline.expirations_epochs = dest_expirations_epochs.flush()?;
}

orig_deadline.partitions = orig_partitions.flush()?;
dest_deadline.partitions = dest_partitions.flush()?;

Ok(())
}
}

/// Deadline holds the state for all sectors due at a specific deadline.
Expand Down
67 changes: 0 additions & 67 deletions actors/miner/src/deadlines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,73 +127,6 @@ pub fn deadline_available_for_compaction(
)
}

/// the distance between from_deadline and to_deadline clockwise in deadline unit.
fn deadline_distance(policy: &Policy, from_deadline: u64, to_deadline: u64) -> u64 {
if to_deadline >= from_deadline {
to_deadline - from_deadline
} else {
policy.wpost_period_deadlines - from_deadline + to_deadline
}
}

/// only allow moving to a nearer deadline from current one
pub fn ensure_deadline_available_for_move(
policy: &Policy,
orig_deadline: u64,
dest_deadline: u64,
current_deadline: &DeadlineInfo,
) -> Result<(), String> {
if !deadline_is_mutable(
policy,
current_deadline.period_start,
orig_deadline,
current_deadline.current_epoch,
) {
return Err(format!(
"cannot move from a deadline {}, immutable at epoch {}",
orig_deadline, current_deadline.current_epoch
));
}

if !deadline_is_mutable(
policy,
current_deadline.period_start,
dest_deadline,
current_deadline.current_epoch,
) {
return Err(format!(
"cannot move to a deadline {}, immutable at epoch {}",
dest_deadline, current_deadline.current_epoch
));
}

if deadline_distance(policy, current_deadline.index, dest_deadline)
>= deadline_distance(policy, current_deadline.index, orig_deadline)
{
return Err(format!(
"can only move to a deadline which is nearer from current deadline {}, dest_deadline {} is not nearer than orig_deadline {}",
current_deadline.index, dest_deadline, orig_deadline
));
}

Ok(())
}

// returns the nearest deadline info with index `target_deadline` that has already occured from the point of view of the current deadline(including the current deadline).
pub fn nearest_occured_deadline_info(
policy: &Policy,
current_deadline: &DeadlineInfo,
target_deadline: u64,
) -> DeadlineInfo {
// Find the proving period start for the deadline in question.
let mut pp_start = current_deadline.period_start;
if current_deadline.index < target_deadline {
pp_start -= policy.wpost_proving_period
}

new_deadline_info(policy, pp_start, target_deadline, current_deadline.current_epoch)
}

// Determine current period start and deadline index directly from current epoch and
// the offset implied by the proving period. This works correctly even for the state
// of a miner actor without an active deadline cron
Expand Down
108 changes: 37 additions & 71 deletions actors/miner/src/expiration_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::convert::TryInto;

use anyhow::{anyhow, Context};
use cid::Cid;
use fil_actors_runtime::network::EPOCHS_IN_DAY;
use fil_actors_runtime::runtime::Policy;
use fil_actors_runtime::{ActorDowncast, Array};
use fvm_ipld_amt::{Error as AmtError, ValueMut};
Expand Down Expand Up @@ -644,16 +643,16 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
Ok(())
}

/// Note that the `epoch` parameter doesn't quantize, and assumes the entry for the epoch is non-empty.
fn remove(
&mut self,
epoch: ChainEpoch,
raw_epoch: ChainEpoch,
on_time_sectors: &BitField,
early_sectors: &BitField,
active_power: &PowerPair,
faulty_power: &PowerPair,
pledge: &TokenAmount,
) -> anyhow::Result<()> {
let epoch = self.quant.quantize_up(raw_epoch);
let mut expiration_set = self
.amt
.get(epoch.try_into()?)
Expand Down Expand Up @@ -777,67 +776,46 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
sector_size: SectorSize,
sectors: &[SectorOnChainInfo],
) -> anyhow::Result<Vec<SectorExpirationSet>> {
if sectors.is_empty() {
return Ok(Vec::new());
}

let mut declared_expirations = BTreeMap::<ChainEpoch, bool>::new();
let mut sectors_by_number = BTreeMap::<u64, &SectorOnChainInfo>::new();
let mut all_remaining = BTreeSet::<u64>::new();
let mut declared_expirations = BTreeSet::<i64>::new();

for sector in sectors {
declared_expirations.insert(sector.expiration);
let q_expiration = self.quant.quantize_up(sector.expiration);
declared_expirations.insert(q_expiration, true);
all_remaining.insert(sector.sector_number);
sectors_by_number.insert(sector.sector_number, sector);
}

let mut expiration_groups = Vec::<SectorExpirationSet>::with_capacity(sectors.len());

let mut old_end = 0i64;
for expiration in declared_expirations.iter() {
// Basically we're scanning [sector.expiration, sector.expiration+EPOCHS_IN_DAY) for active sectors.
// Since a sector may have been moved from another deadline, the possible range for an active sector is [sector.expiration, sector.expiration+EPOCHS_IN_DAY).
//
// And we're also trying to avoid scanning the same range twice by choosing a proper `start_at`.

let start_at = if *expiration > old_end {
*expiration
} else {
// +1 since the range is inclusive
old_end + 1
};
let new_end = (expiration + EPOCHS_IN_DAY - 1) as u64;

// scan range [start_at, new_end] for active sectors of interest
self.amt.for_each_while_ranged(Some(start_at as u64), None, |epoch, es| {
if epoch > new_end {
// no need to scan any more
return Ok(false);
}

let group = group_expiration_set(
sector_size,
&sectors_by_number,
&mut all_remaining,
es,
epoch as ChainEpoch,
);

if !group.sector_epoch_set.sectors.is_empty() {
expiration_groups.push(group);
}

Ok(epoch < new_end && !all_remaining.is_empty())
})?;

old_end = new_end as i64;
let mut expiration_groups =
Vec::<SectorExpirationSet>::with_capacity(declared_expirations.len());

for (&expiration, _) in declared_expirations.iter() {
let es = self.may_get(expiration)?;

let group = group_expiration_set(
sector_size,
&sectors_by_number,
&mut all_remaining,
es,
expiration,
);
if !group.sector_epoch_set.sectors.is_empty() {
expiration_groups.push(group);
}
}

// If sectors remain, traverse next in epoch order. Remaining sectors should be
// rescheduled to expire soon, so this traversal should exit early.
if !all_remaining.is_empty() {
self.amt.for_each_while(|epoch, es| {
let epoch = epoch as ChainEpoch;
// If this set's epoch is one of our declared epochs, we've already processed it
// in the loop above, so skip processing here. Sectors rescheduled to this epoch
// would have been included in the earlier processing.
if declared_expirations.contains_key(&epoch) {
return Ok(true);
}

// Sector should not be found in EarlyExpirations which holds faults. An implicit assumption
// of grouping is that it only returns sectors with active power. ExpirationQueue should not
Expand All @@ -848,7 +826,7 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
sector_size,
&sectors_by_number,
&mut all_remaining,
es,
es.clone(),
epoch,
);

Expand Down Expand Up @@ -933,7 +911,7 @@ fn group_expiration_set(
sector_size: SectorSize,
sectors: &BTreeMap<u64, &SectorOnChainInfo>,
include_set: &mut BTreeSet<u64>,
es: &ExpirationSet,
es: ExpirationSet,
expiration: ChainEpoch,
) -> SectorExpirationSet {
let mut sector_numbers = Vec::new();
Expand All @@ -949,26 +927,14 @@ fn group_expiration_set(
}
}

if sector_numbers.is_empty() {
SectorExpirationSet {
sector_epoch_set: SectorEpochSet {
epoch: expiration,
sectors: sector_numbers,
power: total_power,
pledge: total_pledge,
},
expiration_set: ExpirationSet::default(),
}
} else {
SectorExpirationSet {
sector_epoch_set: SectorEpochSet {
epoch: expiration,
sectors: sector_numbers,
power: total_power,
pledge: total_pledge,
},
expiration_set: es.clone(), // lazy clone
}
SectorExpirationSet {
sector_epoch_set: SectorEpochSet {
epoch: expiration,
sectors: sector_numbers,
power: total_power,
pledge: total_pledge,
},
expiration_set: es,
}
}

Expand Down
Loading

0 comments on commit de366e3

Please sign in to comment.