Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add state sync support #823

Merged
merged 22 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3080845
Add WasmSnapshotter
assafmo Apr 25, 2022
870bba8
Make snapshot from keeper
ethanfrey Apr 27, 2022
abfadce
Rough draft of the restore method using multistore
ethanfrey Apr 27, 2022
f789aed
properly register the snapshot manager
ethanfrey Apr 27, 2022
9952475
Move snapshotter to keeper and implement
ethanfrey Apr 27, 2022
1b3c133
Add compression to snapshot wasm bytecode
ethanfrey Apr 27, 2022
4cccddb
Put GzipIt and Uncompress logic in a common folder
ethanfrey Apr 27, 2022
f68ef90
Fix linting errors
ethanfrey Apr 27, 2022
abad37d
Avoid sending duplicate wasm codes over the wire
ethanfrey Apr 27, 2022
d2b0fb5
Properly handle when a message not for this extension arrives
ethanfrey Apr 27, 2022
a5dd494
Start testing snapshots
ethanfrey Apr 27, 2022
fea9d39
Basic tend to end test for no errors
ethanfrey Apr 27, 2022
9705a8d
Address @giansalex PR comments
ethanfrey Apr 27, 2022
d8426fb
Updates inspired by @giansalex branch
ethanfrey Apr 28, 2022
7663d6f
Cannot commit in my test cases to load from proper height
ethanfrey Apr 28, 2022
a154a9a
SRemove loading history until I can fix tests here
ethanfrey Apr 28, 2022
cdcb18d
Cleanup from Simon's comments
ethanfrey Apr 28, 2022
6b4accb
Add snapshotter integration tests and minor cleanup
alpe May 3, 2022
297a646
Minor test update
alpe May 3, 2022
a5695ee
Fix linter warnings
alpe May 3, 2022
cb2e8f2
Merge pull request #834 from CosmWasm/add-state-sync_integration
ethanfrey May 3, 2022
1253c8e
Merge branch 'main' into add-state-sync
ethanfrey May 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ import (
wasmappparams "github.com/CosmWasm/wasmd/app/params"
"github.com/CosmWasm/wasmd/x/wasm"
wasmclient "github.com/CosmWasm/wasmd/x/wasm/client"
wasmkeeper "github.com/CosmWasm/wasmd/x/wasm/keeper"

// unnamed import of statik for swagger UI support
_ "github.com/cosmos/cosmos-sdk/client/docs/statik"
Expand Down Expand Up @@ -667,6 +668,22 @@ func NewWasmApp(
app.SetBeginBlocker(app.BeginBlocker)
app.SetEndBlocker(app.EndBlocker)

// must be before Loading version
// requires the snapshot store to be created and registered as a BaseAppOption
// see cmd/wasmd/root.go: 206 - 214 approx
if manager := app.SnapshotManager(); manager != nil {
err := manager.RegisterExtensions(
wasmkeeper.NewWasmSnapshotter(app.CommitMultiStore(), &app.wasmKeeper),
)
if err != nil {
panic(fmt.Errorf("failed to register snapshot extension: %s", err))
}
}

app.scopedIBCKeeper = scopedIBCKeeper
app.scopedTransferKeeper = scopedTransferKeeper
app.scopedWasmKeeper = scopedWasmKeeper

if loadLatest {
if err := app.LoadLatestVersion(); err != nil {
tmos.Exit(fmt.Sprintf("failed to load latest version: %s", err))
Expand All @@ -679,9 +696,6 @@ func NewWasmApp(
}
}

app.scopedIBCKeeper = scopedIBCKeeper
app.scopedTransferKeeper = scopedTransferKeeper
app.scopedWasmKeeper = scopedWasmKeeper
return app
}

Expand Down
8 changes: 4 additions & 4 deletions x/wasm/client/cli/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"

wasmUtils "github.com/CosmWasm/wasmd/x/wasm/client/utils"
"github.com/CosmWasm/wasmd/x/wasm/ioutils"
"github.com/CosmWasm/wasmd/x/wasm/types"
)

Expand Down Expand Up @@ -85,13 +85,13 @@ func parseStoreCodeArgs(file string, sender sdk.AccAddress, flags *flag.FlagSet)
}

