diff --git a/UPGRADING.md b/UPGRADING.md index 5e52d8b56ba6..770c18e494da 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -5,6 +5,81 @@ Note, always read the **SimApp** section for more information on application wir ## [Unreleased] +### Unordered Transactions + +The Cosmos SDK now supports unordered transactions. This means that transactions +can be executed in any order and doesn't require the client to deal with or manage +nonces. This also means the order of execution is not guaranteed. To enable unordered +transactions in your application: + +* Update the `App` constructor to create, load, and save the unordered transaction + manager. + + ```go + func NewApp(...) *App { + // ... + + // create, start, and load the unordered tx manager + utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") + app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) + app.UnorderedTxManager.Start() + + if err := app.UnorderedTxManager.OnInit(); err != nil { + panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) + } + } + ``` + +* Add the decorator to the existing AnteHandler chain, which should be as early + as possible. + + ```go + anteDecorators := []sdk.AnteDecorator{ + ante.NewSetUpContextDecorator(), + // ... + ante.NewUnorderedTxDecorator(unorderedtx.DefaultMaxUnOrderedTTL, app.UnorderedTxManager), + // ... + } + + return sdk.ChainAnteDecorators(anteDecorators...), nil + ``` + +* If the App has a SnapshotManager defined, you must also register the extension + for the TxManager. + + ```go + if manager := app.SnapshotManager(); manager != nil { + err := manager.RegisterExtensions(unorderedtx.NewSnapshotter(app.UnorderedTxManager)) + if err != nil { + panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + } + } + ``` + +* Create or update the App's `Close()` method to close the unordered tx manager. + Note, this is critical as it ensures the manager's state is written to file + such that when the node restarts, it can recover the state to provide replay + protection. + + ```go + func (app *App) Close() error { + // ... + + // close the unordered tx manager + if e := app.UnorderedTxManager.Close(); e != nil { + err = errors.Join(err, e) + } + + return err + } + ``` + +To submit an unordered transaction, the client must set the `unordered` flag to +`true` and ensure a reasonable `timeout_height` is set. The `timeout_height` is +used as a TTL for the transaction and is used to provide replay protection. See +[ADR-070](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-070-unordered-account.md) +for more details. + ### Params * Params Migrations were removed. It is required to migrate to 0.50 prior to upgrading to .51. diff --git a/docs/architecture/adr-070-unordered-account.md b/docs/architecture/adr-070-unordered-account.md index ac63a4a40daa..c2d6e382f13b 100644 --- a/docs/architecture/adr-070-unordered-account.md +++ b/docs/architecture/adr-070-unordered-account.md @@ -278,14 +278,22 @@ func (d *DedupTxDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, Wire the `OnNewBlock` method of `UnorderedTxManager` into the BaseApp's ABCI `Commit` event. -### Start Up +### State Management -On start up, the node needs to re-fill the tx hash dictionary of `UnorderedTxManager` -by scanning `MaxUnOrderedTTL` number of historical blocks for existing un-expired -un-ordered transactions. +On start up, the node needs to ensure the TxManager's state contains all un-expired +transactions that have been committed to the chain. This is critical since if the +state is not properly initialized, the node will not reject duplicate transactions +and thus will not provide replay protection, and will likely get an app hash mismatch error. -An alternative design is to store the tx hash dictionary in kv store, then no need -to warm up on start up. +We propose to write all un-expired unordered transactions from the TxManager's to +file on disk. On start up, the node will read this file and re-populate the TxManager's +map. The write to file will happen when the node gracefully shuts down on `Close()`. + +Note, this is not a perfect solution, in the context of store v1. With store v2, +we can omit explicit file handling altogether and simply write the all the transactions +to non-consensus state, i.e State Storage (SS). + +Alternatively, we can write all the transactions to consensus state. ## Consequences diff --git a/simapp/ante.go b/simapp/ante.go index 918244a81405..8ff82263566d 100644 --- a/simapp/ante.go +++ b/simapp/ante.go @@ -4,6 +4,7 @@ import ( "errors" "cosmossdk.io/x/auth/ante" + "cosmossdk.io/x/auth/ante/unorderedtx" circuitante "cosmossdk.io/x/circuit/ante" sdk "github.com/cosmos/cosmos-sdk/types" @@ -13,6 +14,7 @@ import ( type HandlerOptions struct { ante.HandlerOptions CircuitKeeper circuitante.CircuitBreaker + TxManager *unorderedtx.Manager } // NewAnteHandler returns an AnteHandler that checks and increments sequence @@ -37,6 +39,7 @@ func NewAnteHandler(options HandlerOptions) (sdk.AnteHandler, error) { ante.NewExtensionOptionsDecorator(options.ExtensionOptionChecker), ante.NewValidateBasicDecorator(), ante.NewTxTimeoutHeightDecorator(), + ante.NewUnorderedTxDecorator(unorderedtx.DefaultMaxUnOrderedTTL, options.TxManager), ante.NewValidateMemoDecorator(options.AccountKeeper), ante.NewConsumeGasForTxSizeDecorator(options.AccountKeeper), ante.NewDeductFeeDecorator(options.AccountKeeper, options.BankKeeper, options.FeegrantKeeper, options.TxFeeChecker), diff --git a/simapp/app.go b/simapp/app.go index 4bffaaa09a7a..7007d9a62efe 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -26,6 +26,7 @@ import ( "cosmossdk.io/x/accounts/testing/counter" "cosmossdk.io/x/auth" "cosmossdk.io/x/auth/ante" + "cosmossdk.io/x/auth/ante/unorderedtx" authcodec "cosmossdk.io/x/auth/codec" authkeeper "cosmossdk.io/x/auth/keeper" "cosmossdk.io/x/auth/posthandler" @@ -169,6 +170,8 @@ type SimApp struct { ModuleManager *module.Manager BasicModuleManager module.BasicManager + UnorderedTxManager *unorderedtx.Manager + // simulation manager sm *module.SimulationManager @@ -519,6 +522,25 @@ func NewSimApp( } app.sm = module.NewSimulationManagerFromAppModules(app.ModuleManager.Modules, overrideModules) + // create, start, and load the unordered tx manager + utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") + app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) + app.UnorderedTxManager.Start() + + if err := app.UnorderedTxManager.OnInit(); err != nil { + panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) + } + + // register custom snapshot extensions (if any) + if manager := app.SnapshotManager(); manager != nil { + err := manager.RegisterExtensions( + unorderedtx.NewSnapshotter(app.UnorderedTxManager), + ) + if err != nil { + panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + } + } + app.sm.RegisterStoreDecoders() // initialize stores @@ -579,6 +601,7 @@ func (app *SimApp) setAnteHandler(txConfig client.TxConfig) { SigGasConsumer: ante.DefaultSigVerificationGasConsumer, }, &app.CircuitKeeper, + app.UnorderedTxManager, }, ) if err != nil { @@ -600,6 +623,12 @@ func (app *SimApp) setPostHandler() { app.SetPostHandler(postHandler) } +// Close implements the Application interface and closes all necessary application +// resources. +func (app *SimApp) Close() error { + return app.UnorderedTxManager.Close() +} + // Name returns the name of the App func (app *SimApp) Name() string { return app.BaseApp.Name() } diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 172e6e7af408..0c4ea6f63ac8 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -3,16 +3,19 @@ package simapp import ( + "fmt" "io" "os" "path/filepath" dbm "github.com/cosmos/cosmos-db" + "github.com/spf13/cast" "cosmossdk.io/depinject" "cosmossdk.io/log" storetypes "cosmossdk.io/store/types" "cosmossdk.io/x/auth" + "cosmossdk.io/x/auth/ante/unorderedtx" authkeeper "cosmossdk.io/x/auth/keeper" authsims "cosmossdk.io/x/auth/simulation" authtypes "cosmossdk.io/x/auth/types" @@ -34,6 +37,7 @@ import ( "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/runtime" @@ -64,6 +68,8 @@ type SimApp struct { txConfig client.TxConfig interfaceRegistry codectypes.InterfaceRegistry + UnorderedTxManager *unorderedtx.Manager + // keepers AuthKeeper authkeeper.AccountKeeper BankKeeper bankkeeper.Keeper @@ -256,6 +262,25 @@ func NewSimApp( // return app.App.InitChainer(ctx, req) // }) + // create, start, and load the unordered tx manager + utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") + app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) + app.UnorderedTxManager.Start() + + if err := app.UnorderedTxManager.OnInit(); err != nil { + panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) + } + + // register custom snapshot extensions (if any) + if manager := app.SnapshotManager(); manager != nil { + err := manager.RegisterExtensions( + unorderedtx.NewSnapshotter(app.UnorderedTxManager), + ) + if err != nil { + panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + } + } + if err := app.Load(loadLatest); err != nil { panic(err) } @@ -263,6 +288,12 @@ func NewSimApp( return app } +// Close implements the Application interface and closes all necessary application +// resources. +func (app *SimApp) Close() error { + return app.UnorderedTxManager.Close() +} + // LegacyAmino returns SimApp's amino codec. // // NOTE: This is solely to be used for testing purposes as it may be desirable diff --git a/x/auth/ante/unordered.go b/x/auth/ante/unordered.go index 9e0a933dd703..c110e63650ce 100644 --- a/x/auth/ante/unordered.go +++ b/x/auth/ante/unordered.go @@ -45,11 +45,16 @@ func (d *UnorderedTxDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate b return next(ctx, tx, simulate) } - if unorderedTx.GetTimeoutHeight() == 0 { + // TTL is defined as a specific block height at which this tx is no longer valid + ttl := unorderedTx.GetTimeoutHeight() + + if ttl == 0 { return ctx, errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "unordered transaction must have timeout_height set") } - - if unorderedTx.GetTimeoutHeight() > uint64(ctx.BlockHeight())+d.maxUnOrderedTTL { + if ttl < uint64(ctx.BlockHeight()) { + return ctx, errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "unordered transaction has a timeout_height that has already passed") + } + if ttl > uint64(ctx.BlockHeight())+d.maxUnOrderedTTL { return ctx, errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "unordered tx ttl exceeds %d", d.maxUnOrderedTTL) } @@ -62,7 +67,7 @@ func (d *UnorderedTxDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate b if ctx.ExecMode() == sdk.ExecModeFinalize { // a new tx included in the block, add the hash to the unordered tx manager - d.txManager.Add(txHash, unorderedTx.GetTimeoutHeight()) + d.txManager.Add(txHash, ttl) } return next(ctx, tx, simulate) diff --git a/x/auth/ante/unordered_test.go b/x/auth/ante/unordered_test.go index 13792822d07c..61653ee75a46 100644 --- a/x/auth/ante/unordered_test.go +++ b/x/auth/ante/unordered_test.go @@ -16,8 +16,10 @@ import ( ) func TestUnorderedTxDecorator_OrderedTx(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -31,8 +33,10 @@ func TestUnorderedTxDecorator_OrderedTx(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_NoTTL(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -46,8 +50,10 @@ func TestUnorderedTxDecorator_UnorderedTx_NoTTL(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_InvalidTTL(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -61,8 +67,10 @@ func TestUnorderedTxDecorator_UnorderedTx_InvalidTTL(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_AlreadyExists(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -79,8 +87,10 @@ func TestUnorderedTxDecorator_UnorderedTx_AlreadyExists(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_ValidCheckTx(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -94,8 +104,10 @@ func TestUnorderedTxDecorator_UnorderedTx_ValidCheckTx(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_ValidDeliverTx(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 3a6474baacbc..14fbe018b83b 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -1,15 +1,29 @@ package unorderedtx import ( + "bufio" + "bytes" "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sort" "sync" "time" + + "golang.org/x/exp/maps" ) const ( // DefaultMaxUnOrderedTTL defines the default maximum TTL an un-ordered transaction // can set. DefaultMaxUnOrderedTTL = 1024 + + dirName = "unordered_txs" + fileName = "data" ) // TxHash defines a transaction hash type alias, which is a fixed array of 32 bytes. @@ -23,6 +37,16 @@ type Manager struct { // doneCh allows us to ensure the purgeLoop has gracefully terminated prior to closing doneCh chan struct{} + // dataDir defines the directory to store unexpired unordered transactions + // + // XXX: Note, ideally we avoid the need to store unexpired unordered transactions + // directly to file. However, store v1 does not allow such a primitive. But, + // once store v2 is fully integrated, we can remove manual file handling and + // store the unexpired unordered transactions directly to SS. + // + // Ref: https://github.com/cosmos/cosmos-sdk/issues/18467 + dataDir string + mu sync.RWMutex // txHashes defines a map from tx hash -> TTL value, which is used for duplicate // checking and replay protection, as well as purging the map when the TTL is @@ -30,8 +54,14 @@ type Manager struct { txHashes map[TxHash]uint64 } -func NewManager() *Manager { +func NewManager(dataDir string) *Manager { + path := filepath.Join(dataDir, dirName) + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + _ = os.Mkdir(path, os.ModePerm) + } + m := &Manager{ + dataDir: dataDir, blockCh: make(chan uint64, 16), doneCh: make(chan struct{}), txHashes: make(map[TxHash]uint64), @@ -44,12 +74,18 @@ func (m *Manager) Start() { go m.purgeLoop() } +// Close must be called when a node gracefully shuts down. Typically, this should +// be called in an application's Close() function, which is called by the server. +// Note, Start() must be called in order for Close() to not hang. +// +// It will free all necessary resources as well as writing all unexpired unordered +// transactions along with their TTL values to file. func (m *Manager) Close() error { close(m.blockCh) <-m.doneCh m.blockCh = nil - return nil + return m.flushToFile() } func (m *Manager) Contains(hash TxHash) bool { @@ -74,12 +110,106 @@ func (m *Manager) Add(txHash TxHash, ttl uint64) { m.txHashes[txHash] = ttl } -// OnNewBlock send the latest block number to the background purge loop, which +// OnInit must be called when a node starts up. Typically, this should be called +// in an application's constructor, which is called by the server. +func (m *Manager) OnInit() error { + f, err := os.Open(filepath.Join(m.dataDir, dirName, fileName)) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // File does not exist, which we can assume that there are no unexpired + // unordered transactions. + return nil + } + + return fmt.Errorf("failed to open unconfirmed txs file: %w", err) + } + defer f.Close() + + var ( + r = bufio.NewReader(f) + buf = make([]byte, chunkSize) + ) + for { + n, err := io.ReadFull(r, buf) + if err != nil { + if errors.Is(err, io.EOF) { + break + } else { + return fmt.Errorf("failed to read unconfirmed txs file: %w", err) + } + } + if n != 32+8 { + return fmt.Errorf("read unexpected number of bytes from unconfirmed txs file: %d", n) + } + + var txHash TxHash + copy(txHash[:], buf[:txHashSize]) + + m.Add(txHash, binary.BigEndian.Uint64(buf[txHashSize:])) + } + + return nil +} + +// OnNewBlock sends the latest block number to the background purge loop, which // should be called in ABCI Commit event. func (m *Manager) OnNewBlock(blockHeight uint64) { m.blockCh <- blockHeight } +func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) error) error { + var buf bytes.Buffer + w := bufio.NewWriter(&buf) + + keys := maps.Keys(m.txHashes) + sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i][:], keys[j][:]) < 0 }) + + for _, txHash := range keys { + ttl := m.txHashes[txHash] + if height > ttl { + // skip expired txs that have yet to be purged + continue + } + + chunk := unorderedTxToBytes(txHash, ttl) + + if _, err := w.Write(chunk); err != nil { + return fmt.Errorf("failed to write unordered tx to buffer: %w", err) + } + } + + if err := w.Flush(); err != nil { + return fmt.Errorf("failed to flush unordered txs buffer: %w", err) + } + + return snapshotWriter(buf.Bytes()) +} + +// flushToFile writes all unexpired unordered transactions along with their TTL +// to file, overwriting the existing file if it exists. +func (m *Manager) flushToFile() error { + f, err := os.Create(filepath.Join(m.dataDir, dirName, fileName)) + if err != nil { + return fmt.Errorf("failed to create unordered txs file: %w", err) + } + defer f.Close() + + w := bufio.NewWriter(f) + for txHash, ttl := range m.txHashes { + chunk := unorderedTxToBytes(txHash, ttl) + + if _, err = w.Write(chunk); err != nil { + return fmt.Errorf("failed to write unordered tx to buffer: %w", err) + } + } + + if err = w.Flush(); err != nil { + return fmt.Errorf("failed to flush unordered txs buffer: %w", err) + } + + return nil +} + // expiredTxs returns expired tx hashes based on the provided block height. func (m *Manager) expiredTxs(blockHeight uint64) []TxHash { m.mu.RLock() @@ -142,3 +272,14 @@ func (m *Manager) batchReceive() (uint64, bool) { } } } + +func unorderedTxToBytes(txHash TxHash, ttl uint64) []byte { + chunk := make([]byte, chunkSize) + copy(chunk[:txHashSize], txHash[:]) + + ttlBz := make([]byte, ttlSize) + binary.BigEndian.PutUint64(ttlBz, ttl) + copy(chunk[txHashSize:], ttlBz) + + return chunk +} diff --git a/x/auth/ante/unorderedtx/manager_test.go b/x/auth/ante/unorderedtx/manager_test.go index caf03a3c269b..04138e344657 100644 --- a/x/auth/ante/unorderedtx/manager_test.go +++ b/x/auth/ante/unorderedtx/manager_test.go @@ -10,7 +10,7 @@ import ( ) func TestUnorderedTxManager_Close(t *testing.T) { - txm := unorderedtx.NewManager() + txm := unorderedtx.NewManager(t.TempDir()) txm.Start() require.NoError(t, txm.Close()) @@ -18,8 +18,10 @@ func TestUnorderedTxManager_Close(t *testing.T) { } func TestUnorderedTxManager_SimpleSize(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -31,8 +33,10 @@ func TestUnorderedTxManager_SimpleSize(t *testing.T) { } func TestUnorderedTxManager_SimpleContains(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -48,9 +52,51 @@ func TestUnorderedTxManager_SimpleContains(t *testing.T) { } } +func TestUnorderedTxManager_InitEmpty(t *testing.T) { + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() + + txm.Start() + + require.NoError(t, txm.OnInit()) +} + +func TestUnorderedTxManager_CloseInit(t *testing.T) { + dataDir := t.TempDir() + txm := unorderedtx.NewManager(dataDir) + txm.Start() + + // add a handful of unordered txs + for i := 0; i < 100; i++ { + txm.Add([32]byte{byte(i)}, 100) + } + + // close the manager, which should flush all unexpired txs to file + require.NoError(t, txm.Close()) + + // create a new manager, start it + txm2 := unorderedtx.NewManager(dataDir) + defer func() { + require.NoError(t, txm2.Close()) + }() + + // start and execute OnInit, which should load the unexpired txs from file + txm2.Start() + require.NoError(t, txm2.OnInit()) + require.Equal(t, 100, txm2.Size()) + + for i := 0; i < 100; i++ { + require.True(t, txm2.Contains([32]byte{byte(i)})) + } +} + func TestUnorderedTxManager_Flow(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go new file mode 100644 index 000000000000..5941a11a6888 --- /dev/null +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -0,0 +1,92 @@ +package unorderedtx + +import ( + "encoding/binary" + "errors" + "io" + + snapshot "cosmossdk.io/store/snapshots/types" +) + +const ( + txHashSize = 32 + ttlSize = 8 + chunkSize = txHashSize + ttlSize +) + +var _ snapshot.ExtensionSnapshotter = &Snapshotter{} + +const ( + // SnapshotFormat defines the snapshot format of exported unordered transactions. + // No protobuf envelope, no metadata. + SnapshotFormat = 1 + + // SnapshotName defines the snapshot name of exported unordered transactions. + SnapshotName = "unordered_txs" +) + +type Snapshotter struct { + m *Manager +} + +func NewSnapshotter(m *Manager) *Snapshotter { + return &Snapshotter{m: m} +} + +func (s *Snapshotter) SnapshotName() string { + return SnapshotName +} + +func (s *Snapshotter) SnapshotFormat() uint32 { + return SnapshotFormat +} + +func (s *Snapshotter) SupportedFormats() []uint32 { + return []uint32{SnapshotFormat} +} + +func (s *Snapshotter) SnapshotExtension(height uint64, payloadWriter snapshot.ExtensionPayloadWriter) error { + // export all unordered transactions as a single blob + return s.m.exportSnapshot(height, payloadWriter) +} + +func (s *Snapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshot.ExtensionPayloadReader) error { + if format == SnapshotFormat { + return s.restore(height, payloadReader) + } + + return snapshot.ErrUnknownFormat +} + +func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPayloadReader) error { + // the payload should be the entire set of unordered transactions + payload, err := payloadReader() + if err != nil { + if errors.Is(err, io.EOF) { + return io.ErrUnexpectedEOF + } + + return err + } + + if len(payload)%chunkSize != 0 { + return errors.New("invalid unordered txs payload length") + } + + var i int + for i < len(payload) { + var txHash TxHash + copy(txHash[:], payload[i:i+txHashSize]) + + ttl := binary.BigEndian.Uint64(payload[i+txHashSize : i+chunkSize]) + + if height < ttl { + // only add unordered transactions that are still valid, i.e. unexpired + s.m.Add(txHash, ttl) + } + + i += chunkSize + } + + return nil +} diff --git a/x/auth/ante/unorderedtx/snapshotter_test.go b/x/auth/ante/unorderedtx/snapshotter_test.go new file mode 100644 index 000000000000..1645fbb90677 --- /dev/null +++ b/x/auth/ante/unorderedtx/snapshotter_test.go @@ -0,0 +1,56 @@ +package unorderedtx_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "cosmossdk.io/x/auth/ante/unorderedtx" +) + +func TestSnapshotter(t *testing.T) { + dataDir := t.TempDir() + txm := unorderedtx.NewManager(dataDir) + + // add a handful of unordered txs + for i := 0; i < 100; i++ { + txm.Add([32]byte{byte(i)}, 100) + } + + var unorderedTxBz []byte + s := unorderedtx.NewSnapshotter(txm) + w := func(bz []byte) error { + unorderedTxBz = bz + return nil + } + + err := s.SnapshotExtension(50, w) + require.NoError(t, err) + require.NotEmpty(t, unorderedTxBz) + + pr := func() ([]byte, error) { + return unorderedTxBz, nil + } + + // restore with an invalid format which should result in an error + err = s.RestoreExtension(50, 2, pr) + require.Error(t, err) + + // restore with height > ttl which should result in no unordered txs synced + txm2 := unorderedtx.NewManager(dataDir) + s2 := unorderedtx.NewSnapshotter(txm2) + err = s2.RestoreExtension(200, unorderedtx.SnapshotFormat, pr) + require.NoError(t, err) + require.Empty(t, txm2.Size()) + + // restore with with height < ttl which should result in all unordered txs synced + txm3 := unorderedtx.NewManager(dataDir) + s3 := unorderedtx.NewSnapshotter(txm3) + err = s3.RestoreExtension(50, unorderedtx.SnapshotFormat, pr) + require.NoError(t, err) + require.Equal(t, 100, txm3.Size()) + + for i := 0; i < 100; i++ { + require.True(t, txm3.Contains([32]byte{byte(i)})) + } +}