From 4e3213ca528cf48142222faca2e3dcaf62629b93 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 25 Jul 2023 11:46:02 +0800 Subject: [PATCH 1/4] Problem: versiondb doesn't support restore from local snapshot to set up a pruned versiondb node, we need to restore versiondb from local snapshot. Solution: - implement a new subcommand for that. --- CHANGELOG.md | 1 + versiondb/client/cmd.go | 1 + versiondb/client/restore.go | 121 +++++++++++++++++++++++++++++++++++ versiondb/tsrocksdb/store.go | 33 ++++++++++ versiondb/types.go | 9 +++ 5 files changed, 165 insertions(+) create mode 100644 versiondb/client/restore.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 94a333cc1d..25783233b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ - [#1083](https://github.com/crypto-org-chain/cronos/pull/1083) memiavl support both sdk 46 and 47 root hash rules. - [#1091](https://github.com/crypto-org-chain/cronos/pull/1091) memiavl support rollback. - [#1100](https://github.com/crypto-org-chain/cronos/pull/1100) memiavl support read-only mode, and grab exclusive lock for write mode. +- [#]() versiondb support restore from local snapshot. 
### Improvements diff --git a/versiondb/client/cmd.go b/versiondb/client/cmd.go index 5d085d7b5f..97a134caab 100644 --- a/versiondb/client/cmd.go +++ b/versiondb/client/cmd.go @@ -27,6 +27,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command { IngestVersionDBSSTCmd(), ChangeSetToVersionDBCmd(), RestoreAppDBCmd(opts), + RestoreVersionDBCmd(), ) return cmd } diff --git a/versiondb/client/restore.go b/versiondb/client/restore.go new file mode 100644 index 0000000000..4bb81d8b0f --- /dev/null +++ b/versiondb/client/restore.go @@ -0,0 +1,121 @@ +package client + +import ( + "fmt" + "io" + "math" + "path/filepath" + "strconv" + + "cosmossdk.io/errors" + protoio "github.com/gogo/protobuf/io" + "github.com/spf13/cobra" + + "github.com/cosmos/cosmos-sdk/server" + "github.com/cosmos/cosmos-sdk/snapshots" + snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + + "github.com/crypto-org-chain/cronos/versiondb" + "github.com/crypto-org-chain/cronos/versiondb/tsrocksdb" +) + +// RestoreVersionDBCmd returns a command to restore a versiondb from local snapshot +func RestoreVersionDBCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "restore-versiondb <height> <format>", + Short: "Restore initial versiondb from local snapshot", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := server.GetServerContextFromCmd(cmd) + + height, err := strconv.ParseUint(args[0], 10, 63) + if err != nil { + return err + } + format, err := strconv.ParseUint(args[1], 10, 32) + if err != nil { + return err + } + + store, err := server.GetSnapshotStore(ctx.Viper) + if err != nil { + return err + } + + snapshot, chChunks, err := store.Load(height, uint32(format)) + if err != nil { + return err + } + + if snapshot == nil { + return fmt.Errorf("snapshot doesn't exist, height: %d, format: %d", height, format) + } + + streamReader, err := snapshots.NewStreamReader(chChunks) + if err != nil { + return err + } + defer streamReader.Close() + + home := ctx.Config.RootDir + 
versionDB, err := tsrocksdb.NewStore(filepath.Join(home, "data", "versiondb")) + if err != nil { + return err + } + + ch := make(chan versiondb.ImportEntry, 128) + + go func() { + defer close(ch) + + if err := readSnapshotEntries(streamReader, ch); err != nil { + ctx.Logger.Error("failed to read snapshot entries", "err", err) + } + }() + + return versionDB.Import(int64(height), ch) + }, + } + return cmd +} + +// readSnapshotEntries reads key-value entries from protobuf reader and feed to the channel +func readSnapshotEntries(protoReader protoio.Reader, ch chan<- versiondb.ImportEntry) error { + var ( + snapshotItem snapshottypes.SnapshotItem + storeKey string + ) + +loop: + for { + snapshotItem = snapshottypes.SnapshotItem{} + err := protoReader.ReadMsg(&snapshotItem) + if err == io.EOF { + break + } else if err != nil { + return errors.Wrap(err, "invalid protobuf message") + } + + switch item := snapshotItem.Item.(type) { + case *snapshottypes.SnapshotItem_Store: + storeKey = item.Store.Name + case *snapshottypes.SnapshotItem_IAVL: + if storeKey == "" { + return fmt.Errorf("invalid protobuf message, store name is empty") // err is nil here, so errors.Wrap would return nil + } + if item.IAVL.Height > math.MaxInt8 { + return fmt.Errorf("node height %v cannot exceed %v", + item.IAVL.Height, math.MaxInt8) + } + ch <- versiondb.ImportEntry{ + StoreKey: storeKey, + Key: item.IAVL.Key, + Value: item.IAVL.Value, + } + default: + break loop + } + } + + return nil +} diff --git a/versiondb/tsrocksdb/store.go b/versiondb/tsrocksdb/store.go index 821cb1bc1d..ebecd19717 100644 --- a/versiondb/tsrocksdb/store.go +++ b/versiondb/tsrocksdb/store.go @@ -17,6 +17,8 @@ const ( StorePrefixTpl = "s/k:%s/" latestVersionKey = "s/latest" + + ImportCommitBatchSize = 10000 ) var ( @@ -167,6 +169,37 @@ func (s Store) FeedChangeSet(version int64, store string, changeSet *iavl.Change return s.db.Write(defaultWriteOpts, batch) } +// Import loads the initial version of the state +func (s Store) Import(version int64, ch <-chan 
versiondb.ImportEntry) error { + batch := grocksdb.NewWriteBatch() + defer batch.Destroy() + + var ts [TimestampSize]byte + binary.LittleEndian.PutUint64(ts[:], uint64(version)) + + var counter int + for entry := range ch { + key := cloneAppend(storePrefix(entry.StoreKey), entry.Key) + batch.PutCFWithTS(s.cfHandle, key, ts[:], entry.Value) + + counter++ + if counter%ImportCommitBatchSize == 0 { + if err := s.db.Write(defaultWriteOpts, batch); err != nil { + return err + } + batch.Clear() + } + } + + if batch.Count() > 0 { + if err := s.db.Write(defaultWriteOpts, batch); err != nil { + return err + } + } + + return s.SetLatestVersion(version) +} + func newTSReadOptions(version *int64) *grocksdb.ReadOptions { var ver uint64 if version == nil { diff --git a/versiondb/types.go b/versiondb/types.go index fab4a6a21d..46bbe88e9e 100644 --- a/versiondb/types.go +++ b/versiondb/types.go @@ -18,4 +18,13 @@ type VersionStore interface { // the `changeSet` should be ordered by (storeKey, key), // the version should be latest version plus one. 
PutAtVersion(version int64, changeSet []types.StoreKVPair) error + + // Import the initial state of the store + Import(version int64, ch <-chan ImportEntry) error +} + +type ImportEntry struct { + StoreKey string + Key []byte + Value []byte } From f2f647bc7dcf3fda8888d7c1fa248426b338cc07 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 25 Jul 2023 12:09:52 +0800 Subject: [PATCH 2/4] integration test --- integration_tests/cosmoscli.py | 11 ++- integration_tests/test_basic.py | 129 ++++++++++++++++++-------------- 2 files changed, 82 insertions(+), 58 deletions(-) diff --git a/integration_tests/cosmoscli.py b/integration_tests/cosmoscli.py index b6ab39fdda..cc726a9128 100644 --- a/integration_tests/cosmoscli.py +++ b/integration_tests/cosmoscli.py @@ -219,7 +219,7 @@ def distribution_commission(self, addr): )["commission"][0] return float(coin["amount"]) - def distribution_community(self): + def distribution_community(self, **kwargs): coin = json.loads( self.raw( "query", @@ -227,11 +227,12 @@ def distribution_community(self): "community-pool", output="json", node=self.node_rpc, + **kwargs, ) )["pool"][0] return float(coin["amount"]) - def distribution_reward(self, delegator_addr): + def distribution_reward(self, delegator_addr, **kwargs): coin = json.loads( self.raw( "query", @@ -240,6 +241,7 @@ def distribution_reward(self, delegator_addr): delegator_addr, output="json", node=self.node_rpc, + **kwargs, ) )["total"][0] return float(coin["amount"]) @@ -1617,6 +1619,11 @@ def changeset_ingest_versiondb_sst(self, versiondb_dir, sst_dir, **kwargs): **kwargs, ).decode() + def restore_versiondb(self, height, format=2): + return self.raw( + "changeset", "restore-versiondb", height, format, home=self.data_dir + ) + def dump_snapshot(self, height, tarball, format=2): return self.raw( "snapshots", "dump", height, format, home=self.data_dir, output=tarball diff --git a/integration_tests/test_basic.py b/integration_tests/test_basic.py index b2b0229f31..c83169c9be 100644 --- 
a/integration_tests/test_basic.py +++ b/integration_tests/test_basic.py @@ -249,13 +249,14 @@ def test_statesync(cronos): clustercli.supervisor.stopProcess(f"{clustercli.chain_id}-node{i}") -def test_local_statesync(cronos): +def test_local_statesync(cronos, tmp_path_factory): """ - - init a new node + - init a new node, enable versiondb - dump snapshot on node0 - load snapshot to the new node - restore the new node state from the snapshot - bootstrap cometbft state + - restore the versiondb from the snapshot - startup the node, should sync - cleanup """ @@ -277,63 +278,79 @@ def test_local_statesync(cronos): cronos.supervisorctl("start", "cronos_777-1-node0") wait_for_port(ports.evmrpc_port(cronos.base_port(0))) - with tempfile.TemporaryDirectory() as home: - print("home", home) - - i = len(cronos.config["validators"]) - base_port = 26650 + i * 10 - node_rpc = "tcp://127.0.0.1:%d" % ports.rpc_port(base_port) - cli = CosmosCLI.init( - "local_statesync", - Path(home), - node_rpc, - cronos.chain_binary, - "cronos_777-1", - ) + home = tmp_path_factory.mktemp("local_statesync") + print("home", home) + + i = len(cronos.config["validators"]) + base_port = 26650 + i * 10 + node_rpc = "tcp://127.0.0.1:%d" % ports.rpc_port(base_port) + cli = CosmosCLI.init( + "local_statesync", + Path(home), + node_rpc, + cronos.chain_binary, + "cronos_777-1", + ) - # init the configs - peers = ",".join( - [ - "tcp://%s@%s:%d" - % ( - cronos.cosmos_cli(i).node_id(), - val["hostname"], - ports.p2p_port(val["base_port"]), - ) - for i, val in enumerate(cronos.config["validators"]) - ] - ) - rpc_servers = ",".join(cronos.node_rpc(i) for i in range(2)) - trust_height = int(sync_info["latest_block_height"]) - trust_hash = sync_info["latest_block_hash"] - - cluster.edit_tm_cfg( - Path(home) / "config/config.toml", - base_port, - peers, - { - "statesync": { - "rpc_servers": rpc_servers, - "trust_height": trust_height, - "trust_hash": trust_hash, - }, + # init the configs + peers = ",".join( + [ + 
"tcp://%s@%s:%d" + % ( + cronos.cosmos_cli(i).node_id(), + val["hostname"], + ports.p2p_port(val["base_port"]), + ) + for i, val in enumerate(cronos.config["validators"]) + ] + ) + rpc_servers = ",".join(cronos.node_rpc(i) for i in range(2)) + trust_height = int(sync_info["latest_block_height"]) + trust_hash = sync_info["latest_block_hash"] + + cluster.edit_tm_cfg( + Path(home) / "config/config.toml", + base_port, + peers, + { + "statesync": { + "rpc_servers": rpc_servers, + "trust_height": trust_height, + "trust_hash": trust_hash, }, - ) + }, + ) + cluster.edit_app_cfg( + Path(home) / "config/app.toml", + base_port, + { + "store": { + "streamers": ["versiondb"], + }, + }, + ) - # restore the states - cli.load_snapshot(tarball) - print(cli.list_snapshot()) - cli.restore_snapshot(height) - cli.bootstrap_state() - - with subprocess.Popen( - [cronos.chain_binary, "start", "--home", home], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ): - wait_for_port(ports.rpc_port(base_port)) - # check the node sync normally - wait_for_new_blocks(cli, 2) + # restore the states + cli.load_snapshot(tarball) + print(cli.list_snapshot()) + cli.restore_snapshot(height) + cli.bootstrap_state() + cli.restore_versiondb(height) + + with subprocess.Popen( + [cronos.chain_binary, "start", "--home", home], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ): + wait_for_port(ports.rpc_port(base_port)) + # check the node sync normally + wait_for_new_blocks(cli, 2) + # check grpc works + print("distribution", cli.distribution_community(height=height)) + with pytest.raises(Exception) as exc_info: + cli.distribution_community(height=height - 1) + + assert "Stored fee pool should not have been nil" in exc_info.value.args[0] def test_transaction(cronos): From c254d6bebf25939fc75e54d8c718df6b70ea4e43 Mon Sep 17 00:00:00 2001 From: yihuang Date: Tue, 25 Jul 2023 12:11:11 +0800 Subject: [PATCH 3/4] Update CHANGELOG.md Signed-off-by: yihuang --- CHANGELOG.md | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25783233b9..e61408a3f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,7 +40,7 @@ - [#1083](https://github.com/crypto-org-chain/cronos/pull/1083) memiavl support both sdk 46 and 47 root hash rules. - [#1091](https://github.com/crypto-org-chain/cronos/pull/1091) memiavl support rollback. - [#1100](https://github.com/crypto-org-chain/cronos/pull/1100) memiavl support read-only mode, and grab exclusive lock for write mode. -- [#]() versiondb support restore from local snapshot. +- [#1108](https://github.com/crypto-org-chain/cronos/pull/1108) versiondb support restore from local snapshot. ### Improvements From 3415978c4973781c4ae007b32331e9cafd20ef15 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Tue, 25 Jul 2023 13:39:49 +0800 Subject: [PATCH 4/4] fix lint --- integration_tests/test_basic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integration_tests/test_basic.py b/integration_tests/test_basic.py index 2199046ded..f2778b4d8f 100644 --- a/integration_tests/test_basic.py +++ b/integration_tests/test_basic.py @@ -1,6 +1,5 @@ import json import subprocess -import tempfile import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path