Skip to content

Commit

Permalink
Merge pull request #660 from lavanet/CNS-496-fixation-allow-append-af…
Browse files Browse the repository at this point in the history
…ter-delete

CNS-496: fixation allow append after delete
  • Loading branch information
Yaroms authored Aug 2, 2023
2 parents b2cc577 + ced7989 commit 810603e
Show file tree
Hide file tree
Showing 5 changed files with 942 additions and 3,465 deletions.
167 changes: 114 additions & 53 deletions common/fixation_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ import (
// its previous data. An entry version can be modified using ModifyEntry().
//
// GetEntry() returns the latest (with respect to current, not future) version of an entry. A
// nearest-no-later (than a given block) entry version can be obtained with FindEntry().
// nearest-no-later (than a given block) entry version can be obtained with FindEntry(); And
// FindEntry2() is similar, but also returns the entry's version (block).
//
// Entry versions maintain reference count (refcount) that determine their lifetime. New entry
// versions (appended) start with refcount 1. The refcount of the latest version is incremented
// using GetEntry(). The refcount of any version is decremented using PutEntry(). The refcount
// of the latest version is also decremented when a newer version is appended.
// using GetEntry(). The refcount of the latest version is decremented when a newer version is
// appended, or when a future version becomes in effect. References taken with GetEntry() can
// be dropped (and the refcount decremented) by using PutEntry(). PutEntry() can also be used
// to cancel/remove a future entry.
//
// When an entry's refcount reaches 0, it remains partly visible for a predefined period of
// blocks (stale-period), and then becomes stale and fully invisible. During the stale-period
Expand All @@ -73,16 +76,17 @@ import (
//
// An entry can be deleted using DelEntry(), after which is will not possible to GetEntry(),
// and calls to FindEntry() for a block beyond that time of deletion (at or later) would fail
// too. Until the data has been fully cleaned up (i.e. no refcount and past all stale-periods),
// calls to AppendEntry() with that entry's index will fail.
// too. (Note that DelEntry() will also discard any pending future versions of the entry).
//
// The per-entry APIs are and their actions are summarized below:
//
// o AppendEntry() adds a new version of an entry (could be a future version)
// o ModifyEntry() updates an existing version of an entry (could be a future version)
// o DelEntry() deletes and entry and make it invisible to GetEntry()
// o GetEntry() gets the latest (up to current) version of an entry, except if in stale-period
// o HasEntry() checks for existence of a specific version of an entry
// o FindEntry() gets the nearest-no-later version of an entry, including if in stale-period
// o FindEntry2() same as FindEntry(), and also returns the version (block) of the entry
// o ReadEntry() gets a specific entry version (stale or not)
// o PutEntry() drops reference counts taken by GetEntry()
//
Expand Down Expand Up @@ -130,9 +134,38 @@ import (
// count reaches 0 it marks them for deletion. The actual deletion takes place after
// a fixed number of epochs has passed.
//
// 5. When an entry (index) is deleted, a placeholder entry is a appended to mark the
// block at which deletion took place. This ensures that FindEntry() for the previous
// latest entry version would continue to work until that block and fail thereafter.
// 5. When an entry (index) is deleted, it is kept as a placeholder entry to mark the
// block at which deletion took place. This ensures that FindEntry() for that latest
// entry version would continue to work until that block and fail thereafter.
//
// State invariants:
//
// o Refcount: tracks the reference count of entry versions
// - extra refcount kept for latest and future entry versions
// - incremented for latest entry version with GetEntry()
// - decremented for specific entry version with PutEntry()
// - every PutEntry() must match a previous GetEntry()
// - PutEntry() can be used to remove a specific future entry
// - PutEntry() is illegal to call on last refcount of latest entry
//
// o IsLatest: tracks the entry version which is now the latest
// - transferred to the next (future) entry when it matures
// - removed from delete entries
//
// o DeleteAt: tracks the block upon which the entry will be deleted
// - held by the latest entry version smaller than or equal to that DeleteAt
// - prohibits future entry versions on or beyond the DeleteAt block
//
// o StaleAt: tracks when a removed entry version becomes invisible
// - set on non-future entries that reach refcount == 0
//
// Note that AppendEntry() may add retroactive entry only if it does not precede an
// existing latest entry. Also, AppendEntry() may not add future entries on or beyond
// a DeletedAt in the future (adding a future entry maturing on a future DeleteAt is
// permitted but useless, because it will be deleted straight away).
//
// Note also that DelEntry() discards all future entries on or beyond the DeletedAt
// block (and owing to the previous rule, none will be added until deletes occurs).

type FixationStore struct {
storeKey storetypes.StoreKey
Expand All @@ -155,10 +188,10 @@ func FixationVersion() uint64 {
// fire in order of expiry blocks, and in order of timeout kind.

const (
// NOTE: TimerFutureEntry should be smaller than timerDeleteEntry, to
// ensure that it fires first (because the latter will removes future entries,
// so otherwise if they both expire on the same block then the future entry
// callback will be surprised when it cannot find the respective entry).
// NOTE: timerFutureEntry should be smaller than timerDeleteEntry, to ensure
// that if both expire on the same block then future-entry timer fires before
// delete-entry timer, so that the latter would remove the brand newly matured
// future-entry.
timerFutureEntry = 0x01
timerDeleteEntry = 0x02
timerStaleEntry = 0x03
Expand Down Expand Up @@ -257,7 +290,8 @@ func (fs *FixationStore) AppendEntry(
var latest bool

// if latest entry is not found, this is a first version entry
if !found {
// if latest entry is found but is deleted, also treat like a first version entry
if !found || latestEntry.IsDeleted(ctx) {
fs.setEntryIndex(ctx, safeIndex, true)
if block <= ctxBlock {
latest = true
Expand All @@ -280,57 +314,68 @@ func (fs *FixationStore) AppendEntry(
// appending an entry, while can happen with a past block, must not succeed if it
// precedes en existing latest entry.

if block <= ctxBlock && !latestEntry.IsLatest {
if block < ctxBlock && !latestEntry.IsLatest {
return utils.LavaFormatWarning("AppendEntry",
fmt.Errorf("entry is older than existing latest entry"),
utils.Attribute{Key: "index", Value: index},
utils.Attribute{Key: "block", Value: block},
)
}

// temporary: do not allow adding new entries for an index that was deleted
// and still not fully cleaned up (e.g. not stale or with references held)
// if the new entry's block is equal to the nearest-no-later entry, overwrite that
// entry (only possible for latest and future entries, as the previous test would
// exclude earlier-than-latest entries).

if block == latestEntry.Block {
fs.ModifyEntry(ctx, index, block, entryData)
return nil
}

// we know the entry is not yet deleted (as tested above), so this is an attempt
// to append on or beyond a pending delete, which is not permitted.

if latestEntry.IsDeletedBy(block) {
return utils.LavaFormatWarning("AppendEntry",
fmt.Errorf("entry already deleted and pending cleanup"),
fmt.Errorf("entry disallows future version on or beyond pending delete"),
utils.Attribute{Key: "index", Value: index},
utils.Attribute{Key: "block", Value: block},
utils.Attribute{Key: "delete", Value: latestEntry.DeleteAt},
)
}

// if the new entry's block is equal to the latest entry, overwrite the latest entry
if block == latestEntry.Block {
fs.ModifyEntry(ctx, index, block, entryData)
return nil
}

// if the previous latest entry is marked with DeleteAt which is set to expire after
// this future entry's maturity (block), then transfer this DeleteAt to the future
// entry, and then replace the old timer with a new timer (below)
// (note: deletion, if any, cannot be for the current block, since it would have been
// processed at the beginning of the block, and AppendEntry would fail earlier).
// if the previous latest entry is marked with DeleteAt, and this future entry is set
// to mature strictly before that DeleteAt block (and specifically not at that block),
// then transfer this DeleteAt to the future entry, and then replace the old timer
// with a new timer (see below).

if latestEntry.HasDeleteAt() { // already know !latestEntry.IsDeletedBy(block)
if latestEntry.HasDeleteAt() {
// marked for delete that would take place after this future entry - transfer
// the marking to this future entry (which by now we know is later)
deleteAt = latestEntry.DeleteAt
latestEntry.DeleteAt = math.MaxUint64
fs.setEntry(ctx, latestEntry)
}
}

// if we are superseding a previous latest entry, then drop the latter's refcount;
// otherwise we are a future entry version, so set a timer for when it will become
// the new latest entry.

if block <= ctxBlock {
// the new entry is becoming the latest, put reference on the previous entry.
// (the latestEntry must be marked with IsLatest, because the test above would
// have returned an error otherwise.
latest = true
// if we are superseding a previous latest entry, then drop the latter's refcount (but
// not if the latest entry is deleted, since it lost its extra reference upon deletion);
// otherwise, if we are a future entry version, set a timer for when it will become the
// new latest entry.

if block <= ctxBlock {
// the new entry is becoming the latest now: mark as such
latest = true
if latestEntry.IsLatest {
// the previous entry was the latest and will no longer be so going forward, so
// drop the "latest" reference from that previous entry.
latestEntry.IsLatest = false
fs.putEntry(ctx, latestEntry)
} else {
// the new entry is not yet in effect, create a timer to put reference back to the latest once this is changed
key := encodeForTimer(safeIndex, block, timerFutureEntry)
fs.tstore.AddTimerByBlockHeight(ctx, block, key, []byte{})
fs.putEntry(ctx, latestEntry) // also saves updated latestEntry
}
} else {
// the new entry is not yet in effect, create a timer to put reference back to
// the latest once this is changed.
key := encodeForTimer(safeIndex, block, timerFutureEntry)
fs.tstore.AddTimerByBlockHeight(ctx, block, key, []byte{})
}

// marshal the new entry's data
Expand Down Expand Up @@ -417,7 +462,7 @@ func (fs *FixationStore) updateFutureEntry(ctx sdk.Context, safeIndex types.Safe
// because from now on it is no longer so.

latestEntry.IsLatest = false
fs.putEntry(ctx, latestEntry)
fs.putEntry(ctx, latestEntry) // also saves updated latestEntry
}

// we are now the latest entry - make it official by setting IsLatest
Expand All @@ -442,11 +487,13 @@ func (fs *FixationStore) deleteMarkedEntry(ctx sdk.Context, safeIndex types.Safe
)
}

entry.IsLatest = false

fs.setEntryIndex(ctx, safeIndex, false)
fs.putEntry(ctx, entry)

// no need to trim future entries: they were trimmed at the time of DelEntry,
// and since then new entries (beyond DeleteAt block) could not be appended.
// no need to trim future entries: they were trimmed at the time of DelEntry, and
// since then new entries (on or beyond DeleteAt block) could not be appended.
}

func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex types.SafeIndex, _ uint64) {
Expand Down Expand Up @@ -526,7 +573,8 @@ func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex types.Saf
}
}

// trimFutureEntries discards all future entries (relative to the current block)
// trimFutureEntries discards all future entries (relative to the DeleteAt of the
// entry version marked for deletion)
func (fs *FixationStore) trimFutureEntries(ctx sdk.Context, lastEntry types.Entry) {
store := fs.getEntryStore(ctx, lastEntry.SafeIndex())

Expand All @@ -540,7 +588,7 @@ func (fs *FixationStore) trimFutureEntries(ctx sdk.Context, lastEntry types.Entr
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)

if entry.Block <= lastEntry.DeleteAt {
if entry.Block < lastEntry.DeleteAt {
break
}

Expand Down Expand Up @@ -803,7 +851,20 @@ func (fs *FixationStore) DelEntry(ctx sdk.Context, index string, block uint64) e
)
}

entry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, block)
// if this is a future delete, then we search for a strictly nearest-smaller
// previous entry (rather than nearest-no-later): if there exists an entry at
// that block, then we would trim it away (if we let it be, it would anyway be
// removed when DeleteAt is reached; and otherwise, it would make appending a
// new entry immediately after (and in same block) as the delete messy, since
// the new entry would need to overwrite a deleted previous entry (same block);
// thus it's simpler to not permit such future entry on the DeleteAt block.

deleteAt := block
if block > ctxBlock {
deleteAt--
}

entry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, deleteAt)
if !found || entry.HasDeleteAt() {
return legacyerrors.ErrNotFound
}
Expand All @@ -822,9 +883,9 @@ func (fs *FixationStore) DelEntry(ctx sdk.Context, index string, block uint64) e
fs.tstore.AddTimerByBlockHeight(ctx, block, key, []byte{})
}

// discard all future entries beyond the DeleteAt block, since they will never
// become the latest; and there will be no new entries beyond DeleteAt block
// as AppendEntry() does not allow such additions.
// discard all pending future entries on or beyond the DeleteAt block, since they
// will never become the latest; and there will be no new entries on or beyond
// DeleteAt block as AppendEntry() does not allow such additions.

fs.trimFutureEntries(ctx, entry)

Expand Down Expand Up @@ -883,7 +944,7 @@ func (fs *FixationStore) getEntryVersionsFilter(ctx sdk.Context, index string, b
}
}

// reverse the result slice to return the block in ascending order
// reverse the result slice to return the blocks in ascending order
length := len(blocks)
for i := 0; i < length/2; i++ {
blocks[i], blocks[length-i-1] = blocks[length-i-1], blocks[i]
Expand Down
Loading

0 comments on commit 810603e

Please sign in to comment.