Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: versiondb don't support restore from local snapshot #1108

Merged
merged 8 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [#1083](https://github.com/crypto-org-chain/cronos/pull/1083) memiavl support both sdk 46 and 47 root hash rules.
- [#1091](https://github.com/crypto-org-chain/cronos/pull/1091) memiavl support rollback.
- [#1100](https://github.com/crypto-org-chain/cronos/pull/1100) memiavl support read-only mode, and grab exclusive lock for write mode.
- [#1108](https://github.com/crypto-org-chain/cronos/pull/1108) versiondb support restore from local snapshot.

### Improvements

Expand Down
11 changes: 9 additions & 2 deletions integration_tests/cosmoscli.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,20 @@ def distribution_commission(self, addr):
)["commission"][0]
return float(coin["amount"])

def distribution_community(self):
def distribution_community(self, **kwargs):
coin = json.loads(
self.raw(
"query",
"distribution",
"community-pool",
output="json",
node=self.node_rpc,
**kwargs,
)
)["pool"][0]
return float(coin["amount"])

def distribution_reward(self, delegator_addr):
def distribution_reward(self, delegator_addr, **kwargs):
coin = json.loads(
self.raw(
"query",
Expand All @@ -240,6 +241,7 @@ def distribution_reward(self, delegator_addr):
delegator_addr,
output="json",
node=self.node_rpc,
**kwargs,
)
)["total"][0]
return float(coin["amount"])
Expand Down Expand Up @@ -1617,6 +1619,11 @@ def changeset_ingest_versiondb_sst(self, versiondb_dir, sst_dir, **kwargs):
**kwargs,
).decode()

def restore_versiondb(self, height, format=2):
return self.raw(
"changeset", "restore-versiondb", height, format, home=self.data_dir
)

def dump_snapshot(self, height, tarball, format=2):
return self.raw(
"snapshots", "dump", height, format, home=self.data_dir, output=tarball
Expand Down
129 changes: 73 additions & 56 deletions integration_tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import subprocess
import tempfile

Check failure on line 3 in integration_tests/test_basic.py

View workflow job for this annotation

GitHub Actions / Lint python

./integration_tests/test_basic.py:3:1: F401 'tempfile' imported but unused
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
Expand Down Expand Up @@ -249,13 +249,14 @@
clustercli.supervisor.stopProcess(f"{clustercli.chain_id}-node{i}")


def test_local_statesync(cronos):
def test_local_statesync(cronos, tmp_path_factory):
"""
- init a new node
- init a new node, enable versiondb
- dump snapshot on node0
- load snapshot to the new node
- restore the new node state from the snapshot
- bootstrap cometbft state
- restore the versiondb from the snapshot
- startup the node, should sync
- cleanup
"""
Expand All @@ -277,63 +278,79 @@
cronos.supervisorctl("start", "cronos_777-1-node0")
wait_for_port(ports.evmrpc_port(cronos.base_port(0)))

with tempfile.TemporaryDirectory() as home:
print("home", home)

i = len(cronos.config["validators"])
base_port = 26650 + i * 10
node_rpc = "tcp://127.0.0.1:%d" % ports.rpc_port(base_port)
cli = CosmosCLI.init(
"local_statesync",
Path(home),
node_rpc,
cronos.chain_binary,
"cronos_777-1",
)
home = tmp_path_factory.mktemp("local_statesync")
print("home", home)

i = len(cronos.config["validators"])
base_port = 26650 + i * 10
node_rpc = "tcp://127.0.0.1:%d" % ports.rpc_port(base_port)
cli = CosmosCLI.init(
"local_statesync",
Path(home),
node_rpc,
cronos.chain_binary,
"cronos_777-1",
)

# init the configs
peers = ",".join(
[
"tcp://%s@%s:%d"
% (
cronos.cosmos_cli(i).node_id(),
val["hostname"],
ports.p2p_port(val["base_port"]),
)
for i, val in enumerate(cronos.config["validators"])
]
)
rpc_servers = ",".join(cronos.node_rpc(i) for i in range(2))
trust_height = int(sync_info["latest_block_height"])
trust_hash = sync_info["latest_block_hash"]

cluster.edit_tm_cfg(
Path(home) / "config/config.toml",
base_port,
peers,
{
"statesync": {
"rpc_servers": rpc_servers,
"trust_height": trust_height,
"trust_hash": trust_hash,
},
# init the configs
peers = ",".join(
[
"tcp://%s@%s:%d"
% (
cronos.cosmos_cli(i).node_id(),
val["hostname"],
ports.p2p_port(val["base_port"]),
)
for i, val in enumerate(cronos.config["validators"])
]
)
rpc_servers = ",".join(cronos.node_rpc(i) for i in range(2))
trust_height = int(sync_info["latest_block_height"])
trust_hash = sync_info["latest_block_hash"]

cluster.edit_tm_cfg(
Path(home) / "config/config.toml",
base_port,
peers,
{
"statesync": {
"rpc_servers": rpc_servers,
"trust_height": trust_height,
"trust_hash": trust_hash,
},
)
},
)
cluster.edit_app_cfg(
Path(home) / "config/app.toml",
base_port,
{
"store": {
"streamers": ["versiondb"],
},
},
)

# restore the states
cli.load_snapshot(tarball)
print(cli.list_snapshot())
cli.restore_snapshot(height)
cli.bootstrap_state()

with subprocess.Popen(
[cronos.chain_binary, "start", "--home", home],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
):
wait_for_port(ports.rpc_port(base_port))
# check the node sync normally
wait_for_new_blocks(cli, 2)
# restore the states
cli.load_snapshot(tarball)
print(cli.list_snapshot())
cli.restore_snapshot(height)
cli.bootstrap_state()
cli.restore_versiondb(height)

with subprocess.Popen(
[cronos.chain_binary, "start", "--home", home],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
):
wait_for_port(ports.rpc_port(base_port))
# check the node sync normally
wait_for_new_blocks(cli, 2)
# check grpc works
print("distribution", cli.distribution_community(height=height))
with pytest.raises(Exception) as exc_info:
cli.distribution_community(height=height - 1)

assert "Stored fee pool should not have been nil" in exc_info.value.args[0]


def test_transaction(cronos):
Expand Down
1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
IngestVersionDBSSTCmd(),
ChangeSetToVersionDBCmd(),
RestoreAppDBCmd(opts),
RestoreVersionDBCmd(),

Check warning on line 30 in versiondb/client/cmd.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/cmd.go#L30

Added line #L30 was not covered by tests
)
return cmd
}
121 changes: 121 additions & 0 deletions versiondb/client/restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package client

import (
"fmt"
"io"
"math"
"path/filepath"
"strconv"

"cosmossdk.io/errors"
protoio "github.com/gogo/protobuf/io"
"github.com/spf13/cobra"

"github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"

"github.com/crypto-org-chain/cronos/versiondb"
"github.com/crypto-org-chain/cronos/versiondb/tsrocksdb"
)

// RestoreVersionDBCmd returns a command to restore a versiondb from local snapshot
func RestoreVersionDBCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "restore-versiondb <height> <format>",
Short: "Restore initial versiondb from local snapshot",
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := server.GetServerContextFromCmd(cmd)

height, err := strconv.ParseUint(args[0], 10, 63)
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
format, err := strconv.ParseUint(args[1], 10, 32)
if err != nil {
return err
}

Check warning on line 38 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L23-L38

Added lines #L23 - L38 were not covered by tests

store, err := server.GetSnapshotStore(ctx.Viper)
if err != nil {
return err
}

Check warning on line 43 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L40-L43

Added lines #L40 - L43 were not covered by tests

snapshot, chChunks, err := store.Load(height, uint32(format))

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion Error

Potential integer overflow by integer type conversion
if err != nil {
return err
}

Check warning on line 48 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L45-L48

Added lines #L45 - L48 were not covered by tests

if snapshot == nil {
return fmt.Errorf("snapshot doesn't exist, height: %d, format: %d", height, format)
}

Check warning on line 52 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L50-L52

Added lines #L50 - L52 were not covered by tests

streamReader, err := snapshots.NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()

home := ctx.Config.RootDir
versionDB, err := tsrocksdb.NewStore(filepath.Join(home, "data", "versiondb"))
if err != nil {
return err
}

Check warning on line 64 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L54-L64

Added lines #L54 - L64 were not covered by tests

ch := make(chan versiondb.ImportEntry, 128)

go func() {
defer close(ch)

if err := readSnapshotEntries(streamReader, ch); err != nil {
ctx.Logger.Error("failed to read snapshot entries", "err", err)
}

Check warning on line 73 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L66-L73

Added lines #L66 - L73 were not covered by tests
}()

return versionDB.Import(int64(height), ch)

Check warning on line 76 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L76

Added line #L76 was not covered by tests

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion Error

Potential integer overflow by integer type conversion
},
}
return cmd