// gzip the wasm file
if wasmUtils.IsWasm(wasm) {
wasm, err = wasmUtils.GzipIt(wasm)
if ioutils.IsWasm(wasm) {
wasm, err = ioutils.GzipIt(wasm)

if err != nil {
return types.MsgStoreCode{}, err
}
} else if !wasmUtils.IsGzip(wasm) {
} else if !ioutils.IsGzip(wasm) {
return types.MsgStoreCode{}, fmt.Errorf("invalid input file. Use wasm binary or gzip")
}

Expand Down
8 changes: 4 additions & 4 deletions x/wasm/client/rest/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/cosmos/cosmos-sdk/types/rest"
"github.com/gorilla/mux"

wasmUtils "github.com/CosmWasm/wasmd/x/wasm/client/utils"
"github.com/CosmWasm/wasmd/x/wasm/ioutils"
"github.com/CosmWasm/wasmd/x/wasm/types"
)

Expand Down Expand Up @@ -55,13 +55,13 @@ func storeCodeHandlerFn(cliCtx client.Context) http.HandlerFunc {
wasm := req.WasmBytes

// gzip the wasm file
if wasmUtils.IsWasm(wasm) {
wasm, err = wasmUtils.GzipIt(wasm)
if ioutils.IsWasm(wasm) {
wasm, err = ioutils.GzipIt(wasm)
if err != nil {
rest.WriteErrorResponse(w, http.StatusBadRequest, err.Error())
return
}
} else if !wasmUtils.IsGzip(wasm) {
} else if !ioutils.IsGzip(wasm) {
rest.WriteErrorResponse(w, http.StatusBadRequest, "Invalid input file, use wasm binary or zip")
return
}
Expand Down
11 changes: 3 additions & 8 deletions x/wasm/keeper/ioutil.go → x/wasm/ioutils/ioutil.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package keeper
package ioutils

import (
"bytes"
Expand All @@ -9,13 +9,8 @@ import (
"github.com/CosmWasm/wasmd/x/wasm/types"
)

// magic bytes to identify gzip.
// See https://www.ietf.org/rfc/rfc1952.txt
// and https://github.com/golang/go/blob/master/src/net/http/sniff.go#L186
var gzipIdent = []byte("\x1F\x8B\x08")

// uncompress returns gzip uncompressed content or given src when not gzip.
func uncompress(src []byte, limit uint64) ([]byte, error) {
// Uncompress returns gzip uncompressed content if input was gzip, or original src otherwise
func Uncompress(src []byte, limit uint64) ([]byte, error) {
switch n := uint64(len(src)); {
case n < 3:
return src, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package keeper
package ioutils

import (
"bytes"
Expand All @@ -16,10 +16,10 @@ import (
)

func TestUncompress(t *testing.T) {
wasmRaw, err := ioutil.ReadFile("./testdata/hackatom.wasm")
wasmRaw, err := ioutil.ReadFile("../keeper/testdata/hackatom.wasm")
require.NoError(t, err)

wasmGzipped, err := ioutil.ReadFile("./testdata/hackatom.wasm.gzip")
wasmGzipped, err := ioutil.ReadFile("../keeper/testdata/hackatom.wasm.gzip")
require.NoError(t, err)

const maxSize = 400_000
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestUncompress(t *testing.T) {
}
for msg, spec := range specs {
t.Run(msg, func(t *testing.T) {
r, err := uncompress(spec.src, maxSize)
r, err := Uncompress(spec.src, maxSize)
require.True(t, errors.Is(spec.expError, err), "exp %v got %+v", spec.expError, err)
if spec.expError != nil {
return
Expand Down
6 changes: 5 additions & 1 deletion x/wasm/client/utils/utils.go → x/wasm/ioutils/utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package utils
package ioutils

import (
"bytes"
"compress/gzip"
)

var (
// magic bytes to identify gzip.
// See https://www.ietf.org/rfc/rfc1952.txt
// and https://github.com/golang/go/blob/master/src/net/http/sniff.go#L186
gzipIdent = []byte("\x1F\x8B\x08")
ethanfrey marked this conversation as resolved.
Show resolved Hide resolved

wasmIdent = []byte("\x00\x61\x73\x6D")
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package ioutils

import (
"io/ioutil"
Expand All @@ -8,7 +8,7 @@ import (
)

func GetTestData() ([]byte, []byte, []byte, error) {
wasmCode, err := ioutil.ReadFile("../../keeper/testdata/hackatom.wasm")
wasmCode, err := ioutil.ReadFile("../keeper/testdata/hackatom.wasm")

if err != nil {
return nil, nil, nil, err
Expand Down
5 changes: 3 additions & 2 deletions x/wasm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"
"github.com/tendermint/tendermint/libs/log"

"github.com/CosmWasm/wasmd/x/wasm/ioutils"
"github.com/CosmWasm/wasmd/x/wasm/types"
)

Expand Down Expand Up @@ -164,7 +165,7 @@ func (k Keeper) create(ctx sdk.Context, creator sdk.AccAddress, wasmCode []byte,
if !authZ.CanCreateCode(k.getUploadAccessConfig(ctx), creator) {
return 0, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "can not create code")
}
wasmCode, err = uncompress(wasmCode, k.GetMaxWasmCodeSize(ctx))
wasmCode, err = ioutils.Uncompress(wasmCode, k.GetMaxWasmCodeSize(ctx))
if err != nil {
return 0, sdkerrors.Wrap(types.ErrCreateFailed, err.Error())
}
Expand Down Expand Up @@ -206,7 +207,7 @@ func (k Keeper) storeCodeInfo(ctx sdk.Context, codeID uint64, codeInfo types.Cod
}

func (k Keeper) importCode(ctx sdk.Context, codeID uint64, codeInfo types.CodeInfo, wasmCode []byte) error {
wasmCode, err := uncompress(wasmCode, k.GetMaxWasmCodeSize(ctx))
wasmCode, err := ioutils.Uncompress(wasmCode, k.GetMaxWasmCodeSize(ctx))
if err != nil {
return sdkerrors.Wrap(types.ErrCreateFailed, err.Error())
}
Expand Down
190 changes: 190 additions & 0 deletions x/wasm/keeper/snapshotter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package keeper

import (
"encoding/hex"
"io"

snapshot "github.com/cosmos/cosmos-sdk/snapshots/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
protoio "github.com/gogo/protobuf/io"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/CosmWasm/wasmd/x/wasm/ioutils"
"github.com/CosmWasm/wasmd/x/wasm/types"
)

/*
API to implement:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lot of in line doc. Personally, I don't think it adds more value than a code reference, like this:

var (
_ snapshot.Snapshotter = &WasmSnapshotter{}
 _ snapshot.ExtensionSnapshotter = &WasmSnapshotter{}
 )

Doc needs to be maintained, while the code reference gives you the latest with 1 click.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExtensionSnapshotter inherits from Snapshotter so that would be enough as a ref


// Snapshotter is something that can create and restore snapshots, consisting of streamed binary
// chunks - all of which must be read from the channel and closed. If an unsupported format is
// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf).
type Snapshotter interface {
// Snapshot writes snapshot items into the protobuf writer.
Snapshot(height uint64, protoWriter protoio.Writer) error

// Restore restores a state snapshot from the protobuf items read from the reader.
// If the ready channel is non-nil, it returns a ready signal (by being closed) once the
// restorer is ready to accept chunks.
Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error)
}

// ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream.
// ExtensionSnapshotter has an unique name and manages it's own internal formats.
type ExtensionSnapshotter interface {
Snapshotter

// SnapshotName returns the name of snapshotter, it should be unique in the manager.
SnapshotName() string

// SnapshotFormat returns the default format the extension snapshotter use to encode the
// payloads when taking a snapshot.
// It's defined within the extension, different from the global format for the whole state-sync snapshot.
SnapshotFormat() uint32

// SupportedFormats returns a list of formats it can restore from.
SupportedFormats() []uint32
}
*/

// Format 1 is just gzipped wasm byte code for each item payload. No protobuf envelope, no metadata.
const SnapshotFormat = 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 good description. To render nice Godoc, it should start with the element name, In this case SnapshotFormat


type WasmSnapshotter struct {
wasm *Keeper
cms sdk.MultiStore
}

func NewWasmSnapshotter(cms sdk.MultiStore, wasm *Keeper) *WasmSnapshotter {
return &WasmSnapshotter{
wasm: wasm,
cms: cms,
}
}

func (ws *WasmSnapshotter) SnapshotName() string {
return types.ModuleName
}

func (ws *WasmSnapshotter) SnapshotFormat() uint32 {
return SnapshotFormat
}

func (ws *WasmSnapshotter) SupportedFormats() []uint32 {
// If we support older formats, add them here and handle them in Restore
return []uint32{SnapshotFormat}
}

func (ws *WasmSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
// TODO: This seems more correct (historical info), but kills my tests
// Since codeIDs and wasm are immutible, it is never wrong to return new wasm data than the
// user requests
// ------
// cacheMS, err := ws.cms.CacheMultiStoreWithVersion(int64(height))
Copy link

@yihuang yihuang Apr 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't fix the storage at the height, would different nodes generate different snapshots, because their current block height maybe different?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that might make result in 2 nodes creating a different snapshot for the same height, which will make a client reject the snapshot.

// if err != nil {
// return err
// }
cacheMS := ws.cms.CacheMultiStore()

ctx := sdk.NewContext(cacheMS, tmproto.Header{}, false, log.NewNopLogger())
uniqueHashes := make(map[string]bool)
ethanfrey marked this conversation as resolved.
Show resolved Hide resolved
var rerr error

ws.wasm.IterateCodeInfos(ctx, func(id uint64, info types.CodeInfo) bool {
// Many code ids may point to the same code hash... only sync it once
hexHash := hex.EncodeToString(info.CodeHash)
// if uniqueHashes, just skip this one and move to the next
if uniqueHashes[hexHash] {
return false
}
uniqueHashes[hexHash] = true

// load code and abort on error
wasmBytes, err := ws.wasm.GetByteCode(ctx, id)
if err != nil {
rerr = err
return true
}

compressedWasm, err := ioutils.GzipIt(wasmBytes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just found:

The current version 1 snapshot format is a zlib-compressed, length-prefixed Protobuf stream of cosmos.base.store.v1beta1.SnapshotItem messages, split into chunks at exact 10 MB byte boundaries.

https://github.com/cosmos/cosmos-sdk/tree/main/snapshots#snapshot-format

if err != nil {
rerr = err
return true
}

err = snapshot.WriteExtensionItem(protoWriter, compressedWasm)
if err != nil {
rerr = err
return true
}

return false
})

return rerr
}

func (ws *WasmSnapshotter) Restore(
height uint64, format uint32, protoReader protoio.Reader,
) (snapshot.SnapshotItem, error) {
if format == 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if format == 1 {
if format == SnapshotFormat {

return ws.processAllItems(height, protoReader, restoreV1, finalizeV1)
}
return snapshot.SnapshotItem{}, snapshot.ErrUnknownFormat
}

func restoreV1(ctx sdk.Context, k *Keeper, compressedCode []byte) error {
wasmCode, err := ioutils.Uncompress(compressedCode, k.GetMaxWasmCodeSize(ctx))
if err != nil {
return sdkerrors.Wrap(types.ErrCreateFailed, err.Error())
}

// FIXME: check which codeIDs the checksum matches??
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which code ID is not important. But ensuring the checksum exists in the codes list at all makes sense. Otherwise a malicious node could send all kind of unrelated Wasm that gets stored and compiled, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, they can send us anything.
And at the point we have the checksum to compare it has been stored and compiled.

I guess this check would only allow one useless wasm.

I am more interested in asserting that all needed wasm has been synced. The biggest problem is a sync where some wasm file is missing and the node crashes later when that contract is called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this check would only allow one useless wasm.

What prevents the other node from sending us thousands of Wasm blobs that we decompress, store and compile but that are not part of the blockchain at all?

I am more interested in asserting that all needed wasm has been synced. The biggest problem is a sync where some wasm file is missing and the node crashes later when that contract is called.

Right, the codes missing issue is mentioned in the finalize method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave this for a follow up PR, nice to have check. I would love to get a full-node integration test on current state

_, err = k.wasmVM.Create(wasmCode)
if err != nil {
return sdkerrors.Wrap(types.ErrCreateFailed, err.Error())
}
return nil
}

func finalizeV1(ctx sdk.Context, k *Keeper) error {
// FIXME: ensure all codes have been uploaded?
return k.InitializePinnedCodes(ctx)
}

func (ws *WasmSnapshotter) processAllItems(
height uint64,
protoReader protoio.Reader,
cb func(sdk.Context, *Keeper, []byte) error,
finalize func(sdk.Context, *Keeper) error,
) (snapshot.SnapshotItem, error) {
ctx := sdk.NewContext(ws.cms, tmproto.Header{}, false, log.NewNopLogger())

// keep the last item here... if we break, it will either be empty (if we hit io.EOF)
// or contain the last item (if we hit payload == nil)
var item snapshot.SnapshotItem
for {
item = snapshot.SnapshotItem{}
err := protoReader.ReadMsg(&item)
if err == io.EOF {
break
} else if err != nil {
return snapshot.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}

// if it is not another ExtensionPayload message, then it is not for us.
// we should return it an let the manager handle this one
payload := item.GetExtensionPayload()
if payload == nil {
break
}

if err := cb(ctx, ws.wasm, payload.Payload); err != nil {
return snapshot.SnapshotItem{}, sdkerrors.Wrap(err, "processing snapshot item")
}
}

return item, finalize(ctx, ws.wasm)
}
Loading