Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

prevent faults from double subtracting cc upgrade power #1129

Merged
merged 25 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 163 additions & 26 deletions actors/builtin/miner/expiration_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (q ExpirationQueue) AddActiveSectors(sectors []*SectorOnChainInfo, ssize ab
var totalSectors []bitfield.BitField
noEarlySectors := bitfield.New()
noFaultyPower := NewPowerPairZero()
for _, group := range groupSectorsByExpiration(ssize, sectors, q.quant) {
for _, group := range groupNewSectorsByDeclaredExpiration(ssize, sectors, q.quant) {
snos := bitfield.NewFromSet(group.sectors)
if err := q.add(group.epoch, snos, noEarlySectors, group.power, noFaultyPower, group.pledge); err != nil {
return bitfield.BitField{}, NewPowerPairZero(), big.Zero(), xerrors.Errorf("failed to record new sector expirations: %w", err)
Expand Down Expand Up @@ -224,37 +224,38 @@ func (q ExpirationQueue) RescheduleAsFaults(newExpiration abi.ChainEpoch, sector
rescheduledPower := NewPowerPairZero()

// Group sectors by their target expiration, then remove from existing queue entries according to those groups.
for _, group := range groupSectorsByExpiration(ssize, sectors, q.quant) {
groups, err := q.findSectorsByExpiration(ssize, sectors)
if err != nil {
return NewPowerPairZero(), err
}

for _, group := range groups {
var err error
var es ExpirationSet
if err = q.mustGet(group.epoch, &es); err != nil {
return NewPowerPairZero(), err
}
if group.epoch <= q.quant.QuantizeUp(newExpiration) {
// Don't reschedule sectors that are already due to expire on-time before the fault-driven expiration,
// but do represent their power as now faulty.
// Their pledge remains as "on-time".
es.ActivePower = es.ActivePower.Sub(group.power)
es.FaultyPower = es.FaultyPower.Add(group.power)
group.expirationSet.ActivePower = group.expirationSet.ActivePower.Sub(group.power)
group.expirationSet.FaultyPower = group.expirationSet.FaultyPower.Add(group.power)
expiringPower = expiringPower.Add(group.power)
} else {
// Remove sectors from on-time expiry and active power.
sectorsBf := bitfield.NewFromSet(group.sectors)
if es.OnTimeSectors, err = bitfield.SubtractBitField(es.OnTimeSectors, sectorsBf); err != nil {
if group.expirationSet.OnTimeSectors, err = bitfield.SubtractBitField(group.expirationSet.OnTimeSectors, sectorsBf); err != nil {
return NewPowerPairZero(), err
}
es.OnTimePledge = big.Sub(es.OnTimePledge, group.pledge)
es.ActivePower = es.ActivePower.Sub(group.power)
group.expirationSet.OnTimePledge = big.Sub(group.expirationSet.OnTimePledge, group.pledge)
group.expirationSet.ActivePower = group.expirationSet.ActivePower.Sub(group.power)

// Accumulate the sectors and power removed.
sectorsTotal = append(sectorsTotal, group.sectors...)
rescheduledPower = rescheduledPower.Add(group.power)
}
if err = q.mustUpdateOrDelete(group.epoch, &es); err != nil {
if err = q.mustUpdateOrDelete(group.epoch, group.expirationSet); err != nil {
return NewPowerPairZero(), err
}

if err = es.ValidateState(); err != nil {
if err = group.expirationSet.ValidateState(); err != nil {
return NewPowerPairZero(), err
}
}
Expand Down Expand Up @@ -591,8 +592,10 @@ func (q ExpirationQueue) remove(rawEpoch abi.ChainEpoch, onTimeSectors, earlySec
pledge abi.TokenAmount) error {
epoch := q.quant.QuantizeUp(rawEpoch)
var es ExpirationSet
if err := q.mustGet(epoch, &es); err != nil {
return err
if found, err := q.Array.Get(uint64(epoch), &es); err != nil {
return xerrors.Errorf("failed to lookup queue epoch %v: %w", epoch, err)
} else if !found {
return xerrors.Errorf("missing expected expiration set at epoch %v", epoch)
}

if err := es.Remove(onTimeSectors, earlySectors, pledge, activePower, faultyPower); err != nil {
Expand All @@ -610,7 +613,12 @@ func (q ExpirationQueue) removeActiveSectors(sectors []*SectorOnChainInfo, ssize
noFaultyPower := NewPowerPairZero()

// Group sectors by their expiration, then remove from existing queue entries according to those groups.
for _, group := range groupSectorsByExpiration(ssize, sectors, q.quant) {
groups, err := q.findSectorsByExpiration(ssize, sectors)
if err != nil {
return bitfield.BitField{}, NewPowerPairZero(), big.Zero(), err
}

for _, group := range groups {
sectorsBf := bitfield.NewFromSet(group.sectors)
if err := q.remove(group.epoch, sectorsBf, noEarlySectors, group.power, noFaultyPower, group.pledge); err != nil {
return bitfield.BitField{}, NewPowerPairZero(), big.Zero(), err
Expand Down Expand Up @@ -658,6 +666,13 @@ func (q ExpirationQueue) traverseMutate(f func(epoch abi.ChainEpoch, es *Expirat
return nil
}

// traverse walks the queue through traverseMutate using a read-only adapter around f.
// The keepGoing flag and error returned by f are passed through unchanged. The extra
// leading boolean that traverseMutate's callback must return is always false
// (presumably the "changed" flag, so nothing is written back — confirm against traverseMutate).
func (q ExpirationQueue) traverse(f func(epoch abi.ChainEpoch, es *ExpirationSet) (keepGoing bool, err error)) error {
	readOnly := func(epoch abi.ChainEpoch, es *ExpirationSet) (bool, bool, error) {
		more, err := f(epoch, es)
		return false, more, err
	}
	return q.traverseMutate(readOnly)
}

func (q ExpirationQueue) mayGet(key abi.ChainEpoch) (*ExpirationSet, error) {
es := NewExpirationSetEmpty()
if _, err := q.Array.Get(uint64(key), es); err != nil {
Expand All @@ -666,15 +681,6 @@ func (q ExpirationQueue) mayGet(key abi.ChainEpoch) (*ExpirationSet, error) {
return es, nil
}

// mustGet loads the expiration set stored at epoch key into es.
// It returns an error if the underlying array lookup fails, or if no entry exists at that epoch.
func (q ExpirationQueue) mustGet(key abi.ChainEpoch, es *ExpirationSet) error {
	found, err := q.Array.Get(uint64(key), es)
	if err != nil {
		return xerrors.Errorf("failed to lookup queue epoch %v: %w", key, err)
	}
	if !found {
		return xerrors.Errorf("missing expected expiration set at epoch %v", key)
	}
	return nil
}

func (q ExpirationQueue) mustUpdate(epoch abi.ChainEpoch, es *ExpirationSet) error {
if err := q.Array.Set(uint64(epoch), es); err != nil {
return xerrors.Errorf("failed to set queue epoch %v: %w", epoch, err)
Expand Down Expand Up @@ -703,11 +709,16 @@ type sectorEpochSet struct {
pledge abi.TokenAmount
}

// sectorExpirationSet pairs a group of sectors (the embedded sectorEpochSet: epoch, sector
// numbers, total power and total pledge) with the expiration set those sectors were found in.
type sectorExpirationSet struct {
sectorEpochSet
// expirationSet is the set the grouped sectors were matched against. It is a pointer, so
// callers that modify it are modifying the same value the group was built from.
expirationSet *ExpirationSet
}

// Takes a slice of sector infos and returns sector info sets grouped and
// sorted by expiration epoch, quantized.
//
// Note: While the result is sorted by epoch, the order of per-epoch sectors is maintained.
func groupSectorsByExpiration(sectorSize abi.SectorSize, sectors []*SectorOnChainInfo, quant QuantSpec) []sectorEpochSet {
func groupNewSectorsByDeclaredExpiration(sectorSize abi.SectorSize, sectors []*SectorOnChainInfo, quant QuantSpec) []sectorEpochSet {
sectorsByExpiration := make(map[abi.ChainEpoch][]*SectorOnChainInfo)

for _, sector := range sectors {
Expand Down Expand Up @@ -740,3 +751,129 @@ func groupSectorsByExpiration(sectorSize abi.SectorSize, sectors []*SectorOnChai
})
return sectorEpochSets
}

// Groups sectors into sets based on their Expiration field.
// If sectors are not found in the expiration set corresponding to their expiration field
// (i.e. they have been rescheduled) traverse expiration sets to find the groups where these
// sectors actually expire.
// Groups will be returned in expiration order, earliest first.
func (q *ExpirationQueue) findSectorsByExpiration(sectorSize abi.SectorSize, sectors []*SectorOnChainInfo) ([]sectorExpirationSet, error) {
declaredExpirations := make(map[abi.ChainEpoch]bool, len(sectors))
sectorsByNumber := make(map[uint64]*SectorOnChainInfo, len(sectors))
allRemaining := make(map[uint64]struct{})
expirationGroups := make([]sectorExpirationSet, 0, len(declaredExpirations))

for _, sector := range sectors {
qExpiration := q.quant.QuantizeUp(sector.Expiration)
declaredExpirations[qExpiration] = true
allRemaining[uint64(sector.SectorNumber)] = struct{}{}
sectorsByNumber[uint64(sector.SectorNumber)] = sector
}

// Traverse expiration sets first by expected expirations. This will find all groups if no sectors have been rescheduled.
// This map iteration is non-deterministic but safe because we sort by epoch below.
for expiration := range declaredExpirations { //nolint:nomaprange // result is subsequently sorted
es, err := q.mayGet(expiration)
if err != nil {
return nil, err
}

// create group from overlap
var group sectorExpirationSet
group, allRemaining, err = groupExpirationSet(sectorSize, sectorsByNumber, allRemaining, es, expiration)
if err != nil {
return nil, err
}
if len(group.sectors) > 0 {
expirationGroups = append(expirationGroups, group)
}
}

// If sectors remain, traverse next in epoch order. Remaining sectors should be rescheduled to expire soon, so
// this traversal should exit early.
if len(allRemaining) > 0 {
err := q.traverse(func(epoch abi.ChainEpoch, es *ExpirationSet) (bool, error) {
// If this set's epoch is one of our declared epochs, we've already processed it in the loop above,
// so skip processing here. Sectors rescheduled to this epoch would have been included in the earlier processing.
if _, found := declaredExpirations[epoch]; found {
return true, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking: this needs to check es.EarlySectors as well. Especially for the case where the sector has moved because it was rescheduled. I'm a bit surprised this escaped the great testing.

I think to hit this case we need to fault a sector (thus rescheduling it) and then terminate it (the other call path to this point, since it can't be faulted again).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, findSectorsByExpiration is only used in the context of non-faulty sectors.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. when terminating a faulty sector as you mentioned, EQ.RemoveSectors treats faulty vs non-faulty terminations differently. The faulty sectors are removed by traversing the expiration queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm with @wadealexc that I don't think we could exercise this code. If we stop assuming that grouped sectors can be a mix of faulty and non-faulty sectors, our downstream accounting gets a lot more complicated. We could add an assertion that no sectors in the set are early. I tried it and the tests still pass.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring es.EarlySectors (now down in groupExpirationSet) still makes me quite uncomfortable. Adding an error check that no such sectors are being sought would help.


// Sectors should not be found in EarlySectors, which holds faults. An implicit assumption
// of grouping is that it only returns sectors with active power. ExpirationQueue should not
// provide operations that allow this to happen.
if err := assertNoEarlySectors(allRemaining, es); err != nil {
return true, err
}

var group sectorExpirationSet
var err error
group, allRemaining, err = groupExpirationSet(sectorSize, sectorsByNumber, allRemaining, es, epoch)
if err != nil {
return false, err
}
if len(group.sectors) > 0 {
expirationGroups = append(expirationGroups, group)
}

return len(allRemaining) > 0, nil
})
if err != nil {
return nil, err
}
}

if len(allRemaining) > 0 {
return nil, xerrors.New("some sectors not found in expiration queue")
}

// sort groups, earliest first.
sort.Slice(expirationGroups, func(i, j int) bool {
return expirationGroups[i].epoch < expirationGroups[j].epoch
})
return expirationGroups, nil
acruikshank marked this conversation as resolved.
Show resolved Hide resolved
}

// groupExpirationSet builds the group of sought sectors that are on-time members of one expiration set.
//
// sectors maps sector number to sector info, and includeSet holds the sector numbers still being
// sought. Each sector number present in both includeSet and es.OnTimeSectors is appended to the
// group (in the order es.OnTimeSectors.ForEach yields them) and its power and initial pledge are
// added to the group's totals. es.EarlySectors is not consulted.
//
// This function mutates includeSet by deleting every sector number it finds, and returns that
// same map so the caller can keep tracking the sectors not yet located. The returned group
// records expiration as its epoch and carries es itself (not a copy) as its expirationSet.
func groupExpirationSet(sectorSize abi.SectorSize, sectors map[uint64]*SectorOnChainInfo,
	includeSet map[uint64]struct{}, es *ExpirationSet, expiration abi.ChainEpoch,
) (sectorExpirationSet, map[uint64]struct{}, error) {
	var matched []uint64
	power := NewPowerPairZero()
	pledge := big.Zero()

	collect := func(number uint64) error {
		if _, wanted := includeSet[number]; !wanted {
			return nil
		}
		info := sectors[number]
		matched = append(matched, number)
		power = power.Add(PowerForSector(sectorSize, info))
		pledge = big.Add(pledge, info.InitialPledge)
		delete(includeSet, number)
		return nil
	}
	if err := es.OnTimeSectors.ForEach(collect); err != nil {
		return sectorExpirationSet{}, nil, err
	}

	group := sectorExpirationSet{expirationSet: es}
	group.epoch = expiration
	group.sectors = matched
	group.power = power
	group.pledge = pledge
	return group, includeSet, nil
}

// assertNoEarlySectors checks that none of the sector numbers in set appear in es.EarlySectors.
// It returns the result of es.EarlySectors.ForEach, run with a callback that fails for any
// sector number that is also a key of set.
func assertNoEarlySectors(set map[uint64]struct{}, es *ExpirationSet) error {
	rejectIfSought := func(number uint64) error {
		if _, sought := set[number]; !sought {
			return nil
		}
		return xerrors.Errorf("Invalid attempt to group sector %d with an early expiration", number)
	}
	return es.EarlySectors.ForEach(rejectIfSought)
}
4 changes: 2 additions & 2 deletions actors/builtin/miner/expiration_queue_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestExpirations(t *testing.T) {
testSector(14, 3, 0, 0, 0),
testSector(13, 4, 0, 0, 0),
}
result := groupSectorsByExpiration(2048, sectors, quant)
result := groupNewSectorsByDeclaredExpiration(2048, sectors, quant)
expected := []*sectorEpochSet{{
epoch: 13,
sectors: []uint64{1, 2, 4},
Expand All @@ -37,7 +37,7 @@ func TestExpirations(t *testing.T) {

func TestExpirationsEmpty(t *testing.T) {
sectors := []*SectorOnChainInfo{}
result := groupSectorsByExpiration(2048, sectors, NoQuantization)
result := groupNewSectorsByDeclaredExpiration(2048, sectors, NoQuantization)
expected := []sectorEpochSet{}
require.Equal(t, expected, result)
}
Expand Down
38 changes: 38 additions & 0 deletions actors/builtin/miner/expiration_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,43 @@ func TestExpirationQueue(t *testing.T) {
assert.True(t, set.FaultyPower.Equals(miner.NewPowerPairZero()))
})

// Exercises RescheduleAsFaults on a sector that RescheduleExpirations has already moved,
// then checks how active and faulty power are split across the popped expiration sets.
t.Run("reschedule expirations then reschedule as fault", func(t *testing.T) {
// Create 3 expiration sets with 2 sectors apiece
queue := emptyExpirationQueueWithQuantizing(t, miner.NewQuantSpec(4, 1))
_, _, _, err := queue.AddActiveSectors(sectors, sectorSize)
require.NoError(t, err)

_, err = queue.Root()
require.NoError(t, err)

// reschedule a single sector, sectors[2], from the second group into the first
toReschedule := []*miner.SectorOnChainInfo{sectors[2]}
err = queue.RescheduleExpirations(2, toReschedule, sectorSize)
require.NoError(t, err)

// now reschedule as faults one sector from the first group (sectors[1]) and the sector
// that was just moved out of the second group (sectors[2]), to expire in the first set
faults := []*miner.SectorOnChainInfo{sectors[1], sectors[2]}
power, err := queue.RescheduleAsFaults(4, faults, sectorSize)
require.NoError(t, err)

// the returned power must equal the combined power of both faulted sectors
expectedPower := miner.PowerForSectors(sectorSize, faults)
assert.Equal(t, expectedPower, power)

// expect sectors 1, 2, 3 on time in the first group: only sectors[0] is still active,
// and the two faulted sectors account for all of the faulty power
set, err := queue.PopUntil(5)
require.NoError(t, err)
assertBitfieldEquals(t, set.OnTimeSectors, 1, 2, 3)
assert.Equal(t, miner.PowerForSectors(sectorSize, []*miner.SectorOnChainInfo{sectors[0]}), set.ActivePower)
assert.Equal(t, expectedPower, set.FaultyPower)

// expect rest to come later, fully active with no faulty power
set, err = queue.PopUntil(20)
require.NoError(t, err)
assertBitfieldEquals(t, set.OnTimeSectors, 4, 5, 6)
assert.Equal(t, miner.PowerForSectors(sectorSize, []*miner.SectorOnChainInfo{sectors[3], sectors[4], sectors[5]}), set.ActivePower)
assert.Equal(t, miner.NewPowerPairZero(), set.FaultyPower)
})

t.Run("reschedule recover restores all sector stats", func(t *testing.T) {
// Create expiration 3 sets with 2 sectors apiece
queue := emptyExpirationQueueWithQuantizing(t, miner.NewQuantSpec(4, 1))
Expand Down Expand Up @@ -650,6 +687,7 @@ func TestExpirationQueue(t *testing.T) {
require.NoError(t, err)
assert.Zero(t, queue.Length())
})

t.Run("rescheduling no expirations leaves the queue empty", func(t *testing.T) {
queue := emptyExpirationQueueWithQuantizing(t, miner.NewQuantSpec(4, 1))
err := queue.RescheduleExpirations(10, nil, sectorSize)
Expand Down
7 changes: 6 additions & 1 deletion actors/builtin/miner/miner_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ func (st *State) AdvanceDeadline(store adt.Store, currEpoch abi.ChainEpoch) (*Ad
pledgeDelta := abi.NewTokenAmount(0)
powerDelta := NewPowerPairZero()

var totalFaultyPower PowerPair
detectedFaultyPower := NewPowerPairZero()

// Note: Use dlInfo.Last() rather than rt.CurrEpoch unless certain
Expand Down Expand Up @@ -1047,6 +1048,10 @@ func (st *State) AdvanceDeadline(store adt.Store, currEpoch abi.ChainEpoch) (*Ad
if err != nil {
return nil, xerrors.Errorf("failed to process end of deadline %d: %w", dlInfo.Index, err)
}

// Capture deadline's faulty power after new faults have been detected, but before it is
// dropped along with faulty sectors expiring this round.
totalFaultyPower = deadline.FaultyPower
}
{
// Expire sectors that are due, either for on-time expiration or "early" faulty-for-too-long.
Expand Down Expand Up @@ -1095,7 +1100,7 @@ func (st *State) AdvanceDeadline(store adt.Store, currEpoch abi.ChainEpoch) (*Ad
pledgeDelta,
powerDelta,
detectedFaultyPower,
deadline.FaultyPower,
totalFaultyPower,
}, nil
}

Expand Down
Loading