Skip to content

Commit

Permalink
Merge pull request #427 from lavanet/CNS-385-project-propagate-charge-cu
Browse files Browse the repository at this point in the history
CNS-385: project propagate charge of CU onward
  • Loading branch information
Yaroms authored May 1, 2023
2 parents 288eced + ea131cb commit c094633
Show file tree
Hide file tree
Showing 18 changed files with 442 additions and 260 deletions.
59 changes: 26 additions & 33 deletions common/fixation_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ import (
//
// Once instantiated with NewFixationStore(), it offers the following methods:
// - AppendEntry(index, block, *entry): add a new "block" version of an entry "index".
// - ModifyEntry(index, block, *entry): modify existing entry with "index" and "block"
// - FindEntry(index, block, *entry): get a copy (no reference) of a version of an entry
// - ModifyEntry(index, block, *entry): modify an existing entry with "index" and exact "block" (*)
// - ReadEntry(index, block, *entry): copy an existing entry with "index" and exact "block" (*)
// - FindEntry(index, block, *entry): get a copy (no reference) of a version of an entry (**)
// - GetEntry(index, *entry): get a copy (and reference) of the latest version of an entry
// - PutEntry(index, block): drop a reference of a version of an entry
// - PutEntry(index, block): drop reference to an existing entry with "index" and exact "block" (*)
// - [TBD] RemoveEntry(index): mark an entry as unavailable for new GetEntry() calls
// - GetAllEntryIndices(): get all the entries indices (without versions)
// - GetAllEntryVersions(index): get all the versions of an entry (for testing)
// - GetEntryVersionsRange(index, block, delta): get range of entry versions (**)
// - AdvanceBlock(): notify of block progress (e.g. BeginBlock) for garbage collection
// Note:
// - methods marked with (*) expect an exact existing method, or otherwise will panic
// - methods marked with (**) will match an entry with the nearest-smaller block version
//
// Entry names (index) must contain only visible ascii characters (ascii values 32-125).
// The ascii 'DEL' invisible character is used internally to terminate the index values
Expand Down Expand Up @@ -134,7 +139,8 @@ func (fs *FixationStore) AppendEntry(

// if the new entry's block is equal to the latest entry, overwrite the latest entry
if block == latestEntry.Block {
return fs.ModifyEntry(ctx, index, block, entryData)
fs.ModifyEntry(ctx, index, block, entryData)
return nil
}

fs.putEntry(ctx, latestEntry)
Expand Down Expand Up @@ -218,31 +224,27 @@ func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex string) {
}
}

// ModifyEntry modifies an existing entry in the store
func (fs *FixationStore) ModifyEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) error {
// ReadEntry returns and existing entry with index and specific block
func (fs *FixationStore) ReadEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) {
safeIndex, err := types.SanitizeIndex(index)
if err != nil {
details := map[string]string{"index": index}
return utils.LavaError(ctx, ctx.Logger(), "ModifyEntry_invalid_index", details, "invalid non-ascii entry")
panic("ReadEntry invalid non-ascii entry: " + index)
}

// get the entry from the store
entry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, block)
if !found {
details := map[string]string{
"fs.prefix": fs.prefix,
"index": index,
"block": strconv.FormatUint(block, 10),
}
return utils.LavaError(ctx, ctx.Logger(), "SetEntry_cant_find_entry", details, "entry does not exist")
}
entry := fs.getEntry(ctx, safeIndex, block)
fs.cdc.MustUnmarshal(entry.GetData(), entryData)
}

// update entry data
marshaledEntryData := fs.cdc.MustMarshal(entryData)
entry.Data = marshaledEntryData
// ModifyEntry modifies an existing entry in the store
func (fs *FixationStore) ModifyEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) {
safeIndex, err := types.SanitizeIndex(index)
if err != nil {
panic("ModifyEntry with non-ascii index: " + index)
}

entry := fs.getEntry(ctx, safeIndex, block)
entry.Data = fs.cdc.MustMarshal(entryData)
fs.setEntry(ctx, entry)
return nil
}

