Skip to content

Commit

Permalink
Add hooks to allow app modules to add things to state-sync
Browse files Browse the repository at this point in the history
Closes: cosmos#7340

- Support registering multiple snapshotters in snapshot manager.
- Append the extension snapshotters to existing snapshot stream.

fix v2 store

fix rootmulti unit tests

fix unit tests

fix unit tests

fix typo

add utility function WriteExtensionItem

add convenient public methods
  • Loading branch information
yihuang committed Feb 16, 2022
1 parent 7869d38 commit 67087bb
Show file tree
Hide file tree
Showing 20 changed files with 5,951 additions and 711 deletions.
3,126 changes: 3,098 additions & 28 deletions api/cosmos/base/snapshots/v1beta1/snapshot.pulsar.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ func DefaultStoreLoader(ms sdk.CommitMultiStore) error {
return ms.LoadLatestVersion()
}

// CommitMultiStore returns the root multi-store.
// Extension snapshotter use this to read storage.
func (app *BaseApp) CommitMultiStore() sdk.CommitMultiStore {
return app.cms
}

// SnapshotManager returns the snapshot manager.
// application use this to register extra extension snapshotters.
func (app *BaseApp) SnapshotManager() *snapshots.Manager {
return app.snapshotManager
}

// LoadVersion loads the BaseApp application version. It will panic if called
// more than once on a running baseapp.
func (app *BaseApp) LoadVersion(version int64) error {
Expand Down
2 changes: 1 addition & 1 deletion baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (app *BaseApp) SetSnapshotStore(snapshotStore *snapshots.Store) {
app.snapshotManager = nil
return
}
app.snapshotManager = snapshots.NewManager(snapshotStore, app.cms)
app.snapshotManager = snapshots.NewManager(snapshotStore, app.cms, nil)
}

// SetSnapshotInterval sets the snapshot interval.
Expand Down
37 changes: 36 additions & 1 deletion proto/cosmos/base/snapshots/v1beta1/snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,39 @@ message Snapshot {
// Metadata contains SDK-specific snapshot metadata.
message Metadata {
repeated bytes chunk_hashes = 1; // SHA-256 chunk hashes
}
}

// SnapshotItem is an item contained in a rootmulti.Store snapshot.
message SnapshotItem {
// item is the specific type of snapshot item.
oneof item {
SnapshotStoreItem store = 1;
SnapshotIAVLItem iavl = 2 [(gogoproto.customname) = "IAVL"];
SnapshotExtensionMeta extension = 3;
SnapshotExtensionPayload extension_payload = 4;
}
}

// SnapshotStoreItem contains metadata about a snapshotted store.
message SnapshotStoreItem {
string name = 1;
}

// SnapshotIAVLItem is an exported IAVL node.
message SnapshotIAVLItem {
bytes key = 1;
bytes value = 2;
int64 version = 3;
int32 height = 4;
}

// SnapshotExtensionMeta contains metadata about an external snapshotter.
message SnapshotExtensionMeta {
string name = 1;
uint32 format = 2;
}

// SnapshotExtensionPayload contains payloads of an external snapshotter.
message SnapshotExtensionPayload {
bytes payload = 1;
}
28 changes: 0 additions & 28 deletions proto/cosmos/base/store/v1beta1/snapshot.proto

This file was deleted.

8 changes: 5 additions & 3 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package mock
import (
"io"

protoio "github.com/gogo/protobuf/io"
dbm "github.com/tendermint/tm-db"

snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
Expand Down Expand Up @@ -122,13 +124,13 @@ func (ms multiStore) SetInitialVersion(version int64) error {
panic("not implemented")
}

func (ms multiStore) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (ms multiStore) Snapshot(height uint64, protoWriter protoio.Writer) error {
panic("not implemented")
}

func (ms multiStore) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}

Expand Down
File renamed without changes.
File renamed without changes.
105 changes: 74 additions & 31 deletions snapshots/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package snapshots_test

import (
"bufio"
"bytes"
"compress/zlib"
"crypto/sha256"
"errors"
"io"
"os"
"testing"
"time"

protoio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/snapshots/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func checksums(slice [][]byte) [][]byte {
Expand Down Expand Up @@ -56,45 +61,85 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
return bodies
}

// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)

ch := make(chan io.ReadCloser)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
types.WriteExtensionItem(protoWriter, item)
}
protoWriter.Close()
zWriter.Close()
bufWriter.Flush()
chunkWriter.Close()
}()

var chunks [][]byte
for chunkBody := range ch {
chunk, err := io.ReadAll(chunkBody)
if err != nil {
panic(err)
}
chunks = append(chunks, chunk)
}
return chunks
}

type mockSnapshotter struct {
chunks [][]byte
items [][]byte
}

func (m *mockSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
if format == 0 {
return types.ErrUnknownFormat
return snapshottypes.SnapshotItem{}, types.ErrUnknownFormat
}
if m.chunks != nil {
return errors.New("already has contents")
if m.items != nil {
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
}
if ready != nil {
close(ready)

m.items = [][]byte{}
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
payload := item.GetExtensionPayload()
if payload == nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
m.items = append(m.items, payload.Payload)
}

m.chunks = [][]byte{}
for reader := range chunks {
chunk, err := io.ReadAll(reader)
if err != nil {
return snapshottypes.SnapshotItem{}, nil
}

func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := types.WriteExtensionItem(protoWriter, item); err != nil {
return err
}
m.chunks = append(m.chunks, chunk)
}

return nil
}

func (m *mockSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format == 0 {
return nil, types.ErrUnknownFormat
}
ch := make(chan io.ReadCloser, len(m.chunks))
for _, chunk := range m.chunks {
ch <- io.NopCloser(bytes.NewReader(chunk))
}
close(ch)
return ch, nil
func (m *mockSnapshotter) SnapshotFormat() uint32 {
return 1
}
func (m *mockSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}

// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1.
Expand All @@ -110,7 +155,7 @@ func setupBusyManager(t *testing.T) *snapshots.Manager {
store, err := snapshots.NewStore(db.NewMemDB(), tempdir)
require.NoError(t, err)
hung := newHungSnapshotter()
mgr := snapshots.NewManager(store, hung)
mgr := snapshots.NewManager(store, hung, nil)

go func() {
_, err := mgr.Create(1)
Expand All @@ -137,15 +182,13 @@ func (m *hungSnapshotter) Close() {
close(m.ch)
}

func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
<-m.ch
ch := make(chan io.ReadCloser, 1)
ch <- io.NopCloser(bytes.NewReader([]byte{}))
return ch, nil
return nil
}

func (m *hungSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}
Loading

0 comments on commit 67087bb

Please sign in to comment.