diff --git a/app/app.go b/app/app.go index 7cd77b35e4..475c4632a7 100644 --- a/app/app.go +++ b/app/app.go @@ -5,12 +5,15 @@ import ( "net/http" "os" "path/filepath" + "strings" "sync" + "github.com/crypto-org-chain/cronos/x/cronos" "github.com/crypto-org-chain/cronos/x/cronos/middleware" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/server" "github.com/gorilla/mux" "github.com/rakyll/statik/fs" "github.com/spf13/cast" @@ -122,7 +125,7 @@ import ( // this line is used by starport scaffolding # stargate/app/moduleImport cronosappclient "github.com/crypto-org-chain/cronos/client" - "github.com/crypto-org-chain/cronos/x/cronos" + "github.com/crypto-org-chain/cronos/versiondb" cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client" cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper" evmhandlers "github.com/crypto-org-chain/cronos/x/cronos/keeper/evmhandlers" @@ -350,7 +353,8 @@ func New( // configure state listening capabilities using AppOptions // we are doing nothing with the returned streamingServices and waitGroup in this case // Only support file streamer right now. - if cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) == "file" { + streamers := cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) + if strings.Contains(streamers, "file") { streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory) if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil { panic(err) @@ -373,6 +377,33 @@ func New( } } + if strings.Contains(streamers, "versiondb") { + rootDir := cast.ToString(appOpts.Get(flags.FlagHome)) + dataDir := filepath.Join(rootDir, "data", "versiondb") + backendType := server.GetAppDBBackend(appOpts) + plainDB, err := dbm.NewDB("plain.db", backendType, dataDir) + if err != nil { + panic(err) + } + historyDB, err := dbm.NewDB("history.db", backendType, dataDir) + if err != nil { + panic(err) + } + changesetDB, err := dbm.NewDB("changeset.db", backendType, dataDir) + if err != nil { + panic(err) + } + verstore := versiondb.NewStore(plainDB, historyDB, changesetDB) + + // default to exposing all + exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + service := versiondb.NewStreamingService(verstore, exposeStoreKeys) + bApp.SetStreamingService(service) + } + app := &App{ BaseApp: bApp, cdc: cdc, diff --git a/default.nix b/default.nix index fcbab2154a..0dd65d1259 100644 --- a/default.nix +++ b/default.nix @@ -27,6 +27,7 @@ buildGoApplication rec { "!/app/" "!/cmd/" "!/client/" + "!/versiondb/" "!go.mod" "!go.sum" "!gomod2nix.toml" diff --git a/go.mod b/go.mod index b463d1aa3d..ba6a100b51 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( cosmossdk.io/math v1.0.0-beta.3 + github.com/RoaringBitmap/roaring v1.2.1 github.com/armon/go-metrics v0.4.1 github.com/cosmos/cosmos-sdk v0.46.2-0.20220923192627-95948f6692bb github.com/cosmos/ibc-go/v5 v5.0.0-rc2 @@ -45,6 +46,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/bgentry/speakeasy v0.1.0 // indirect + github.com/bits-and-blooms/bitset v1.2.0 // indirect github.com/btcsuite/btcd v0.22.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect @@ -135,6 +137,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-testing-interface v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pelletier/go-toml v1.9.5 // indirect diff --git a/go.sum b/go.sum index 3f6978b948..9a76dcfa8c 100644 --- a/go.sum +++ b/go.sum @@ -239,6 +239,8 @@ github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/RoaringBitmap/roaring v1.2.1 h1:58/LJlg/81wfEHd5L9qsHduznOIhyv4qb1yWcSvVq9A= +github.com/RoaringBitmap/roaring v1.2.1/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= @@ -347,6 +349,7 @@ github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d/go.mod h1:6QX/PXZ github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= +github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bkielbasa/cyclop v1.2.0/go.mod h1:qOI0yy6A7dYC4Zgsa72Ppm9kONl0RoIlPbzot9mhmeI= @@ -1631,6 +1634,8 @@ github.com/mozilla/tls-observatory v0.0.0-20210609171429-7bc42856d2e5/go.mod h1: github.com/mrunalp/fileutils v0.0.0-20200520151820-abd8a0e76976/go.mod h1:x8F1gnqOkIEiO4rqoeEEEqQbo7HjGMTvyoq3gej4iT0= github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= diff --git a/gomod2nix.toml b/gomod2nix.toml index 6bbd102ce9..c08ebb2a75 100644 --- a/gomod2nix.toml +++ b/gomod2nix.toml @@ -31,6 +31,9 @@ schema = 3 [mod."github.com/ChainSafe/go-schnorrkel"] version = "v0.0.0-20200405005733-88cbf1b4c40d" hash = "sha256-i8RXZemJGlSjBT35oPm0SawFiBoIU5Pkq5xp4n/rzCY=" + [mod."github.com/RoaringBitmap/roaring"] + version = "v1.2.1" + hash = "sha256-0/R956wrCW71eOE36CbxGJJRuQjKwvvIQ/D8QTn2A6w=" [mod."github.com/StackExchange/wmi"] version = "v1.2.1" hash = "sha256-1BoEeWAWyebH+1mMuyPhWZut8nWHb6r73MgcqlGuUEY=" @@ -58,6 +61,9 @@ schema = 3 [mod."github.com/bgentry/speakeasy"] version = "v0.1.0" hash = "sha256-Gt1vj6CFovLnO6wX5u2O4UfecY9V2J9WGw1ez4HMrgk=" + [mod."github.com/bits-and-blooms/bitset"] + version = "v1.2.0" + hash = "sha256-IxNmtELycM+XVzg4qBv04hAJUT3nSWuyP9R+8zc9LmU=" [mod."github.com/btcsuite/btcd"] version = "v0.22.1" hash = "sha256-hBU+roIELcmbW2Gz7eGZzL9qNA1bakq5wNxqCgs4TKc=" @@ -354,6 +360,9 @@ schema = 3 [mod."github.com/mitchellh/mapstructure"] version = "v1.5.0" hash = "sha256-ztVhGQXs67MF8UadVvG72G3ly0ypQW0IRDdOOkjYwoE=" + [mod."github.com/mschoch/smat"] + version = "v0.2.0" + hash = "sha256-DZvUJXjIcta3U+zxzgU3wpoGn/V4lpBY7Xme8aQUi+E=" [mod."github.com/mtibben/percent"] version = "v0.2.1" hash = "sha256-Zj1lpCP6mKQ0UUTMs2By4LC414ou+iJzKkK+eBHfEcc=" diff --git a/scripts/cronos-devnet.yaml b/scripts/cronos-devnet.yaml index 0b9b865e1a..ec2f3c9fe0 100644 --- a/scripts/cronos-devnet.yaml +++ b/scripts/cronos-devnet.yaml @@ -1,7 +1,7 @@ dotenv: .env cronos_777-1: cmd: cronosd - start-flags: "--trace" + start-flags: "--trace --streamers versiondb,file" app-config: minimum-gas-prices: 0basetcro index-events: diff --git a/versiondb/dbutils.go b/versiondb/dbutils.go new file mode 100644 index 0000000000..2334d0f2c1 --- /dev/null +++ b/versiondb/dbutils.go @@ -0,0 +1,84 @@ +package versiondb + +import ( + "encoding/binary" + "sort" + + "github.com/RoaringBitmap/roaring/roaring64" +) + +var ChunkLimit = uint64(1950) // threshold beyond which MDBX overflow pages appear: 4096 / 2 - (keySize + 8) + +// CutLeft - cut from bitmap `targetSize` bytes from left +// removing lft part from `bm` +// returns nil on zero cardinality +func CutLeft64(bm *roaring64.Bitmap, sizeLimit uint64) *roaring64.Bitmap { + if bm.GetCardinality() == 0 { + return nil + } + + sz := bm.GetSerializedSizeInBytes() + if sz <= sizeLimit { + lft := roaring64.New() + lft.AddRange(bm.Minimum(), bm.Maximum()+1) + lft.And(bm) + lft.RunOptimize() + bm.Clear() + return lft + } + + from := bm.Minimum() + minMax := bm.Maximum() - bm.Minimum() + to := sort.Search(int(minMax), func(i int) bool { // can be optimized to avoid "too small steps", but let's leave it for readability + lft := roaring64.New() // bitmap.Clear() method intentionally not used here, because then serialized size of bitmap getting bigger + lft.AddRange(from, from+uint64(i)+1) + lft.And(bm) + lft.RunOptimize() + return lft.GetSerializedSizeInBytes() > sizeLimit + }) + + lft := roaring64.New() + lft.AddRange(from, from+uint64(to)) // no +1 because sort.Search returns element which is just higher threshold - but we need lower + lft.And(bm) + bm.RemoveRange(from, from+uint64(to)) + lft.RunOptimize() + return lft +} + +func WalkChunks64(bm *roaring64.Bitmap, sizeLimit uint64, f func(chunk *roaring64.Bitmap, isLast bool) error) error { + for bm.GetCardinality() > 0 { + if err := f(CutLeft64(bm, sizeLimit), bm.GetCardinality() == 0); err != nil { + return err + } + } + return nil +} + +func WalkChunkWithKeys64(k []byte, m *roaring64.Bitmap, sizeLimit uint64, f func(chunkKey []byte, chunk *roaring64.Bitmap) error) error { + return WalkChunks64(m, sizeLimit, func(chunk *roaring64.Bitmap, isLast bool) error { + chunkKey := make([]byte, len(k)+8) + copy(chunkKey, k) + if isLast { + binary.BigEndian.PutUint64(chunkKey[len(k):], ^uint64(0)) + } else { + binary.BigEndian.PutUint64(chunkKey[len(k):], chunk.Maximum()) + } + return f(chunkKey, chunk) + }) +} + +// SeekInBitmap64 - returns value in bitmap which is >= n +func SeekInBitmap64(m *roaring64.Bitmap, n uint64) (found uint64, ok bool) { + if m == nil || m.IsEmpty() { + return 0, false + } + if n == 0 { + return m.Minimum(), true + } + searchRank := m.Rank(n - 1) + if searchRank >= m.GetCardinality() { + return 0, false + } + found, _ = m.Select(searchRank) + return found, true +} diff --git a/versiondb/history.go b/versiondb/history.go new file mode 100644 index 0000000000..1fea7ebcdc --- /dev/null +++ b/versiondb/history.go @@ -0,0 +1,92 @@ +package versiondb + +import ( + "bytes" + + "github.com/RoaringBitmap/roaring/roaring64" + + sdk "github.com/cosmos/cosmos-sdk/types" + dbm "github.com/tendermint/tm-db" +) + +const LastChunkId = ^uint64(0) + +func HistoryIndexKey(key []byte, height uint64) []byte { + return append(key, sdk.Uint64ToBigEndian(height)...) +} + +// GetHistoryIndex returns the history index bitmap chunk which covers the target version. +func GetHistoryIndex(db dbm.DB, key []byte, height uint64) (*roaring64.Bitmap, error) { + // try to seek the first chunk whose maximum is bigger or equal to the target height. + it, err := db.Iterator( + HistoryIndexKey(key, height), + sdk.PrefixEndBytes(key), + ) + if err != nil { + return nil, err + } + defer it.Close() // nolint: errcheck + + if !it.Valid() { + return nil, nil + } + + m := roaring64.New() + _, err = m.ReadFrom(bytes.NewReader(it.Value())) + if err != nil { + return nil, err + } + return m, nil +} + +// SeekHistoryIndex locate the minimal version that changed the key and is larger than the target version, +// using the returned version can find the value for the target version in changeset table. +// If not found, return -1 +func SeekHistoryIndex(db dbm.DB, key []byte, version uint64) (int64, error) { + // either m.Maximum() >= version + 1, or is the last chunk. + m, err := GetHistoryIndex(db, key, version+1) + if err != nil { + return -1, err + } + found, ok := SeekInBitmap64(m, version+1) + if !ok { + return -1, nil + } + return int64(found), nil +} + +// WriteHistoryIndex set the block height to the history bitmap. +// it try to set to the last chunk, if the last chunk exceeds chunk limit, split it. +func WriteHistoryIndex(db dbm.DB, batch dbm.Batch, key []byte, height uint64) error { + lastKey := HistoryIndexKey(key, LastChunkId) + bz, err := db.Get(lastKey) + if err != nil { + return err + } + + m := roaring64.New() + if len(bz) > 0 { + _, err = m.ReadFrom(bytes.NewReader(bz)) + if err != nil { + return err + } + } + m.Add(height) + + // chunking + if err = WalkChunks64(m, ChunkLimit, func(chunk *roaring64.Bitmap, isLast bool) error { + chunkKey := lastKey + if !isLast { + chunkKey = HistoryIndexKey(key, chunk.Maximum()) + } + bz, err := chunk.ToBytes() + if err != nil { + return err + } + return batch.Set(chunkKey, bz) + }); err != nil { + return err + } + + return nil +} diff --git a/versiondb/store.go b/versiondb/store.go new file mode 100644 index 0000000000..f026cb85ff --- /dev/null +++ b/versiondb/store.go @@ -0,0 +1,156 @@ +package versiondb + +import ( + "bytes" + "errors" + + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + dbm "github.com/tendermint/tm-db" +) + +var _ VersionStore = (*Store)(nil) + +// Store implements `VersionStore`. +type Store struct { + // latest key-value pairs + plainDB dbm.DB + // history bitmap index of keys + historyDB dbm.DB + // changesets of each blocks + changesetDB dbm.DB +} + +func NewStore(plainDB, historyDB, changesetDB dbm.DB) *Store { + return &Store{plainDB, historyDB, changesetDB} +} + +// PutAtVersion implements VersionStore interface +// TODO reduce allocation within iterations. +func (s *Store) PutAtVersion(version int64, changeSet []types.StoreKVPair) error { + plainBatch := s.plainDB.NewBatch() + defer plainBatch.Close() // nolint: errcheck + historyBatch := s.historyDB.NewBatch() + defer historyBatch.Close() // nolint: errcheck + changesetBatch := s.changesetDB.NewBatch() + defer changesetBatch.Close() // nolint: errcheck + + for _, pair := range changeSet { + key := prependStoreKey(pair.StoreKey, pair.Key) + + if version == 0 { + // genesis state is written into plain state directly + if pair.Delete { + return errors.New("can't delete at genesis") + } else { + if err := plainBatch.Set(key, pair.Value); err != nil { + return err + } + } + continue + } + + original, err := s.plainDB.Get(key) + if err != nil { + return err + } + if bytes.Equal(original, pair.Value) { + // do nothing if the value is not changed + continue + } + + // write history index + if err := WriteHistoryIndex(s.historyDB, historyBatch, key, uint64(version)); err != nil { + return err + } + + // write the old value to changeset + if len(original) > 0 { + changesetKey := append(sdk.Uint64ToBigEndian(uint64(version)), key...) + if err := changesetBatch.Set(changesetKey, original); err != nil { + return err + } + } + + // write the new value to plain state + if pair.Delete { + if err := plainBatch.Delete(key); err != nil { + return err + } + } else { + if err := plainBatch.Set(key, pair.Value); err != nil { + return err + } + } + } + + if err := changesetBatch.WriteSync(); err != nil { + return err + } + if err := historyBatch.WriteSync(); err != nil { + return err + } + return plainBatch.WriteSync() +} + +// GetAtVersion implements VersionStore interface +func (s *Store) GetAtVersion(storeKey string, key []byte, version *int64) ([]byte, error) { + rawKey := prependStoreKey(storeKey, key) + if version == nil { + return s.plainDB.Get(rawKey) + } + height := uint64(*version) + found, err := SeekHistoryIndex(s.historyDB, rawKey, height) + if err != nil { + return nil, err + } + if found < 0 { + // there's no change records found after the target version, query the latest state. + return s.plainDB.Get(rawKey) + } + // get from changeset + changesetKey := ChangesetKey(uint64(found), rawKey) + return s.changesetDB.Get(changesetKey) +} + +// HasAtVersion implements VersionStore interface +func (s *Store) HasAtVersion(storeKey string, key []byte, version *int64) (bool, error) { + rawKey := prependStoreKey(storeKey, key) + if version == nil { + return s.plainDB.Has(rawKey) + } + height := uint64(*version) + found, err := SeekHistoryIndex(s.historyDB, rawKey, height) + if err != nil { + return false, err + } + if found < 0 { + // there's no change records after the target version, query the latest state. + return s.plainDB.Has(rawKey) + } + // get from changeset + changesetKey := ChangesetKey(uint64(found), rawKey) + return s.changesetDB.Has(changesetKey) +} + +// IteratorAtVersion implements VersionStore interface +func (s *Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) types.Iterator { + // TODO + return nil +} + +// ReverseIteratorAtVersion implements VersionStore interface +func (s *Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) types.Iterator { + // TODO + return nil +} + +// ChangesetKey build key changeset db +func ChangesetKey(version uint64, key []byte) []byte { + return append(sdk.Uint64ToBigEndian(version), key...) +} + +// prependStoreKey prepends storeKey to the key +func prependStoreKey(storeKey string, key []byte) []byte { + return append([]byte(storeKey), key...) +} diff --git a/versiondb/store_test.go b/versiondb/store_test.go new file mode 100644 index 0000000000..880f2ac297 --- /dev/null +++ b/versiondb/store_test.go @@ -0,0 +1,86 @@ +package versiondb + +import ( + "testing" + + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" +) + +func TestBasics(t *testing.T) { + var v int64 + key1 := []byte("key1") + key2 := []byte("key2") + value1 := []byte("value1") + value2 := []byte("value2") + + store := NewStore(dbm.NewMemDB(), dbm.NewMemDB(), dbm.NewMemDB()) + require.NoError(t, store.PutAtVersion(0, []types.StoreKVPair{ + {StoreKey: "bank", Key: key1, Value: value1}, + {StoreKey: "bank", Key: key2, Value: value2}, + {StoreKey: "staking", Key: key1, Value: value1}, + {StoreKey: "evm", Key: key1, Value: value1}, + })) + require.NoError(t, store.PutAtVersion(1, []types.StoreKVPair{ + {StoreKey: "bank", Key: key1, Value: value2}, + })) + require.NoError(t, store.PutAtVersion(2, []types.StoreKVPair{ + {StoreKey: "staking", Delete: true, Key: key1}, + })) + require.NoError(t, store.PutAtVersion(3, []types.StoreKVPair{ + {StoreKey: "staking", Key: key1, Value: value2}, + })) + + value, err := store.GetAtVersion("staking", key1, nil) + require.NoError(t, err) + require.Equal(t, value, value2) + + v = 2 + ok, err := store.HasAtVersion("staking", key1, &v) + require.NoError(t, err) + require.False(t, ok) + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Empty(t, value) + + v = 1 + ok, err = store.HasAtVersion("staking", key1, &v) + require.NoError(t, err) + require.True(t, ok) + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Equal(t, value, value1) + + // never changed since genesis + ok, err = store.HasAtVersion("bank", key2, nil) + require.NoError(t, err) + require.True(t, ok) + value, err = store.GetAtVersion("bank", key2, nil) + require.NoError(t, err) + require.Equal(t, value2, value) + for i := int64(1); i < 4; i++ { + // never changed + ok, err = store.HasAtVersion("bank", key2, &i) + require.NoError(t, err) + require.True(t, ok) + + value, err = store.GetAtVersion("bank", key2, &i) + require.NoError(t, err) + require.Equal(t, value2, value) + } +} + +func TestBitmapChunking(t *testing.T) { + oldChunkLimit := ChunkLimit + ChunkLimit = uint64(100) + key1 := []byte("key1") + store := NewStore(dbm.NewMemDB(), dbm.NewMemDB(), dbm.NewMemDB()) + for i := 0; i < 100000; i++ { + require.NoError(t, store.PutAtVersion(uint64(i), []types.StoreKVPair{ + {StoreKey: "bank", Key: key1, Value: sdk.Uint64ToBigEndian(uint64(i))}, + })) + } + ChunkLimit = oldChunkLimit +} diff --git a/versiondb/streaming_service.go b/versiondb/streaming_service.go new file mode 100644 index 0000000000..ae833fd1eb --- /dev/null +++ b/versiondb/streaming_service.go @@ -0,0 +1,119 @@ +package versiondb + +import ( + "sort" + "strings" + "sync" + + abci "github.com/tendermint/tendermint/abci/types" + + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ baseapp.StreamingService = &StreamingService{} + +// FlattenListener listens to the state writes and flatten them in memory. +// One listener only listens to a single storeKey. +type FlattenListener struct { + stateCache map[string]types.StoreKVPair +} + +func NewFlattenListener() *FlattenListener { + return &FlattenListener{ + stateCache: make(map[string]types.StoreKVPair), + } +} + +func (fl *FlattenListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error { + fl.stateCache[string(key)] = types.StoreKVPair{ + StoreKey: storeKey.Name(), + Delete: delete, + Key: key, + Value: value, + } + return nil +} + +// StreamingService is a concrete implementation of StreamingService that accumulate the state changes in current block, +// writes the ordered changeset out to version storage. +type StreamingService struct { + listeners map[types.StoreKey]*FlattenListener // the listeners that will be initialized with BaseApp + versionStore VersionStore + currentBlockNumber int64 // the current block number +} + +// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys +func NewStreamingService(versionStore VersionStore, storeKeys []types.StoreKey) *StreamingService { + listeners := make(map[types.StoreKey]*FlattenListener, len(storeKeys)) + // in this case, we are using the same listener for each Store + for _, key := range storeKeys { + listeners[key] = NewFlattenListener() + } + return &StreamingService{listeners, versionStore, 0} +} + +// Listeners satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { + listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.listeners)) + for storeKey, listener := range fss.listeners { + listeners[storeKey] = []types.WriteListener{listener} + } + return listeners +} + +// ListenBeginBlock satisfies the baseapp.ABCIListener interface +// It sets the currentBlockNumber. +func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + fss.currentBlockNumber = req.GetHeader().Height + return nil +} + +// ListenDeliverTx satisfies the baseapp.ABCIListener interface +func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { + return nil +} + +// ListenEndBlock satisfies the baseapp.ABCIListener interface +// It merge the state caches of all the listeners together, and write out to the versionStore. +func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { + // sort by the storeKeys first + storeKeys := make([]types.StoreKey, 0, len(fss.listeners)) + for storeKey := range fss.listeners { + storeKeys = append(storeKeys, storeKey) + } + sort.SliceStable(storeKeys, func(i, j int) bool { + return strings.Compare(storeKeys[i].Name(), storeKeys[j].Name()) < 0 + }) + + // concat the state caches + var changeSet []types.StoreKVPair + for _, storeKey := range storeKeys { + cache := fss.listeners[storeKey].stateCache + fss.listeners[storeKey].stateCache = make(map[string]types.StoreKVPair) + + // sort the cache by key + keys := make([]string, 0, len(cache)) + for key := range cache { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + changeSet = append(changeSet, cache[key]) + } + } + + return fss.versionStore.PutAtVersion(fss.currentBlockNumber, changeSet) +} + +// Stream satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { + return nil +} + +// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Close() error { + return nil +} diff --git a/versiondb/types.go b/versiondb/types.go new file mode 100644 index 0000000000..732875e8e8 --- /dev/null +++ b/versiondb/types.go @@ -0,0 +1,20 @@ +package versiondb + +import ( + "github.com/cosmos/cosmos-sdk/store/types" +) + +// VersionStore is a versioned storage of a flat key-value pairs. +// it don't need to support merkle proof, so could be implemented in a much more efficient way. +// `nil` version means the latest version. +type VersionStore interface { + GetAtVersion(storeKey string, key []byte, version *int64) ([]byte, error) + HasAtVersion(storeKey string, key []byte, version *int64) (bool, error) + IteratorAtVersion(storeKey string, start, end []byte, version *int64) types.Iterator + ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) types.Iterator + + // Persist the change set of a block, + // the `changeSet` should be ordered by (storeKey, key), + // the version should be latest version plus one. + PutAtVersion(version int64, changeSet []types.StoreKVPair) error +}