Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNS-496: fixation allow append after delete #660

Merged
merged 14 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 114 additions & 53 deletions common/fixation_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ import (
// its previous data. An entry version can be modified using ModifyEntry().
//
// GetEntry() returns the latest (with respect to current, not future) version of an entry. A
// nearest-no-later (than a given block) entry version can be obtained with FindEntry().
// nearest-no-later (than a given block) entry version can be obtained with FindEntry(); And
// FindEntry2() is similar, but also returns the entry's version (block).
//
// Entry versions maintain reference count (refcount) that determine their lifetime. New entry
// versions (appended) start with refcount 1. The refcount of the latest version is incremented
// using GetEntry(). The refcount of any version is decremented using PutEntry(). The refcount
// of the latest version is also decremented when a newer version is appended.
// using GetEntry(). The refcount of the latest version is decremented when a newer version is
// appended, or when a future version becomes in effect. References taken with GetEntry() can
// be dropped (and the refcount decremented) by using PutEntry(). PutEntry() can also be used
// to cancel/remove a future entry.
//
// When an entry's refcount reaches 0, it remains partly visible for a predefined period of
// blocks (stale-period), and then becomes stale and fully invisible. During the stale-period
Expand All @@ -73,16 +76,17 @@ import (
//
// An entry can be deleted using DelEntry(), after which is will not possible to GetEntry(),
// and calls to FindEntry() for a block beyond that time of deletion (at or later) would fail
// too. Until the data has been fully cleaned up (i.e. no refcount and past all stale-periods),
// calls to AppendEntry() with that entry's index will fail.
// too. (Note that DelEntry() will also discard any pending future versions of the entry).
//
// The per-entry APIs are and their actions are summarized below:
//
// o AppendEntry() adds a new version of an entry (could be a future version)
// o ModifyEntry() updates an existing version of an entry (could be a future version)
// o DelEntry() deletes and entry and make it invisible to GetEntry()
// o GetEntry() gets the latest (up to current) version of an entry, except if in stale-period
// o HasEntry() checks for existence of a specific version of an entry
// o FindEntry() gets the nearest-no-later version of an entry, including if in stale-period
// o FindEntry2() same as FindEntry(), and also returns the version (block) of the entry
// o ReadEntry() gets a specific entry version (stale or not)
// o PutEntry() drops reference counts taken by GetEntry()
//
Expand Down Expand Up @@ -130,9 +134,38 @@ import (
// count reaches 0 it marks them for deletion. The actual deletion takes place after
// a fixed number of epochs has passed.
//
// 5. When an entry (index) is deleted, a placeholder entry is a appended to mark the
// block at which deletion took place. This ensures that FindEntry() for the previous
// latest entry version would continue to work until that block and fail thereafter.
// 5. When an entry (index) is deleted, it is kept as a placeholder entry to mark the
// block at which deletion took place. This ensures that FindEntry() for that latest
// entry version would continue to work until that block and fail thereafter.
//
// State invariants:
//
// o Refcount: tracks the reference count of entry versions
// - extra refcount kept for latest and future entry versions
// - incremented for latest entry version with GetEntry()
// - decremented for specific entry version with PutEntry()
// - every PutEntry() must match a previous GetEntry()
// - PutEntry() can be used to remove a specific future entry
// - PutEntry() is illegal to call on last refcount of latest entry
//
// o IsLatest: tracks the entry version which is now the latest
// - transferred to the next (future) entry when it matures
// - removed from delete entries
//
// o DeleteAt: tracks the block upon which the entry will be deleted
// - held by the latest entry version smaller than or equal to that DeleteAt
// - prohibits future entry versions on or beyond the DeleteAt block
//
// o StaleAt: tracks when a removed entry version becomes invisible
// - set on non-future entries that reach refcount == 0
//
// Note that AppendEntry() may add retroactive entry only if it does not precede an
// existing latest entry. Also, AppendEntry() may not add future entries on or beyond
// a DeletedAt in the future (adding a future entry maturing on a future DeleteAt is
// permitted but useless, because it will be deleted straight away).
//
// Note also that DelEntry() discards all future entries on or beyond the DeletedAt
// block (and owing to the previous rule, none will be added until deletes occurs).

type FixationStore struct {
storeKey storetypes.StoreKey
Expand All @@ -155,10 +188,10 @@ func FixationVersion() uint64 {
// fire in order of expiry blocks, and in order of timeout kind.

const (
// NOTE: TimerFutureEntry should be smaller than timerDeleteEntry, to
// ensure that it fires first (because the latter will removes future entries,
// so otherwise if they both expire on the same block then the future entry
// callback will be surprised when it cannot find the respective entry).
// NOTE: timerFutureEntry should be smaller than timerDeleteEntry, to ensure
// that if both expire on the same block then future-entry timer fires before
// delete-entry timer, so that the latter would remove the brand newly matured
// future-entry.
timerFutureEntry = 0x01
timerDeleteEntry = 0x02
timerStaleEntry = 0x03
Expand Down Expand Up @@ -257,7 +290,8 @@ func (fs *FixationStore) AppendEntry(
var latest bool

// if latest entry is not found, this is a first version entry
if !found {
// if latest entry is found but is deleted, also treat like a first version entry
if !found || latestEntry.IsDeleted(ctx) {
fs.setEntryIndex(ctx, safeIndex, true)
if block <= ctxBlock {
latest = true
Expand All @@ -280,57 +314,68 @@ func (fs *FixationStore) AppendEntry(
// appending an entry, while can happen with a past block, must not succeed if it
// precedes en existing latest entry.

if block <= ctxBlock && !latestEntry.IsLatest {
if block < ctxBlock && !latestEntry.IsLatest {
return utils.LavaFormatWarning("AppendEntry",
fmt.Errorf("entry is older than existing latest entry"),
utils.Attribute{Key: "index", Value: index},
utils.Attribute{Key: "block", Value: block},
)
}

// temporary: do not allow adding new entries for an index that was deleted
// and still not fully cleaned up (e.g. not stale or with references held)
// if the new entry's block is equal to the nearest-no-later entry, overwrite that
// entry (only possible for latest and future entries, as the previous test would
// exclude earlier-than-latest entries).

if block == latestEntry.Block {
fs.ModifyEntry(ctx, index, block, entryData)
return nil
}

// we know the entry is not yet deleted (as tested above), so this is an attempt
// to append on or beyond a pending delete, which is not permitted.

if latestEntry.IsDeletedBy(block) {
return utils.LavaFormatWarning("AppendEntry",
fmt.Errorf("entry already deleted and pending cleanup"),
fmt.Errorf("entry disallows future version on or beyond pending delete"),
utils.Attribute{Key: "index", Value: index},
utils.Attribute{Key: "block", Value: block},
utils.Attribute{Key: "delete", Value: latestEntry.DeleteAt},
)
}

// if the new entry's block is equal to the latest entry, overwrite the latest entry
if block == latestEntry.Block {
fs.ModifyEntry(ctx, index, block, entryData)
return nil
}

// if the previous latest entry is marked with DeleteAt which is set to expire after
// this future entry's maturity (block), then transfer this DeleteAt to the future
// entry, and then replace the old timer with a new timer (below)
// (note: deletion, if any, cannot be for the current block, since it would have been
// processed at the beginning of the block, and AppendEntry would fail earlier).
// if the previous latest entry is marked with DeleteAt, and this future entry is set
// to mature strictly before that DeleteAt block (and specifically not at that block),
// then transfer this DeleteAt to the future entry, and then replace the old timer
// with a new timer (see below).

if latestEntry.HasDeleteAt() { // already know !latestEntry.IsDeletedBy(block)
if latestEntry.HasDeleteAt() {
// marked for delete that would take place after this future entry - transfer
// the marking to this future entry (which by now we know is later)
deleteAt = latestEntry.DeleteAt
latestEntry.DeleteAt = math.MaxUint64
fs.setEntry(ctx, latestEntry)
}
}

// if we are superseding a previous latest entry, then drop the latter's refcount;
// otherwise we are a future entry version, so set a timer for when it will become
// the new latest entry.

if block <= ctxBlock {
// the new entry is becoming the latest, put reference on the previous entry.
// (the latestEntry must be marked with IsLatest, because the test above would
// have returned an error otherwise.
latest = true
// if we are superseding a previous latest entry, then drop the latter's refcount (but
// not if the latest entry is deleted, since it lost its extra reference upon deletion);
// otherwise, if we are a future entry version, set a timer for when it will become the
// new latest entry.

if block <= ctxBlock {
// the new entry is becoming the latest now: mark as such
latest = true
if latestEntry.IsLatest {
// the previous entry was the latest and will no longer be so going forward, so
// drop the "latest" reference from that previous entry.
latestEntry.IsLatest = false
fs.putEntry(ctx, latestEntry)
} else {
// the new entry is not yet in effect, create a timer to put reference back to the latest once this is changed
key := encodeForTimer(safeIndex, block, timerFutureEntry)
fs.tstore.AddTimerByBlockHeight(ctx, block, key, []byte{})
fs.putEntry(ctx, latestEntry) // also saves updated latestEntry
}
} else {
// the new entry is not yet in effect, create a timer to put reference back to
// the latest once this is changed.
key := encodeForTimer(safeIndex, block, timerFutureEntry)
fs.tstore.AddTimerByBlockHeight(ctx, block, key, []byte{})
}

// marshal the new entry's data
Expand Down Expand Up @@ -417,7 +462,7 @@ func (fs *FixationStore) updateFutureEntry(ctx sdk.Context, safeIndex types.Safe
// because from now on it is no longer so.

latestEntry.IsLatest = false
fs.putEntry(ctx, latestEntry)
fs.putEntry(ctx, latestEntry) // also saves updated latestEntry
}

// we are now the latest entry - make it official by setting IsLatest
Expand All @@ -442,11 +487,13 @@ func (fs *FixationStore) deleteMarkedEntry(ctx sdk.Context, safeIndex types.Safe
)
}

entry.IsLatest = false

fs.setEntryIndex(ctx, safeIndex, false)
fs.putEntry(ctx, entry)

// no need to trim future entries: they were trimmed at the time of DelEntry,
// and since then new entries (beyond DeleteAt block) could not be appended.
// no need to trim future entries: they were trimmed at the time of DelEntry, and
// since then new entries (on or beyond DeleteAt block) could not be appended.
}

func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex types.SafeIndex, _ uint64) {
Expand Down Expand Up @@ -526,7 +573,8 @@ func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex types.Saf
}
}

// trimFutureEntries discards all future entries (relative to the current block)
// trimFutureEntries discards all future entries (relative to the DeleteAt of the
// entry version marked for deletion)
func (fs *FixationStore) trimFutureEntries(ctx sdk.Context, lastEntry types.Entry) {
store := fs.getEntryStore(ctx, lastEntry.SafeIndex())

Expand All @@ -540,7 +588,7 @@ func (fs *FixationStore) trimFutureEntries(ctx sdk.Context, lastEntry types.Entr
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)

if entry.Block <= lastEntry.DeleteAt {
if entry.Block < lastEntry.DeleteAt {
break
}

Expand Down Expand Up @@ -803,7 +851,20 @@ func (fs *FixationStore) DelEntry(ctx sdk.Context, index string, block uint64) e
)
}

entry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, block)
// if this is a future delete, then we search for a strictly nearest-smaller
// previous entry (rather than nearest-no-later): if there exists an entry at
// that block, then we would trim it away (if we let it be, it would anyway be
// removed when DeleteAt is reached; and otherwise, it would make appending a
// new entry immediately after (and in same block) as the delete messy, since
// the new entry would need to overwrite a deleted previous entry (same block);
// thus it's simpler to not permit such future entry on the DeleteAt block.

deleteAt := block
if block > ctxBlock {
deleteAt--
}

entry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, deleteAt)
if !found || entry.HasDeleteAt() {
return legacyerrors.ErrNotFound
}
Expand All @@ -822,9 +883,9 @@ func (fs *FixationStore) DelEntry(ctx sdk.Context, index string, block uint64) e
fs.tstore.AddTimerByBlockHeight(ctx, block, key, []byte{})
}

// discard all future entries beyond the DeleteAt block, since they will never
// become the latest; and there will be no new entries beyond DeleteAt block
// as AppendEntry() does not allow such additions.
// discard all pending future entries on or beyond the DeleteAt block, since they
// will never become the latest; and there will be no new entries on or beyond
// DeleteAt block as AppendEntry() does not allow such additions.

fs.trimFutureEntries(ctx, entry)

Expand Down Expand Up @@ -883,7 +944,7 @@ func (fs *FixationStore) getEntryVersionsFilter(ctx sdk.Context, index string, b
}
}

// reverse the result slice to return the block in ascending order
// reverse the result slice to return the blocks in ascending order
length := len(blocks)
for i := 0; i < length/2; i++ {
blocks[i], blocks[length-i-1] = blocks[length-i-1], blocks[i]
Expand Down
Loading