Check warning on line 79 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L79

Added line #L79 was not covered by tests
}

// readSnapshotEntries reads key-value entries from protobuf reader and feed to the channel
func readSnapshotEntries(protoReader protoio.Reader, ch chan<- versiondb.ImportEntry) error {
var (
snapshotItem snapshottypes.SnapshotItem
storeKey string
)

loop:
for {
snapshotItem = snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if err == io.EOF {
break
} else if err != nil {
return errors.Wrap(err, "invalid protobuf message")
}

Check warning on line 97 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L83-L97

Added lines #L83 - L97 were not covered by tests

switch item := snapshotItem.Item.(type) {
case *snapshottypes.SnapshotItem_Store:
storeKey = item.Store.Name
case *snapshottypes.SnapshotItem_IAVL:
if storeKey == "" {
return errors.Wrap(err, "invalid protobuf message, store name is empty")
}
if item.IAVL.Height > math.MaxInt8 {
return fmt.Errorf("node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
ch <- versiondb.ImportEntry{
StoreKey: storeKey,
Key: item.IAVL.Key,
Value: item.IAVL.Value,
}
default:
break loop

Check warning on line 116 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L99-L116

Added lines #L99 - L116 were not covered by tests
}
}

return nil

Check warning on line 120 in versiondb/client/restore.go

View check run for this annotation

Codecov / codecov/patch

versiondb/client/restore.go#L120

Added line #L120 was not covered by tests
}
33 changes: 33 additions & 0 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

StorePrefixTpl = "s/k:%s/"
latestVersionKey = "s/latest"

ImportCommitBatchSize = 10000
)

var (
Expand Down Expand Up @@ -167,6 +169,37 @@
return s.db.Write(defaultWriteOpts, batch)
}

// Import loads the initial version of the state
func (s Store) Import(version int64, ch <-chan versiondb.ImportEntry) error {
batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion Error

Potential integer overflow by integer type conversion

var counter int
for entry := range ch {
key := cloneAppend(storePrefix(entry.StoreKey), entry.Key)
batch.PutCFWithTS(s.cfHandle, key, ts[:], entry.Value)

counter++
if counter%ImportCommitBatchSize == 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}
batch.Clear()

Check warning on line 190 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L173-L190

Added lines #L173 - L190 were not covered by tests
}
}

if batch.Count() > 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}

Check warning on line 197 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L194-L197

Added lines #L194 - L197 were not covered by tests
}

return s.SetLatestVersion(version)

Check warning on line 200 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L200

Added line #L200 was not covered by tests
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
9 changes: 9 additions & 0 deletions versiondb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@ type VersionStore interface {
// the `changeSet` should be ordered by (storeKey, key),
// the version should be latest version plus one.
PutAtVersion(version int64, changeSet []types.StoreKVPair) error

// Import the initial state of the store
Import(version int64, ch <-chan ImportEntry) error
}

type ImportEntry struct {
StoreKey string
Key []byte
Value []byte
}
Loading