Skip to content

Commit

Permalink
Merge pull request #405 from lavanet/CNS-368-fixation-store-fix-refcount
Browse files Browse the repository at this point in the history
CNS-368: fixation store fix refcount and deletions
  • Loading branch information
Yaroms authored Apr 19, 2023
2 parents c021275 + 4616ae1 commit dbea6bf
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 50 deletions.
125 changes: 85 additions & 40 deletions common/fixation_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
// - GetEntry(index, *entry): get a copy (and reference) of the latest version of an entry
// - PutEntry(index, block): drop a reference of a version of an entry
// - [TBD] RemoveEntry(index): mark an entry as unavailable for new GetEntry() calls
// - GetAllEntryIndex(): get all the entries indices (without versions)
// - GetAllEntryIndices(): get all the entries indices (without versions)
// - GetAllEntryVersions(index): get all the versions of an entry (for testing)
// - AdvanceBlock(): notify of block progress (e.g. BeginBlock) for garbage collection
//
// Entry names (index) must contain only visible ascii characters (ascii values 32-125).
// The ascii 'DEL' invisible character is used internally to terminate the index values
Expand Down Expand Up @@ -69,6 +71,7 @@ type FixationStore struct {
storeKey sdk.StoreKey
cdc codec.BinaryCodec
prefix string
tstore TimerStore
}

