diff --git a/common/fixation_entry.go b/common/fixation_entry.go index 29b1a2fddd..d71e6b20be 100644 --- a/common/fixation_entry.go +++ b/common/fixation_entry.go @@ -26,7 +26,9 @@ import ( // - GetEntry(index, *entry): get a copy (and reference) of the latest version of an entry // - PutEntry(index, block): drop a reference of a version of an entry // - [TBD] RemoveEntry(index): mark an entry as unavailable for new GetEntry() calls -// - GetAllEntryIndex(): get all the entries indices (without versions) +// - GetAllEntryIndices(): get all the entries indices (without versions) +// - GetAllEntryVersions(index): get all the versions of an entry (for testing) +// - AdvanceBlock(): notify of block progress (e.g. BeginBlock) for garbage collection // // Entry names (index) must contain only visible ascii characters (ascii values 32-125). // The ascii 'DEL' invisible character is used internally to terminate the index values @@ -69,6 +71,7 @@ type FixationStore struct { storeKey sdk.StoreKey cdc codec.BinaryCodec prefix string + tstore TimerStore } const ( @@ -136,7 +139,7 @@ func (fs *FixationStore) AppendEntry( fs.setEntryIndex(ctx, safeIndex) } else { // make sure the new entry's block is not smaller than the latest entry's block - if block < latestEntry.GetBlock() { + if block < latestEntry.Block { details := map[string]string{ "latestBlock": strconv.FormatUint(latestEntry.GetBlock(), 10), "block": strconv.FormatUint(block, 10), @@ -147,17 +150,11 @@ func (fs *FixationStore) AppendEntry( } // if the new entry's block is equal to the latest entry, overwrite the latest entry - if block == latestEntry.GetBlock() { + if block == latestEntry.Block { return fs.ModifyEntry(ctx, index, block, entryData) } - // if the old latest entry has refcount of 0, then update its "stale_at" time - // TODO: remove this when the latest entry gets its own refcount. - if latestEntry.Refcount == 0 { - // never overflows because because ctx.BlockHeight is int64 - latestEntry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME) - fs.setEntry(ctx, latestEntry) - } + fs.putEntry(ctx, latestEntry) } // marshal the new entry's data @@ -169,42 +166,72 @@ func (fs *FixationStore) AppendEntry( Block: block, StaleAt: math.MaxUint64, Data: marshaledEntryData, - Refcount: 0, + Refcount: 1, } fs.setEntry(ctx, entry) - - fs.deleteStaleEntries(ctx, safeIndex) - return nil } func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex string) { fs.assertSanitizedIndex(safeIndex) store := fs.getStore(ctx, safeIndex) + iterator := sdk.KVStorePrefixIterator(store, []byte{}) defer iterator.Close() - for iterator.Valid() { - // umarshal the old entry version - var oldEntry types.Entry - fs.cdc.MustUnmarshal(iterator.Value(), &oldEntry) + // "stale" entry versions are ones that reached refcount zero at least + // STALE_TIME blocks ago; they are not visible in lookups, hence may be + // discarded. specifically, a stale entry version becomes "elgibile for + // removal" , if either it is: + // one that follows a stale entry version, -OR- + // the oldest entry version + // rationale: entries are generally valid from their block time until + // the block time of the following newer entry. this newer entry marks + // the end of the previous entry, and hence may not be removed until + // that previous entry gets discarded. keeping the stale entry versions + // ensures (future FindEntry) that blocks from that entry onward are + // stale (otherwise, a lookup might resolve successfully with an older + // non-stale entry version). For this, one - the oldest - marker is + // enough, and additional younger markers can be discarded. + // for example, consider this situation with versions A through E: + // A(stale), B, C(stale), D(stale), E + // in this case, A can be discarded because it is the oldest. C cannot + // be discarded because it marks that new blocks are stale (while older + // blocks between B and C map to B). D is unneeded as marker because C + // is already there, and can be discarded too. + + var removals []uint64 + safeToDeleteEntry := true // if oldest, or if previous entry was stale + safeToDeleteIndex := true // if non of the entry versions was skipped - iterator.Next() + for ; iterator.Valid(); iterator.Next() { + // umarshal the old entry version + var entry types.Entry + fs.cdc.MustUnmarshal(iterator.Value(), &entry) - // skipping removal of latest version - if !iterator.Valid() { - break + if !entry.IsStale(ctx) { + safeToDeleteEntry = false + safeToDeleteIndex = false + continue } - // delete stale entries (if they are at the end of the list) - if oldEntry.IsStale(ctx) { - fs.removeEntry(ctx, oldEntry.GetIndex(), oldEntry.GetBlock()) - } else { - // avoid removal of entries in the middle of the list, because it would - // break future lookup that may involve that (stale) entry. - break + if !safeToDeleteEntry { + safeToDeleteEntry = true + safeToDeleteIndex = false + continue } + + removals = append(removals, entry.Block) + } + + for _, block := range removals { + fs.removeEntry(ctx, safeIndex, block) + } + + if safeToDeleteIndex { + // non was skipped - so all were removed: delete the entry index + fs.removeEntryIndex(ctx, safeIndex) } } @@ -256,7 +283,7 @@ func (fs *FixationStore) getUnmarshaledEntryForBlock(ctx sdk.Context, safeIndex var entry types.Entry fs.cdc.MustUnmarshal(iterator.Value(), &entry) - if entry.GetBlock() <= block { + if entry.Block <= block { if entry.IsStale(ctx) { break } @@ -308,6 +335,23 @@ func (fs *FixationStore) GetEntry(ctx sdk.Context, index string, entryData codec return true } +// putEntry decrements the refcount of an entry and marks for staleness if needed +func (fs *FixationStore) putEntry(ctx sdk.Context, entry types.Entry) { + if entry.Refcount == 0 { + panic("Fixation: prefix " + fs.prefix + ": negative refcount safeIndex: " + entry.Index) + } + + entry.Refcount -= 1 + + if entry.Refcount == 0 { + // never overflows because ctx.BlockHeight is int64 + entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME) + fs.tstore.AddTimerByBlockHeight(ctx, entry.StaleAt, entry.Index) + } + + fs.setEntry(ctx, entry) +} + // PutEntry finds the entry by index and block and decrements the refcount func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) { safeIndex, err := sanitizeIndex(index) @@ -320,18 +364,12 @@ func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) { panic("PutEntry with unknown index: " + index) } - if entry.Refcount == 0 { - panic("PutEntry with refcount zero index: " + index) + if entry.Block != block { + panic("PutEntry with block mismatch index: " + index + + " got " + strconv.Itoa(int(entry.Block)) + " expected " + strconv.Itoa(int(block))) } - entry.Refcount -= 1 - - if entry.Refcount == 0 { - // never overflows because because ctx.BlockHeight is int64 - entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME) - 1 - } - - fs.setEntry(ctx, entry) + fs.putEntry(ctx, entry) } // removeEntry removes an entry from the store @@ -344,8 +382,15 @@ func (fs *FixationStore) createStoreKey(index string) string { return types.EntryKey + fs.prefix + index } +func (fs *FixationStore) AdvanceBlock(ctx sdk.Context) { + fs.tstore.Tick(ctx) +} + // NewFixationStore returns a new FixationStore object func NewFixationStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *FixationStore { fs := FixationStore{storeKey: storeKey, cdc: cdc, prefix: prefix} + callback := func(ctx sdk.Context, data string) { fs.deleteStaleEntries(ctx, data) } + tstore := NewTimerStore(storeKey, cdc, prefix).WithCallbackByBlockHeight(callback) + fs.tstore = *tstore return &fs } diff --git a/common/fixation_entry_index.go b/common/fixation_entry_index.go index bd3f1c7281..ed84ca16e3 100644 --- a/common/fixation_entry_index.go +++ b/common/fixation_entry_index.go @@ -4,6 +4,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/common/types" + "github.com/lavanet/lava/utils" ) // FixationStore manages lists of entries with versions in the store. @@ -58,11 +59,38 @@ func (fs FixationStore) GetAllEntryIndicesWithPrefix(ctx sdk.Context, prefix str return indexList } -// GetAllEntryIndex returns all Entry indices +// GetAllEntryIndices returns all Entry indices func (fs FixationStore) GetAllEntryIndices(ctx sdk.Context) []string { return fs.GetAllEntryIndicesWithPrefix(ctx, "") } +// GetAllEntryVersions returns a list of all versions (blocks) of an entry. +// If stale == true, then the output will include stale versions (for testing). +func (fs *FixationStore) GetAllEntryVersions(ctx sdk.Context, index string, stale bool) (blocks []uint64) { + safeIndex, err := sanitizeIndex(index) + if err != nil { + details := map[string]string{"index": index} + utils.LavaError(ctx, ctx.Logger(), "GetAllEntryVersions", details, "invalid non-ascii entry") + return nil + } + + store := fs.getStore(ctx, safeIndex) + + iterator := sdk.KVStorePrefixIterator(store, []byte{}) + defer iterator.Close() + + for ; iterator.Valid(); iterator.Next() { + var entry types.Entry + fs.cdc.MustUnmarshal(iterator.Value(), &entry) + if !stale && entry.IsStale(ctx) { + continue + } + blocks = append(blocks, entry.Block) + } + + return blocks +} + func (fs FixationStore) createEntryIndexStoreKey() string { return types.EntryIndexKey + fs.prefix } diff --git a/common/fixation_entry_test.go b/common/fixation_entry_test.go index 08229f8378..fbe1f0d938 100644 --- a/common/fixation_entry_test.go +++ b/common/fixation_entry_test.go @@ -65,21 +65,21 @@ func testWithFixationTemplate(t *testing.T, playbook []fixationTemplate, countOb if block > uint64(ctx.BlockHeight()) { ctx = ctx.WithBlockHeight(int64(block)) } - err := fs[play.store].AppendEntry(ctx, play.index, block, &coins[play.coin]) + err := fs[play.store].AppendEntry(ctx, index, block, &coins[play.coin]) if !play.fail { require.Nil(t, err, what) } else { require.NotNil(t, err, what) } case "modify": - err := fs[play.store].ModifyEntry(ctx, play.index, block, &coins[play.coin]) + err := fs[play.store].ModifyEntry(ctx, index, block, &coins[play.coin]) if !play.fail { require.Nil(t, err, what) } else { require.NotNil(t, err, what) } case "find": - found := fs[play.store].FindEntry(ctx, play.index, block, &dummy) + found := fs[play.store].FindEntry(ctx, index, block, &dummy) if !play.fail { require.True(t, found, what) require.Equal(t, dummy, coins[play.coin], what) @@ -87,7 +87,7 @@ func testWithFixationTemplate(t *testing.T, playbook []fixationTemplate, countOb require.False(t, found, what) } case "get": - found := fs[play.store].GetEntry(ctx, play.index, &dummy) + found := fs[play.store].GetEntry(ctx, index, &dummy) if !play.fail { require.True(t, found, what) require.Equal(t, dummy, coins[play.coin], what) @@ -95,12 +95,16 @@ func testWithFixationTemplate(t *testing.T, playbook []fixationTemplate, countOb require.False(t, found, what) } case "put": - fs[play.store].PutEntry(ctx, play.index, block) + fs[play.store].PutEntry(ctx, index, block) case "block": ctx = ctx.WithBlockHeight(ctx.BlockHeight() + play.count) + fs[play.store].AdvanceBlock(ctx) case "getall": indexList := fs[play.store].GetAllEntryIndices(ctx) require.Equal(t, int(play.count), len(indexList), what) + case "getvers": + indexList := fs[play.store].GetAllEntryVersions(ctx, index, true) + require.Equal(t, int(play.count), len(indexList), what) case "getallprefix": indexList := fs[play.store].GetAllEntryIndicesWithPrefix(ctx, index) require.Equal(t, int(play.count), len(indexList), what) @@ -247,6 +251,20 @@ func TestGetAndPutEntry(t *testing.T) { testWithFixationTemplate(t, playbook, 3, 1) } +func TestDoublePutEntry(t *testing.T) { + block0 := int64(10) + block1 := block0 + types.STALE_ENTRY_TIME+1 + + playbook := []fixationTemplate{ + { op: "append", name: "entry #1 version 0", count: block0, coin: 0 }, + { op: "append", name: "entry #1 version 1", count: block1, coin: 0 }, + // entry #1 with block zero now has refcount = zero + { op: "put", name: "negative refcount entry #1 version 0", count: block0, fail: false }, + } + + require.Panics(t, func() { testWithFixationTemplate(t, playbook, 3, 1) }) +} + func TestDeleteTwoEntries(t *testing.T) { block0 := int64(10) block1 := block0 + int64(10) @@ -266,6 +284,69 @@ func TestDeleteTwoEntries(t *testing.T) { testWithFixationTemplate(t, playbook, 4, 1) } +func TestRemoveStaleEntries(t *testing.T) { + block0 := int64(10) + block1 := block0 + int64(10) + block2 := block1 + int64(10) + block3 := block2 + int64(10) + block4 := block3 + int64(10) + block5 := int64(100) + block6 := block5 + types.STALE_ENTRY_TIME + block7 := block6 + types.STALE_ENTRY_TIME/2 + block8 := block7 + types.STALE_ENTRY_TIME/2 + 1 + block9 := block8 + types.STALE_ENTRY_TIME/2 + 2 + + playbook := []fixationTemplate{ + { op: "append", name: "entry #1", count: block0, coin: 0 }, + { op: "get", name: "refcount entry #1", coin: 0 }, + { op: "append", name: "entry #2", count: block1, coin: 1 }, + { op: "get", name: "refcount entry #2", coin: 1 }, + { op: "append", name: "entry #3", count: block2, coin: 2 }, + { op: "get", name: "refcount entry #3", coin: 2 }, + { op: "append", name: "entry #4", count: block3, coin: 3 }, + { op: "get", name: "refcount entry #4", coin: 3 }, + { op: "append", name: "entry #5", count: block4, coin: 4 }, + { op: "get", name: "refcount entry #5", coin: 4 }, + // release an entry + { op: "block", name: "advance a bit", count: block5-block4 }, + { op: "put", name: "refcount entry #1", count: block0 }, + { op: "block", name: "wait entry #1 staled", count: block6-block5 }, + // expect 5 entry versions left + { op: "getvers", name: "to check 5 versions left", count: 4 }, + // release more entries + { op: "put", name: "refcount entry #4", count: block3 }, + { op: "block", name: "wait entry #1 half staled", count: block7-block6 }, + { op: "put", name: "refcount entry #3", count: block2 }, + { op: "block", name: "wait another #1 half staled", count: block8-block7 }, + // entry #4 is stale but un-removable because entry #3 sill alive + { op: "getvers", name: "to check 4 versions remain", count: 4 }, + { op: "block", name: "wait another #1 half staled", count: block9-block8 }, + // entry #3 became stale, so (stale) entry #4 was removed + { op: "getvers", name: "to check 3 versions remain", count: 3 }, + } + + testWithFixationTemplate(t, playbook, 5, 1) +} + +func TestRemoveLastEntry(t *testing.T) { + block0 := int64(10) + block1 := block0 + int64(10) + block2 := block1 + types.STALE_ENTRY_TIME + + playbook := []fixationTemplate{ + { op: "append", name: "entry #1", count: block0, coin: 0 }, + { op: "block", name: "advance a bit", count: block1-block0 }, + { op: "put", name: "refcount entry #1", count: block0 }, + // expect 1 (stale) entry versions left + { op: "getvers", name: "to check 1 versions left", count: 1 }, + { op: "block", name: "wait for entry #1 stale", count: block2-block1 }, + // expect entry #1 gone now + { op: "find", name: "try to find entry #1", count: block0, coin: 0, fail: true}, + } + + testWithFixationTemplate(t, playbook, 1, 1) +} + // Test that the appended entries are sorted (first element is oldest) func TestEntriesSort(t *testing.T) { block0 := int64(10) diff --git a/common/types/fixationEntry.go b/common/types/fixationEntry.go index 38bbabfde1..4689545fd4 100644 --- a/common/types/fixationEntry.go +++ b/common/types/fixationEntry.go @@ -8,7 +8,7 @@ import ( // has passed its stale_at time (more than STALE_ENTRY_TIME since deletion). func (entry Entry) IsStale(ctx sdk.Context) bool { if entry.GetRefcount() == 0 { - if entry.StaleAt < uint64(ctx.BlockHeight()) { + if entry.StaleAt <= uint64(ctx.BlockHeight()) { return true } } diff --git a/x/plans/keeper/keeper.go b/x/plans/keeper/keeper.go index 61b37f9663..f81bc7d082 100644 --- a/x/plans/keeper/keeper.go +++ b/x/plans/keeper/keeper.go @@ -31,13 +31,19 @@ func NewKeeper( ps = ps.WithKeyTable(types.ParamKeyTable()) } + fs := *common.NewFixationStore(storeKey, cdc, types.PlanFixationStorePrefix) + return &Keeper{ memKey: memKey, paramstore: ps, - plansFs: *common.NewFixationStore(storeKey, cdc, types.PlanFixationStorePrefix), + plansFs: fs, } } +func (k Keeper) BeginBlock(ctx sdk.Context) { + k.plansFs.AdvanceBlock(ctx) +} + func (k Keeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger().With("module", fmt.Sprintf("x/%s", types.ModuleName)) } diff --git a/x/plans/module.go b/x/plans/module.go index 699d058f39..78c7c3f7b2 100644 --- a/x/plans/module.go +++ b/x/plans/module.go @@ -161,6 +161,7 @@ func (AppModule) ConsensusVersion() uint64 { return 2 } // BeginBlock executes all ABCI BeginBlock logic respective to the capability module. func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) { + am.keeper.BeginBlock(ctx) } // EndBlock executes all ABCI EndBlock logic respective to the capability module. It diff --git a/x/projects/keeper/keeper.go b/x/projects/keeper/keeper.go index 4c1b7a6fa7..5627a96952 100644 --- a/x/projects/keeper/keeper.go +++ b/x/projects/keeper/keeper.go @@ -48,6 +48,11 @@ func NewKeeper( } } +func (k Keeper) BeginBlock(ctx sdk.Context) { + k.projectsFS.AdvanceBlock(ctx) + k.developerKeysFS.AdvanceBlock(ctx) +} + func (k Keeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger().With("module", fmt.Sprintf("x/%s", types.ModuleName)) } diff --git a/x/projects/module.go b/x/projects/module.go index c60912c9b5..bf21fb9484 100644 --- a/x/projects/module.go +++ b/x/projects/module.go @@ -161,7 +161,9 @@ func (am AppModule) ExportGenesis(ctx sdk.Context, cdc codec.JSONCodec) json.Raw func (AppModule) ConsensusVersion() uint64 { return 2 } // BeginBlock executes all ABCI BeginBlock logic respective to the capability module. -func (am AppModule) BeginBlock(_ sdk.Context, _ abci.RequestBeginBlock) {} +func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) { + am.keeper.BeginBlock(ctx) +} // EndBlock executes all ABCI EndBlock logic respective to the capability module. It // returns no validator updates. diff --git a/x/subscription/keeper/subscription.go b/x/subscription/keeper/subscription.go index a063743d07..d4ea097ffe 100644 --- a/x/subscription/keeper/subscription.go +++ b/x/subscription/keeper/subscription.go @@ -40,7 +40,7 @@ func (k Keeper) RemoveSubscription(ctx sdk.Context, consumer string) { // (PlanIndex is empty only in testing of RemoveSubscription) if sub.PlanIndex != "" { - k.plansKeeper.PutPlan(ctx, sub.PlanIndex, sub.Block) + k.plansKeeper.PutPlan(ctx, sub.PlanIndex, sub.PlanBlock) } store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.SubscriptionKeyPrefix))