Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNS-368: fixation store fix refcount and deletions #405

Merged
merged 8 commits into from
Apr 19, 2023
125 changes: 85 additions & 40 deletions common/fixation_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
// - GetEntry(index, *entry): get a copy (and reference) of the latest version of an entry
// - PutEntry(index, block): drop a reference of a version of an entry
// - [TBD] RemoveEntry(index): mark an entry as unavailable for new GetEntry() calls
// - GetAllEntryIndex(): get all the entries indices (without versions)
// - GetAllEntryIndices(): get all the entries indices (without versions)
orenl-lava marked this conversation as resolved.
Show resolved Hide resolved
// - GetAllEntryVersions(index): get all the versions of an entry (for testing)
// - AdvanceBlock(): notify of block progress (e.g. BeginBlock) for garbage collection
//
// Entry names (index) must contain only visible ascii characters (ascii values 32-125).
// The ascii 'DEL' invisible character is used internally to terminate the index values
Expand Down Expand Up @@ -69,6 +71,7 @@ type FixationStore struct {
storeKey sdk.StoreKey
cdc codec.BinaryCodec
prefix string
tstore TimerStore
orenl-lava marked this conversation as resolved.
Show resolved Hide resolved
}

const (
Expand Down Expand Up @@ -136,7 +139,7 @@ func (fs *FixationStore) AppendEntry(
fs.setEntryIndex(ctx, safeIndex)
} else {
// make sure the new entry's block is not smaller than the latest entry's block
if block < latestEntry.GetBlock() {
if block < latestEntry.Block {
details := map[string]string{
"latestBlock": strconv.FormatUint(latestEntry.GetBlock(), 10),
"block": strconv.FormatUint(block, 10),
Expand All @@ -147,17 +150,11 @@ func (fs *FixationStore) AppendEntry(
}

// if the new entry's block is equal to the latest entry, overwrite the latest entry
if block == latestEntry.GetBlock() {
if block == latestEntry.Block {
return fs.ModifyEntry(ctx, index, block, entryData)
}

// if the old latest entry has refcount of 0, then update its "stale_at" time
// TODO: remove this when the latest entry gets its own refcount.
if latestEntry.Refcount == 0 {
// never overflows because because ctx.BlockHeight is int64
latestEntry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME)
fs.setEntry(ctx, latestEntry)
}
fs.putEntry(ctx, latestEntry)
}

// marshal the new entry's data
Expand All @@ -169,42 +166,72 @@ func (fs *FixationStore) AppendEntry(
Block: block,
StaleAt: math.MaxUint64,
Data: marshaledEntryData,
Refcount: 0,
Refcount: 1,
}

fs.setEntry(ctx, entry)

fs.deleteStaleEntries(ctx, safeIndex)

return nil
}

func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex string) {
fs.assertSanitizedIndex(safeIndex)
store := fs.getStore(ctx, safeIndex)

iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

for iterator.Valid() {
// umarshal the old entry version
var oldEntry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &oldEntry)
// "stale" entry versions are ones that reached refcount zero at least
// STALE_TIME blocks ago; they are not visible in lookups, hence may be
// discarded. specifically, a stale entry version becomes "elgibile for
// removal" , if either it is:
// one that follows a stale entry version, -OR-
// the oldest entry version
// rationale: entries are generally valid from their block time until
// the block time of the following newer entry. this newer entry marks
// the end of the previous entry, and hence may not be removed until
// that previous entry gets discarded. keeping the stale entry versions
// ensures (future FindEntry) that blocks from that entry onward are
// stale (otherwise, a lookup might resolve successfully with an older
// non-stale entry version). For this, one - the oldest - marker is
orenl-lava marked this conversation as resolved.
Show resolved Hide resolved
// enough, and additional younger markers can be discarded.
// for example, consider this situation with versions A through E:
// A(stale), B, C(stale), D(stale), E
// in this case, A can be discarded because it is the oldest. C cannot
// be discarded because it marks that new blocks are stale (while older
// blocks between B and C map to B). D is unneeded as marker because C
// is already there, and can be discarded too.

var removals []uint64
safeToDeleteEntry := true // if oldest, or if previous entry was stale
safeToDeleteIndex := true // if non of the entry versions was skipped

iterator.Next()
for ; iterator.Valid(); iterator.Next() {
// umarshal the old entry version
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)

// skipping removal of latest version
if !iterator.Valid() {
break
if !entry.IsStale(ctx) {
safeToDeleteEntry = false
safeToDeleteIndex = false
continue
}

// delete stale entries (if they are at the end of the list)
if oldEntry.IsStale(ctx) {
fs.removeEntry(ctx, oldEntry.GetIndex(), oldEntry.GetBlock())
} else {
// avoid removal of entries in the middle of the list, because it would
// break future lookup that may involve that (stale) entry.
break
if !safeToDeleteEntry {
safeToDeleteEntry = true
safeToDeleteIndex = false
continue
}

removals = append(removals, entry.Block)
}

for _, block := range removals {
fs.removeEntry(ctx, safeIndex, block)
}

if safeToDeleteIndex {
// non was skipped - so all were removed: delete the entry index
fs.removeEntryIndex(ctx, safeIndex)
}
}

Expand Down Expand Up @@ -256,7 +283,7 @@ func (fs *FixationStore) getUnmarshaledEntryForBlock(ctx sdk.Context, safeIndex
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)

if entry.GetBlock() <= block {
if entry.Block <= block {
orenl-lava marked this conversation as resolved.
Show resolved Hide resolved
if entry.IsStale(ctx) {
break
}
Expand Down Expand Up @@ -308,6 +335,23 @@ func (fs *FixationStore) GetEntry(ctx sdk.Context, index string, entryData codec
return true
}

// putEntry decrements the refcount of an entry and marks for staleness if needed
func (fs *FixationStore) putEntry(ctx sdk.Context, entry types.Entry) {
if entry.Refcount == 0 {
panic("Fixation: prefix " + fs.prefix + ": negative refcount safeIndex: " + entry.Index)
}

entry.Refcount -= 1

if entry.Refcount == 0 {
// never overflows because ctx.BlockHeight is int64
entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME)
fs.tstore.AddTimerByBlockHeight(ctx, entry.StaleAt, entry.Index)
}

fs.setEntry(ctx, entry)
}

// PutEntry finds the entry by index and block and decrements the refcount
func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) {
safeIndex, err := sanitizeIndex(index)
Expand All @@ -320,18 +364,12 @@ func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) {
panic("PutEntry with unknown index: " + index)
}

if entry.Refcount == 0 {
panic("PutEntry with refcount zero index: " + index)
if entry.Block != block {
panic("PutEntry with block mismatch index: " + index +
" got " + strconv.Itoa(int(entry.Block)) + " expected " + strconv.Itoa(int(block)))
}

entry.Refcount -= 1

if entry.Refcount == 0 {
// never overflows because because ctx.BlockHeight is int64
entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME) - 1
}

fs.setEntry(ctx, entry)
fs.putEntry(ctx, entry)
orenl-lava marked this conversation as resolved.
Show resolved Hide resolved
}

// removeEntry removes an entry from the store
Expand All @@ -344,8 +382,15 @@ func (fs *FixationStore) createStoreKey(index string) string {
return types.EntryKey + fs.prefix + index
}

func (fs *FixationStore) AdvanceBlock(ctx sdk.Context) {
fs.tstore.Tick(ctx)
}

// NewFixationStore returns a new FixationStore object
func NewFixationStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *FixationStore {
fs := FixationStore{storeKey: storeKey, cdc: cdc, prefix: prefix}
callback := func(ctx sdk.Context, data string) { fs.deleteStaleEntries(ctx, data) }
tstore := NewTimerStore(storeKey, cdc, prefix).WithCallbackByBlockHeight(callback)
fs.tstore = *tstore
orenl-lava marked this conversation as resolved.
Show resolved Hide resolved
return &fs
}
30 changes: 29 additions & 1 deletion common/fixation_entry_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/common/types"
"github.com/lavanet/lava/utils"
)

// FixationStore manages lists of entries with versions in the store.
Expand Down Expand Up @@ -58,11 +59,38 @@ func (fs FixationStore) GetAllEntryIndicesWithPrefix(ctx sdk.Context, prefix str
return indexList
}

// GetAllEntryIndex returns all Entry indices
// GetAllEntryIndices returns all Entry indices
func (fs FixationStore) GetAllEntryIndices(ctx sdk.Context) []string {
return fs.GetAllEntryIndicesWithPrefix(ctx, "")
}

// GetAllEntryVersions returns a list of all versions (blocks) of an entry.
// If stale == true, then the output will include stale versions (for testing).
func (fs *FixationStore) GetAllEntryVersions(ctx sdk.Context, index string, stale bool) (blocks []uint64) {
safeIndex, err := sanitizeIndex(index)
if err != nil {
details := map[string]string{"index": index}
utils.LavaError(ctx, ctx.Logger(), "GetAllEntryVersions", details, "invalid non-ascii entry")
return nil
}

store := fs.getStore(ctx, safeIndex)

iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()

for ; iterator.Valid(); iterator.Next() {
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)
if !stale && entry.IsStale(ctx) {
continue
}
blocks = append(blocks, entry.Block)
}

return blocks
}

func (fs FixationStore) createEntryIndexStoreKey() string {
return types.EntryIndexKey + fs.prefix
}
Expand Down
Loading