// getUnmarshaledEntryForBlock gets an entry version for an index that has
Expand Down Expand Up @@ -339,19 +341,10 @@ func (fs *FixationStore) putEntry(ctx sdk.Context, entry types.Entry) {
func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) {
safeIndex, err := types.SanitizeIndex(index)
if err != nil {
panic("PutEntry with non-ascii index: " + index)
}

entry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, block)
if !found {
panic("PutEntry with unknown index: " + index)
}

if entry.Block != block {
panic("PutEntry with block mismatch index: " + index +
" got " + strconv.Itoa(int(entry.Block)) + " expected " + strconv.Itoa(int(block)))
panic("PutEntry invalid non-ascii entry: " + index)
}

entry := fs.getEntry(ctx, safeIndex, block)
fs.putEntry(ctx, entry)
}

Expand Down
53 changes: 45 additions & 8 deletions common/fixation_entry_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,70 @@ func (fs FixationStore) GetAllEntryIndices(ctx sdk.Context) []string {
return fs.GetAllEntryIndicesWithPrefix(ctx, "")
}

// GetAllEntryVersions returns a list of all versions (blocks) of an entry.
// If stale == true, then the output will include stale versions (for testing).
func (fs *FixationStore) GetAllEntryVersions(ctx sdk.Context, index string, stale bool) (blocks []uint64) {
func (fs *FixationStore) getEntryVersionsFilter(ctx sdk.Context, index string, block uint64, filter func(*types.Entry) bool) (blocks []uint64) {
safeIndex, err := types.SanitizeIndex(index)
if err != nil {
details := map[string]string{"index": index}
utils.LavaError(ctx, ctx.Logger(), "GetAllEntryVersions", details, "invalid non-ascii entry")
utils.LavaError(ctx, ctx.Logger(), "getEntryVersionsFilter", details, "invalid non-ascii entry")
return nil
}

store := fs.getStore(ctx, safeIndex)

iterator := sdk.KVStorePrefixIterator(store, []byte{})
iterator := sdk.KVStoreReversePrefixIterator(store, []byte{})
defer iterator.Close()

for ; iterator.Valid(); iterator.Next() {
var entry types.Entry
fs.cdc.MustUnmarshal(iterator.Value(), &entry)
if !stale && entry.IsStale(ctx) {
continue

if filter(&entry) {
blocks = append(blocks, entry.Block)
}

if entry.Block <= block {
break
}
blocks = append(blocks, entry.Block)
}

// reverse the result slice to return the block in ascending order
length := len(blocks)
for i := 0; i < length/2; i++ {
blocks[i], blocks[length-i-1] = blocks[length-i-1], blocks[i]
}

return blocks
}

// GetEntryVersionsRange returns a list of versions from nearest-smaller block
// and onward, and not more than delta blocks further (skip stale entries).
func (fs *FixationStore) GetEntryVersionsRange(ctx sdk.Context, index string, block, delta uint64) (blocks []uint64) {
filter := func(entry *types.Entry) bool {
if entry.IsStale(ctx) {
return false
}
if entry.Block > block+delta {
return false
}
return true
}

return fs.getEntryVersionsFilter(ctx, index, block, filter)
}

// GetAllEntryVersions returns a list of all versions (blocks) of an entry.
// If stale == true, then the output will include stale versions (for testing).
func (fs *FixationStore) GetAllEntryVersions(ctx sdk.Context, index string, stale bool) (blocks []uint64) {
filter := func(entry *types.Entry) bool {
if !stale && entry.IsStale(ctx) {
return false
}
return true
}

return fs.getEntryVersionsFilter(ctx, index, 0, filter)
}

func (fs FixationStore) createEntryIndexStoreKey() string {
return types.EntryIndexPrefix + fs.prefix
}
Expand Down
49 changes: 40 additions & 9 deletions common/fixation_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,9 @@ func testWithFixationTemplate(t *testing.T, playbook []fixationTemplate, countOb
require.NotNil(t, err, what)
}
case "modify":
err := fs[play.store].ModifyEntry(ctx, index, block, &coins[play.coin])
if !play.fail {
require.Nil(t, err, what)
} else {
require.NotNil(t, err, what)
}
fs[play.store].ModifyEntry(ctx, index, block, &coins[play.coin])
case "read":
fs[play.store].ReadEntry(ctx, index, block, &dummy)
case "find":
found := fs[play.store].FindEntry(ctx, index, block, &dummy)
if !play.fail {
Expand Down Expand Up @@ -114,15 +111,17 @@ func testWithFixationTemplate(t *testing.T, playbook []fixationTemplate, countOb
// Test API calls with invalid entry index
func TestEntryInvalidIndex(t *testing.T) {
invalid := "index" + string('\001')
unknown := "unknown"

playbook := []fixationTemplate{
{op: "append", name: "with invalid index (fail)", index: invalid, fail: true},
{op: "modify", name: "with invalid index (fail)", index: invalid, fail: true},
{op: "find", name: "with invalid index (fail)", index: invalid, fail: true},
{op: "get", name: "with invalid index (fail)", index: invalid, fail: true},
{op: "find", name: "with unknown index (fail)", index: unknown, fail: true},
{op: "get", name: "with unknown index (fail)", index: unknown, fail: true},
}

testWithFixationTemplate(t, playbook, 3, 1)
testWithFixationTemplate(t, playbook, 1, 1)
}

// Test addition and auto-removal of a fixation entry
Expand Down Expand Up @@ -261,7 +260,39 @@ func TestDoublePutEntry(t *testing.T) {
{op: "put", name: "negative refcount entry #1 version 0", count: block0, fail: false},
}

require.Panics(t, func() { testWithFixationTemplate(t, playbook, 3, 1) })
require.Panics(t, func() { testWithFixationTemplate(t, playbook, 1, 1) })
}

func TestExactEntryMethods(t *testing.T) {
invalid := "index" + string('\001')
unknown := "unknown"

block0 := int64(10)
block1 := block0 + types.STALE_ENTRY_TIME + 1

playbook := []fixationTemplate{
{op: "append", name: "entry #1 version 0", count: block0, coin: 0},
{op: "append", name: "entry #1 version 1", count: block1, coin: 0},
}

testWithFixationTemplate(t, playbook, 1, 1)

playbooks := [][]fixationTemplate{
{{op: "read", name: "with invalid index (fail)", index: invalid}},
{{op: "modify", name: "with invalid index (fail)", index: invalid}},
{{op: "put", name: "with invalid index (fail)", index: invalid}},
{{op: "read", name: "with unknown index (fail)", index: unknown}},
{{op: "modify", name: "with unknown index (fail)", index: unknown}},
{{op: "put", name: "with unknown index (fail)", index: unknown}},
{{op: "read", name: "entry #1 version 0", count: block0 + 1, coin: 0}},
{{op: "modify", name: "entry #1 version 0", count: block0 + 1, coin: 0}},
{{op: "put", name: "entry #1 version 0", count: block0 + 1, coin: 0}},
}

for _, p := range playbooks {
what := p[0].op + " " + p[0].name
require.Panics(t, func() { testWithFixationTemplate(t, p, 1, 1) }, what)
}
}

func TestDeleteTwoEntries(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion common/types/fixationEntry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func FixationVersion() uint64 {
return 2
}

// IsEntryStale tests whether an entry is stale, i.e. has refcount zero _and_
// IsStale tests whether an entry is stale, i.e. has refcount zero _and_
// has passed its stale_at time (more than STALE_ENTRY_TIME since deletion).
func (entry Entry) IsStale(ctx sdk.Context) bool {
if entry.GetRefcount() == 0 {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gogo/googleapis v1.4.0 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/tools v0.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 h1:gDLXvp5S9izjldquuoAhDzccbskOL6tDC5jMSyx3zxE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2/go.mod h1:7pdNwVWBBHGiCxa9lAszqCJMbfTISJ7oMftp8+UGV08=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/gtank/merlin v0.1.1-0.20191105220539-8318aed1a79f/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s=
Expand Down
3 changes: 2 additions & 1 deletion proto/projects/project.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ message Project {
Policy admin_policy = 6;
uint64 used_cu = 7;
Policy subscription_policy = 8;
uint64 snapshot = 9; // snapshot id to uniquely identify snapshots
}

message ProjectKey {
Expand Down Expand Up @@ -54,4 +55,4 @@ message ProjectData {
bool enabled = 3;
repeated ProjectKey projectKeys = 4 [(gogoproto.nullable) = false];
Policy policy = 5;
}
}
4 changes: 2 additions & 2 deletions protocol/chainlib/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package chainlib

import (
"encoding/json"
"io/ioutil"
"io"
"net/http/httptest"
"testing"
"time"
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestExtractDappIDFromFiberContext(t *testing.T) {
req := httptest.NewRequest("GET", testCase.route, nil)

resp, _ := app.Test(req, 1)
body, _ := ioutil.ReadAll(resp.Body)
body, _ := io.ReadAll(resp.Body)
responseString := string(body)
if responseString != testCase.expected {
t.Errorf("Expected %s but got %s", testCase.expected, responseString)
Expand Down
2 changes: 1 addition & 1 deletion x/pairing/keeper/msg_server_relay_payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (k Keeper) chargeComputeUnitsToProjectAndSubscription(ctx sdk.Context, clie
return fmt.Errorf("failed to get project for client")
}

err = k.projectsKeeper.ChargeComputeUnitsToProject(ctx, project, relay.CuSum)
err = k.projectsKeeper.ChargeComputeUnitsToProject(ctx, project, uint64(ctx.BlockHeight()), relay.CuSum)
if err != nil {
return fmt.Errorf("failed to add CU to the project")
}
Expand Down
2 changes: 1 addition & 1 deletion x/pairing/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type BankKeeper interface {
}

type ProjectsKeeper interface {
ChargeComputeUnitsToProject(ctx sdk.Context, project projectstypes.Project, cu uint64) (err error)
ChargeComputeUnitsToProject(ctx sdk.Context, project projectstypes.Project, block uint64, cu uint64) (err error)
GetProjectForDeveloper(ctx sdk.Context, developerKey string, blockHeight uint64) (proj projectstypes.Project, vrfpk string, errRet error)
}

Expand Down
9 changes: 2 additions & 7 deletions x/plans/keeper/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ func (m Migrator) Migrate2to3(ctx sdk.Context) error {
blocks := m.keeper.plansFS.GetAllEntryVersions(ctx, planIndex, true)
for _, block := range blocks {
var plan_v2 v2.PlanV2
if found := m.keeper.plansFS.FindEntry(ctx, planIndex, block, &plan_v2); !found {
return fmt.Errorf("could not find plan with index %s", planIndex)
}
m.keeper.plansFS.ReadEntry(ctx, planIndex, block, &plan_v2)

// create policy struct
planPolicy := projecttypes.Policy{
Expand All @@ -55,10 +53,7 @@ func (m Migrator) Migrate2to3(ctx sdk.Context) error {
PlanPolicy: planPolicy,
}

err := m.keeper.plansFS.ModifyEntry(ctx, planIndex, block, &plan_v3)
if err != nil {
return err
}
m.keeper.plansFS.ModifyEntry(ctx, planIndex, block, &plan_v3)
}
}

Expand Down
1 change: 1 addition & 0 deletions x/projects/keeper/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (k Keeper) snapshotProject(ctx sdk.Context, projectID string) error {
}

project.UsedCu = 0
project.Snapshot += 1

return k.projectsFS.AppendEntry(ctx, project.Index, uint64(ctx.BlockHeight()), &project)
}
Expand Down
2 changes: 1 addition & 1 deletion x/projects/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"
common "github.com/lavanet/lava/common"
"github.com/lavanet/lava/common"
"github.com/lavanet/lava/x/projects/types"
)

Expand Down
Loading

0 comments on commit c094633

Please sign in to comment.