const (
Expand Down Expand Up @@ -136,7 +139,7 @@ func (fs *FixationStore) AppendEntry(
fs.setEntryIndex(ctx, safeIndex)
} else {
// make sure the new entry's block is not smaller than the latest entry's block
if block < latestEntry.GetBlock() {
if block < latestEntry.Block {
details := map[string]string{
"latestBlock": strconv.FormatUint(latestEntry.GetBlock(), 10),
"block": strconv.FormatUint(block, 10),
Expand All @@ -147,17 +150,11 @@ func (fs *FixationStore) AppendEntry(
}

// if the new entry's block is equal to the latest entry, overwrite the latest entry
if block == latestEntry.GetBlock() {
if block == latestEntry.Block {
return fs.ModifyEntry(ctx, index, block, entryData)
}

// if the old latest entry has refcount of 0, then update its "stale_at" time
// TODO: remove this when the latest entry gets its own refcount.
if latestEntry.Refcount == 0 {
// never overflows because because ctx.BlockHeight is int64
latestEntry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME)
fs.setEntry(ctx, latestEntry)
}
fs.putEntry(ctx, latestEntry)
}

// marshal the new entry's data
Expand All @@ -169,42 +166,72 @@ func (fs *FixationStore) AppendEntry(
Block: block,
StaleAt: math.MaxUint64,
Data: marshaledEntryData,
Refcount: 0,
Refcount: 1,
}

fs.setEntry(ctx, entry)

fs.deleteStaleEntries(ctx, safeIndex)

return nil
}

func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex string) {
fs.assertSanitizedIndex(safeIndex)
store := fs.getStore(ctx, safeIndex)

iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

for iterator.Valid() {
// umarshal the old entry version
var oldEntry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &oldEntry)
// "stale" entry versions are ones that reached refcount zero at least
// STALE_TIME blocks ago; they are not visible in lookups, hence may be
// discarded. specifically, a stale entry version becomes "elgibile for
// removal" , if either it is:
// one that follows a stale entry version, -OR-
// the oldest entry version
// rationale: entries are generally valid from their block time until
// the block time of the following newer entry. this newer entry marks
// the end of the previous entry, and hence may not be removed until
// that previous entry gets discarded. keeping the stale entry versions
// ensures (future FindEntry) that blocks from that entry onward are
// stale (otherwise, a lookup might resolve successfully with an older
// non-stale entry version). For this, one - the oldest - marker is
// enough, and additional younger markers can be discarded.
// for example, consider this situation with versions A through E:
// A(stale), B, C(stale), D(stale), E
// in this case, A can be discarded because it is the oldest. C cannot
// be discarded because it marks that new blocks are stale (while older
// blocks between B and C map to B). D is unneeded as marker because C
// is already there, and can be discarded too.

var removals []uint64
safeToDeleteEntry := true // if oldest, or if previous entry was stale
safeToDeleteIndex := true // if non of the entry versions was skipped

iterator.Next()
for ; iterator.Valid(); iterator.Next() {
// umarshal the old entry version
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)

// skipping removal of latest version
if !iterator.Valid() {
break
if !entry.IsStale(ctx) {
safeToDeleteEntry = false
safeToDeleteIndex = false
continue
}

// delete stale entries (if they are at the end of the list)
if oldEntry.IsStale(ctx) {
fs.removeEntry(ctx, oldEntry.GetIndex(), oldEntry.GetBlock())
} else {
// avoid removal of entries in the middle of the list, because it would
// break future lookup that may involve that (stale) entry.
break
if !safeToDeleteEntry {
safeToDeleteEntry = true
safeToDeleteIndex = false
continue
}

removals = append(removals, entry.Block)
}

for _, block := range removals {
fs.removeEntry(ctx, safeIndex, block)
}

if safeToDeleteIndex {
// non was skipped - so all were removed: delete the entry index
fs.removeEntryIndex(ctx, safeIndex)
}
}

Expand Down Expand Up @@ -256,7 +283,7 @@ func (fs *FixationStore) getUnmarshaledEntryForBlock(ctx sdk.Context, safeIndex
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)

if entry.GetBlock() <= block {
if entry.Block <= block {
if entry.IsStale(ctx) {
break
}
Expand Down Expand Up @@ -308,6 +335,23 @@ func (fs *FixationStore) GetEntry(ctx sdk.Context, index string, entryData codec
return true
}

// putEntry decrements the refcount of an entry and marks for staleness if needed
func (fs *FixationStore) putEntry(ctx sdk.Context, entry types.Entry) {
if entry.Refcount == 0 {
panic("Fixation: prefix " + fs.prefix + ": negative refcount safeIndex: " + entry.Index)
}

entry.Refcount -= 1

if entry.Refcount == 0 {
// never overflows because ctx.BlockHeight is int64
entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME)
fs.tstore.AddTimerByBlockHeight(ctx, entry.StaleAt, entry.Index)
}

fs.setEntry(ctx, entry)
}

// PutEntry finds the entry by index and block and decrements the refcount
func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) {
safeIndex, err := sanitizeIndex(index)
Expand All @@ -320,18 +364,12 @@ func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) {
panic("PutEntry with unknown index: " + index)
}

if entry.Refcount == 0 {
panic("PutEntry with refcount zero index: " + index)
if entry.Block != block {
panic("PutEntry with block mismatch index: " + index +
" got " + strconv.Itoa(int(entry.Block)) + " expected " + strconv.Itoa(int(block)))
}

entry.Refcount -= 1

if entry.Refcount == 0 {
// never overflows because because ctx.BlockHeight is int64
entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME) - 1
}

fs.setEntry(ctx, entry)
fs.putEntry(ctx, entry)
}

// removeEntry removes an entry from the store
Expand All @@ -344,8 +382,15 @@ func (fs *FixationStore) createStoreKey(index string) string {
return types.EntryKey + fs.prefix + index
}

func (fs *FixationStore) AdvanceBlock(ctx sdk.Context) {
fs.tstore.Tick(ctx)
}

// NewFixationStore returns a new FixationStore object
func NewFixationStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *FixationStore {
fs := FixationStore{storeKey: storeKey, cdc: cdc, prefix: prefix}
callback := func(ctx sdk.Context, data string) { fs.deleteStaleEntries(ctx, data) }
tstore := NewTimerStore(storeKey, cdc, prefix).WithCallbackByBlockHeight(callback)
fs.tstore = *tstore
return &fs
}
30 changes: 29 additions & 1 deletion common/fixation_entry_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/common/types"
"github.com/lavanet/lava/utils"
)

// FixationStore manages lists of entries with versions in the store.
Expand Down Expand Up @@ -58,11 +59,38 @@ func (fs FixationStore) GetAllEntryIndicesWithPrefix(ctx sdk.Context, prefix str
return indexList
}

// GetAllEntryIndex returns all Entry indices
// GetAllEntryIndices returns all Entry indices
func (fs FixationStore) GetAllEntryIndices(ctx sdk.Context) []string {
return fs.GetAllEntryIndicesWithPrefix(ctx, "")
}

// GetAllEntryVersions returns a list of all versions (blocks) of an entry.
// If stale == true, then the output will include stale versions (for testing).
func (fs *FixationStore) GetAllEntryVersions(ctx sdk.Context, index string, stale bool) (blocks []uint64) {
safeIndex, err := sanitizeIndex(index)
if err != nil {
details := map[string]string{"index": index}
utils.LavaError(ctx, ctx.Logger(), "GetAllEntryVersions", details, "invalid non-ascii entry")
return nil
}

store := fs.getStore(ctx, safeIndex)

iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

for ; iterator.Valid(); iterator.Next() {
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)
if !stale && entry.IsStale(ctx) {
continue
}
blocks = append(blocks, entry.Block)
}

return blocks
}

func (fs FixationStore) createEntryIndexStoreKey() string {
return types.EntryIndexKey + fs.prefix
}
Expand Down
Loading

0 comments on commit dbea6bf

Please sign in to comment.