From 4cfa6b8ccd4a88e36429de069396cfdcb289d210 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 13 Dec 2023 14:42:57 -0800 Subject: [PATCH 01/25] updates --- x/auth/ante/unorderedtx/manager.go | 10 +++++++- x/auth/ante/unorderedtx/snapshotter.go | 35 ++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 x/auth/ante/unorderedtx/snapshotter.go diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 3a6474baacbc..5245f4670b11 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -74,7 +74,15 @@ func (m *Manager) Add(txHash TxHash, ttl uint64) { m.txHashes[txHash] = ttl } -// OnNewBlock send the latest block number to the background purge loop, which +func (m *Manager) Export() any { + panic("not implemented") +} + +func (m *Manager) OnInit() error { + panic("not implemented") +} + +// OnNewBlock sends the latest block number to the background purge loop, which // should be called in ABCI Commit event. func (m *Manager) OnNewBlock(blockHeight uint64) { m.blockCh <- blockHeight diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go new file mode 100644 index 000000000000..6b4b0edbbcb1 --- /dev/null +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -0,0 +1,35 @@ +package unorderedtx + +import ( + snapshot "cosmossdk.io/store/snapshots/types" +) + +var _ snapshot.ExtensionSnapshotter = &Snapshotter{} + +// SnapshotFormat defines the snapshot format of exported unordered transactions. +// No protobuf envelope, no metadata. +const SnapshotFormat = 1 + +type Snapshotter struct { + m *Manager +} + +func (s *Snapshotter) SnapshotName() string { + panic("not implemented!") +} + +func (s *Snapshotter) SnapshotFormat() uint32 { + panic("not implemented!") +} + +func (s *Snapshotter) SupportedFormats() []uint32 { + panic("not implemented!") +} + +func (s *Snapshotter) SnapshotExtension(height uint64, payloadWriter snapshot.ExtensionPayloadWriter) error { + panic("not implemented!") +} + +func (s *Snapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshot.ExtensionPayloadReader) error { + panic("not implemented!") +} From 15ac2527e644a6ee606b6b1fcb9e895529edcb60 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 13 Dec 2023 14:54:35 -0800 Subject: [PATCH 02/25] updates --- x/auth/ante/unorderedtx/manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 5245f4670b11..eb75ad638fbb 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -82,6 +82,10 @@ func (m *Manager) OnInit() error { panic("not implemented") } +func (m *Manager) OnClose() error { + panic("not implemented") +} + // OnNewBlock sends the latest block number to the background purge loop, which // should be called in ABCI Commit event. func (m *Manager) OnNewBlock(blockHeight uint64) { From 45b565ce3b316dc0cd30559345c2a8854e6cb656 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 13 Dec 2023 16:53:56 -0800 Subject: [PATCH 03/25] updates --- x/auth/ante/unordered_test.go | 36 +++++--- x/auth/ante/unorderedtx/manager.go | 115 ++++++++++++++++++++++-- x/auth/ante/unorderedtx/manager_test.go | 60 +++++++++++-- 3 files changed, 183 insertions(+), 28 deletions(-) diff --git a/x/auth/ante/unordered_test.go b/x/auth/ante/unordered_test.go index 13792822d07c..61653ee75a46 100644 --- a/x/auth/ante/unordered_test.go +++ b/x/auth/ante/unordered_test.go @@ -16,8 +16,10 @@ import ( ) func TestUnorderedTxDecorator_OrderedTx(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -31,8 +33,10 @@ func TestUnorderedTxDecorator_OrderedTx(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_NoTTL(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -46,8 +50,10 @@ func TestUnorderedTxDecorator_UnorderedTx_NoTTL(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_InvalidTTL(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -61,8 +67,10 @@ func TestUnorderedTxDecorator_UnorderedTx_InvalidTTL(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_AlreadyExists(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -79,8 +87,10 @@ func TestUnorderedTxDecorator_UnorderedTx_AlreadyExists(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_ValidCheckTx(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -94,8 +104,10 @@ func TestUnorderedTxDecorator_UnorderedTx_ValidCheckTx(t *testing.T) { } func TestUnorderedTxDecorator_UnorderedTx_ValidDeliverTx(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index eb75ad638fbb..dafa53bee856 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -1,7 +1,14 @@ package unorderedtx import ( + "bufio" "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path/filepath" "sync" "time" ) @@ -23,6 +30,16 @@ type Manager struct { // doneCh allows us to ensure the purgeLoop has gracefully terminated prior to closing doneCh chan struct{} + // dataDir defines the directory to store unexpired unordered transactions + // + // XXX: Note, ideally we avoid the need to store unexpired unordered transactions + // directly to file. However, store v1 does not allow such a primitive. However, + // once store v2 is fully integrated, we can remove manual file handling and + // store the unexpired unordered transactions directly to SS. + // + // Ref: https://github.com/cosmos/cosmos-sdk/issues/18467 + dataDir string + mu sync.RWMutex // txHashes defines a map from tx hash -> TTL value, which is used for duplicate // checking and replay protection, as well as purging the map when the TTL is @@ -30,8 +47,14 @@ type Manager struct { txHashes map[TxHash]uint64 } -func NewManager() *Manager { +func NewManager(dataDir string) *Manager { + path := filepath.Join(dataDir, "unordered_txs") + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + _ = os.Mkdir(path, os.ModePerm) + } + m := &Manager{ + dataDir: dataDir, blockCh: make(chan uint64, 16), doneCh: make(chan struct{}), txHashes: make(map[TxHash]uint64), @@ -44,12 +67,17 @@ func (m *Manager) Start() { go m.purgeLoop() } +// Close must be called when a node gracefully shuts down. Typically, this should +// be called in an application's Close() function, which is called by the server. +// +// It will free all necessary resources as well as writing all unexpired unordered +// transactions along with their TTL values to file. func (m *Manager) Close() error { close(m.blockCh) <-m.doneCh m.blockCh = nil - return nil + return m.flushToFile() } func (m *Manager) Contains(hash TxHash) bool { @@ -74,16 +102,57 @@ func (m *Manager) Add(txHash TxHash, ttl uint64) { m.txHashes[txHash] = ttl } -func (m *Manager) Export() any { - panic("not implemented") -} +// // Export returns the current set of unexpired unordered transactions along with +// // their TTL values. +// func (m *Manager) Export() map[TxHash]uint64 { +// m.mu.RLock() +// defer m.mu.RUnlock() +// result := make(map[TxHash]uint64, len(m.txHashes)) +// maps.Copy(m.txHashes, result) + +// return result +// } + +// OnInit must be called when a node starts up. Typically, this should be called +// in an application's constructor, which is called by the server. func (m *Manager) OnInit() error { - panic("not implemented") -} + f, err := os.Open(filepath.Join(m.dataDir, "unordered_txs", "unordered_txs")) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // File does not exist, which we can assume that there are no unexpired + // unordered transactions. + return nil + } + + return fmt.Errorf("failed to open unconfirmed txs file: %w", err) + } + defer f.Close() + + var ( + r = bufio.NewReader(f) + buf = make([]byte, 32+8) + ) + for { + n, err := r.Read(buf) + if err != nil { + if errors.Is(err, io.EOF) { + break + } else { + return fmt.Errorf("failed to read unconfirmed txs file: %w", err) + } + } + if n != 32+8 { + return fmt.Errorf("read unexpected number of bytes from unconfirmed txs file: %d", n) + } + + var txHash TxHash + copy(txHash[:], buf[:32]) -func (m *Manager) OnClose() error { - panic("not implemented") + m.Add(txHash, binary.BigEndian.Uint64(buf[32:])) + } + + return nil } // OnNewBlock sends the latest block number to the background purge loop, which @@ -92,6 +161,34 @@ func (m *Manager) OnNewBlock(blockHeight uint64) { m.blockCh <- blockHeight } +func (m *Manager) flushToFile() error { + f, err := os.Create(filepath.Join(m.dataDir, "unordered_txs", "unordered_txs")) + if err != nil { + return fmt.Errorf("failed to create unconfirmed txs file: %w", err) + } + defer f.Close() + + w := bufio.NewWriter(f) + for txHash, ttl := range m.txHashes { + buf := make([]byte, 32+8) + copy(buf[:32], txHash[:]) + + ttlBz := make([]byte, 8) + binary.BigEndian.PutUint64(ttlBz, ttl) + copy(buf[32:], ttlBz) + + if _, err = w.Write(buf); err != nil { + return fmt.Errorf("failed to write buffer to unconfirmed txs file: %w", err) + } + } + + if err = w.Flush(); err != nil { + return fmt.Errorf("failed to flush buffer to unconfirmed txs file: %w", err) + } + + return nil +} + // expiredTxs returns expired tx hashes based on the provided block height. func (m *Manager) expiredTxs(blockHeight uint64) []TxHash { m.mu.RLock() diff --git a/x/auth/ante/unorderedtx/manager_test.go b/x/auth/ante/unorderedtx/manager_test.go index caf03a3c269b..04138e344657 100644 --- a/x/auth/ante/unorderedtx/manager_test.go +++ b/x/auth/ante/unorderedtx/manager_test.go @@ -10,7 +10,7 @@ import ( ) func TestUnorderedTxManager_Close(t *testing.T) { - txm := unorderedtx.NewManager() + txm := unorderedtx.NewManager(t.TempDir()) txm.Start() require.NoError(t, txm.Close()) @@ -18,8 +18,10 @@ func TestUnorderedTxManager_Close(t *testing.T) { } func TestUnorderedTxManager_SimpleSize(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -31,8 +33,10 @@ func TestUnorderedTxManager_SimpleSize(t *testing.T) { } func TestUnorderedTxManager_SimpleContains(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() @@ -48,9 +52,51 @@ func TestUnorderedTxManager_SimpleContains(t *testing.T) { } } +func TestUnorderedTxManager_InitEmpty(t *testing.T) { + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() + + txm.Start() + + require.NoError(t, txm.OnInit()) +} + +func TestUnorderedTxManager_CloseInit(t *testing.T) { + dataDir := t.TempDir() + txm := unorderedtx.NewManager(dataDir) + txm.Start() + + // add a handful of unordered txs + for i := 0; i < 100; i++ { + txm.Add([32]byte{byte(i)}, 100) + } + + // close the manager, which should flush all unexpired txs to file + require.NoError(t, txm.Close()) + + // create a new manager, start it + txm2 := unorderedtx.NewManager(dataDir) + defer func() { + require.NoError(t, txm2.Close()) + }() + + // start and execute OnInit, which should load the unexpired txs from file + txm2.Start() + require.NoError(t, txm2.OnInit()) + require.Equal(t, 100, txm2.Size()) + + for i := 0; i < 100; i++ { + require.True(t, txm2.Contains([32]byte{byte(i)})) + } +} + func TestUnorderedTxManager_Flow(t *testing.T) { - txm := unorderedtx.NewManager() - defer txm.Close() + txm := unorderedtx.NewManager(t.TempDir()) + defer func() { + require.NoError(t, txm.Close()) + }() txm.Start() From 9b44960417b8ce786654175573581db338bdc945 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 13 Dec 2023 16:54:46 -0800 Subject: [PATCH 04/25] updates --- x/auth/ante/unorderedtx/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index dafa53bee856..afd066325fce 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -33,7 +33,7 @@ type Manager struct { // dataDir defines the directory to store unexpired unordered transactions // // XXX: Note, ideally we avoid the need to store unexpired unordered transactions - // directly to file. However, store v1 does not allow such a primitive. However, + // directly to file. However, store v1 does not allow such a primitive. But, // once store v2 is fully integrated, we can remove manual file handling and // store the unexpired unordered transactions directly to SS. // From 6e1a342238801544865b4bb166b9103b41eb760f Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 13 Dec 2023 16:55:57 -0800 Subject: [PATCH 05/25] updates --- x/auth/ante/unorderedtx/manager.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index afd066325fce..21ec6bb1051f 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -17,6 +17,9 @@ const ( // DefaultMaxUnOrderedTTL defines the default maximum TTL an un-ordered transaction // can set. DefaultMaxUnOrderedTTL = 1024 + + dirName = "unordered_txs" + fileName = "data" ) // TxHash defines a transaction hash type alias, which is a fixed array of 32 bytes. @@ -48,7 +51,7 @@ type Manager struct { } func NewManager(dataDir string) *Manager { - path := filepath.Join(dataDir, "unordered_txs") + path := filepath.Join(dataDir, dirName) if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { _ = os.Mkdir(path, os.ModePerm) } @@ -117,7 +120,7 @@ func (m *Manager) Add(txHash TxHash, ttl uint64) { // OnInit must be called when a node starts up. Typically, this should be called // in an application's constructor, which is called by the server. func (m *Manager) OnInit() error { - f, err := os.Open(filepath.Join(m.dataDir, "unordered_txs", "unordered_txs")) + f, err := os.Open(filepath.Join(m.dataDir, dirName, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { // File does not exist, which we can assume that there are no unexpired @@ -162,7 +165,7 @@ func (m *Manager) OnNewBlock(blockHeight uint64) { } func (m *Manager) flushToFile() error { - f, err := os.Create(filepath.Join(m.dataDir, "unordered_txs", "unordered_txs")) + f, err := os.Create(filepath.Join(m.dataDir, dirName, fileName)) if err != nil { return fmt.Errorf("failed to create unconfirmed txs file: %w", err) } From 3bd1e62f36e27773f6edb521f7ba21f5e50a2464 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 13 Dec 2023 16:56:36 -0800 Subject: [PATCH 06/25] updates --- x/auth/ante/unorderedtx/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 21ec6bb1051f..669560bda3bb 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -164,6 +164,8 @@ func (m *Manager) OnNewBlock(blockHeight uint64) { m.blockCh <- blockHeight } +// flushToFile writes all unexpired unordered transactions along with their TTL +// to file, overwriting the existing file if it exists. func (m *Manager) flushToFile() error { f, err := os.Create(filepath.Join(m.dataDir, dirName, fileName)) if err != nil { From d311087b2676d6a703c6fccefe16e2a9b4c27a2c Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Sat, 16 Dec 2023 08:30:51 -0800 Subject: [PATCH 07/25] updates --- x/auth/ante/unorderedtx/manager.go | 41 ++++++++++++++++++-------- x/auth/ante/unorderedtx/snapshotter.go | 31 +++++++++++++------ 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 669560bda3bb..f4fdaf047d40 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -2,6 +2,7 @@ package unorderedtx import ( "bufio" + "bytes" "context" "encoding/binary" "errors" @@ -105,18 +106,6 @@ func (m *Manager) Add(txHash TxHash, ttl uint64) { m.txHashes[txHash] = ttl } -// // Export returns the current set of unexpired unordered transactions along with -// // their TTL values. -// func (m *Manager) Export() map[TxHash]uint64 { -// m.mu.RLock() -// defer m.mu.RUnlock() - -// result := make(map[TxHash]uint64, len(m.txHashes)) -// maps.Copy(m.txHashes, result) - -// return result -// } - // OnInit must be called when a node starts up. Typically, this should be called // in an application's constructor, which is called by the server. func (m *Manager) OnInit() error { @@ -164,6 +153,34 @@ func (m *Manager) OnNewBlock(blockHeight uint64) { m.blockCh <- blockHeight } +func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) error) error { + var buf bytes.Buffer + w := bufio.NewWriter(&buf) + + for txHash, ttl := range m.txHashes { + if height > ttl { + continue + } + + chunk := make([]byte, 32+8) + copy(chunk[:32], txHash[:]) + + ttlBz := make([]byte, 8) + binary.BigEndian.PutUint64(ttlBz, ttl) + copy(chunk[32:], ttlBz) + + if _, err := w.Write(chunk); err != nil { + return fmt.Errorf("failed to write unconfirmed tx to buffer: %w", err) + } + } + + if err := w.Flush(); err != nil { + return fmt.Errorf("failed to flush unconfirmed txs buffer: %w", err) + } + + return snapshotWriter(buf.Bytes()) +} + // flushToFile writes all unexpired unordered transactions along with their TTL // to file, overwriting the existing file if it exists. func (m *Manager) flushToFile() error { diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go index 6b4b0edbbcb1..42aa56e9b122 100644 --- a/x/auth/ante/unorderedtx/snapshotter.go +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -6,30 +6,43 @@ import ( var _ snapshot.ExtensionSnapshotter = &Snapshotter{} -// SnapshotFormat defines the snapshot format of exported unordered transactions. -// No protobuf envelope, no metadata. -const SnapshotFormat = 1 +const ( + // SnapshotFormat defines the snapshot format of exported unordered transactions. + // No protobuf envelope, no metadata. + SnapshotFormat = 1 + + // SnapshotName defines the snapshot name of exported unordered transactions. + SnapshotName = "unordered_txs" +) type Snapshotter struct { m *Manager } +func NewSnapshotter(m *Manager) *Snapshotter { + return &Snapshotter{m: m} +} + func (s *Snapshotter) SnapshotName() string { - panic("not implemented!") + return SnapshotName } func (s *Snapshotter) SnapshotFormat() uint32 { - panic("not implemented!") + return SnapshotFormat } func (s *Snapshotter) SupportedFormats() []uint32 { - panic("not implemented!") + return []uint32{SnapshotFormat} } func (s *Snapshotter) SnapshotExtension(height uint64, payloadWriter snapshot.ExtensionPayloadWriter) error { - panic("not implemented!") + return s.m.exportSnapshot(height, payloadWriter) } -func (s *Snapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshot.ExtensionPayloadReader) error { - panic("not implemented!") +func (s *Snapshotter) RestoreExtension(_ uint64, format uint32, payloadReader snapshot.ExtensionPayloadReader) error { + if format == SnapshotFormat { + return s.restore(payloadReader) + } + + return snapshot.ErrUnknownFormat } From ae5945abee2aa536db50515c17cdb40c4de8d283 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Sat, 16 Dec 2023 08:33:26 -0800 Subject: [PATCH 08/25] updates --- x/auth/ante/unorderedtx/manager.go | 32 ++++++++++++++++-------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index f4fdaf047d40..dd2be5b2b2ae 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -159,15 +159,11 @@ func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) erro for txHash, ttl := range m.txHashes { if height > ttl { + // skip expired txs that have yet to be purged continue } - chunk := make([]byte, 32+8) - copy(chunk[:32], txHash[:]) - - ttlBz := make([]byte, 8) - binary.BigEndian.PutUint64(ttlBz, ttl) - copy(chunk[32:], ttlBz) + chunk := unconfirmedTxToBytes(txHash, ttl) if _, err := w.Write(chunk); err != nil { return fmt.Errorf("failed to write unconfirmed tx to buffer: %w", err) @@ -192,20 +188,15 @@ func (m *Manager) flushToFile() error { w := bufio.NewWriter(f) for txHash, ttl := range m.txHashes { - buf := make([]byte, 32+8) - copy(buf[:32], txHash[:]) - - ttlBz := make([]byte, 8) - binary.BigEndian.PutUint64(ttlBz, ttl) - copy(buf[32:], ttlBz) + chunk := unconfirmedTxToBytes(txHash, ttl) - if _, err = w.Write(buf); err != nil { - return fmt.Errorf("failed to write buffer to unconfirmed txs file: %w", err) + if _, err = w.Write(chunk); err != nil { + return fmt.Errorf("failed to write unconfirmed tx to buffer: %w", err) } } if err = w.Flush(); err != nil { - return fmt.Errorf("failed to flush buffer to unconfirmed txs file: %w", err) + return fmt.Errorf("failed to flush unconfirmed txs buffer: %w", err) } return nil @@ -273,3 +264,14 @@ func (m *Manager) batchReceive() (uint64, bool) { } } } + +func unconfirmedTxToBytes(txHash TxHash, ttl uint64) []byte { + chunk := make([]byte, 32+8) + copy(chunk[:32], txHash[:]) + + ttlBz := make([]byte, 8) + binary.BigEndian.PutUint64(ttlBz, ttl) + copy(chunk[32:], ttlBz) + + return chunk +} From 349ef2b35296efced1aebba633b6555d63319a81 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Sat, 16 Dec 2023 08:43:46 -0800 Subject: [PATCH 09/25] updates --- x/auth/ante/unorderedtx/manager.go | 16 ++++++------- x/auth/ante/unorderedtx/snapshotter.go | 32 ++++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index dd2be5b2b2ae..f6bbcd8aac48 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -163,15 +163,15 @@ func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) erro continue } - chunk := unconfirmedTxToBytes(txHash, ttl) + chunk := unorderedTxToBytes(txHash, ttl) if _, err := w.Write(chunk); err != nil { - return fmt.Errorf("failed to write unconfirmed tx to buffer: %w", err) + return fmt.Errorf("failed to write unordered tx to buffer: %w", err) } } if err := w.Flush(); err != nil { - return fmt.Errorf("failed to flush unconfirmed txs buffer: %w", err) + return fmt.Errorf("failed to flush unordered txs buffer: %w", err) } return snapshotWriter(buf.Bytes()) @@ -182,21 +182,21 @@ func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) erro func (m *Manager) flushToFile() error { f, err := os.Create(filepath.Join(m.dataDir, dirName, fileName)) if err != nil { - return fmt.Errorf("failed to create unconfirmed txs file: %w", err) + return fmt.Errorf("failed to create unordered txs file: %w", err) } defer f.Close() w := bufio.NewWriter(f) for txHash, ttl := range m.txHashes { - chunk := unconfirmedTxToBytes(txHash, ttl) + chunk := unorderedTxToBytes(txHash, ttl) if _, err = w.Write(chunk); err != nil { - return fmt.Errorf("failed to write unconfirmed tx to buffer: %w", err) + return fmt.Errorf("failed to write unordered tx to buffer: %w", err) } } if err = w.Flush(); err != nil { - return fmt.Errorf("failed to flush unconfirmed txs buffer: %w", err) + return fmt.Errorf("failed to flush unordered txs buffer: %w", err) } return nil @@ -265,7 +265,7 @@ func (m *Manager) batchReceive() (uint64, bool) { } } -func unconfirmedTxToBytes(txHash TxHash, ttl uint64) []byte { +func unorderedTxToBytes(txHash TxHash, ttl uint64) []byte { chunk := make([]byte, 32+8) copy(chunk[:32], txHash[:]) diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go index 42aa56e9b122..5b146345e624 100644 --- a/x/auth/ante/unorderedtx/snapshotter.go +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -1,6 +1,10 @@ package unorderedtx import ( + "encoding/binary" + "errors" + "io" + snapshot "cosmossdk.io/store/snapshots/types" ) @@ -36,13 +40,37 @@ func (s *Snapshotter) SupportedFormats() []uint32 { } func (s *Snapshotter) SnapshotExtension(height uint64, payloadWriter snapshot.ExtensionPayloadWriter) error { + // export all unordered transactions as a single blob return s.m.exportSnapshot(height, payloadWriter) } -func (s *Snapshotter) RestoreExtension(_ uint64, format uint32, payloadReader snapshot.ExtensionPayloadReader) error { +func (s *Snapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshot.ExtensionPayloadReader) error { if format == SnapshotFormat { - return s.restore(payloadReader) + return s.restore(height, payloadReader) } return snapshot.ErrUnknownFormat } + +func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPayloadReader) error { + // the payload should be the entire set of unordered transactions + payload, err := payloadReader() + if err != nil { + if !errors.Is(err, io.EOF) { + return err + } + } + + var i int + for i < len(payload) { + var txHash TxHash + copy(txHash[:], payload[i:i+32]) + + ttl := binary.BigEndian.Uint64(payload[i+32 : i+40]) + s.m.Add(txHash, ttl) + + i += 40 + } + + return nil +} From a1c49cd576a3cdadb721552623268dac62f8d509 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Sat, 16 Dec 2023 12:02:49 -0800 Subject: [PATCH 10/25] updates --- x/auth/ante/unorderedtx/snapshotter_test.go | 35 +++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 x/auth/ante/unorderedtx/snapshotter_test.go diff --git a/x/auth/ante/unorderedtx/snapshotter_test.go b/x/auth/ante/unorderedtx/snapshotter_test.go new file mode 100644 index 000000000000..7dc5f2181c71 --- /dev/null +++ b/x/auth/ante/unorderedtx/snapshotter_test.go @@ -0,0 +1,35 @@ +package unorderedtx_test + +import ( + "testing" + + "cosmossdk.io/x/auth/ante/unorderedtx" + "github.com/stretchr/testify/require" +) + +func TestSnapshotter_SnapshotExtension(t *testing.T) { + dataDir := t.TempDir() + txm := unorderedtx.NewManager(dataDir) + txm.Start() + + defer txm.Close() + + // add a handful of unordered txs + for i := 0; i < 100; i++ { + txm.Add([32]byte{byte(i)}, 100) + } + + var unorderedTxBz []byte + s := unorderedtx.NewSnapshotter(txm) + w := func(bz []byte) error { + unorderedTxBz = bz + return nil + } + + err := s.SnapshotExtension(50, w) + require.NoError(t, err) + require.NotEmpty(t, unorderedTxBz) +} + +func TestSnapshotter_RestoreExtension(t *testing.T) { +} From 80e25fc77d20dff9c11294e3090aed9263f3a9f8 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 18 Dec 2023 09:56:30 -0800 Subject: [PATCH 11/25] updates --- x/auth/ante/unordered.go | 10 ++++--- x/auth/ante/unorderedtx/manager.go | 1 + x/auth/ante/unorderedtx/snapshotter.go | 6 +++- x/auth/ante/unorderedtx/snapshotter_test.go | 32 +++++++++++++++++---- 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/x/auth/ante/unordered.go b/x/auth/ante/unordered.go index 9e0a933dd703..d3ce6d230b07 100644 --- a/x/auth/ante/unordered.go +++ b/x/auth/ante/unordered.go @@ -45,11 +45,13 @@ func (d *UnorderedTxDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate b return next(ctx, tx, simulate) } - if unorderedTx.GetTimeoutHeight() == 0 { + // TTL is defined as a specific block height at which this tx is no longer valid + ttl := unorderedTx.GetTimeoutHeight() + + if ttl == 0 { return ctx, errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "unordered transaction must have timeout_height set") } - - if unorderedTx.GetTimeoutHeight() > uint64(ctx.BlockHeight())+d.maxUnOrderedTTL { + if ttl > uint64(ctx.BlockHeight())+d.maxUnOrderedTTL { return ctx, errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "unordered tx ttl exceeds %d", d.maxUnOrderedTTL) } @@ -62,7 +64,7 @@ func (d *UnorderedTxDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate b if ctx.ExecMode() == sdk.ExecModeFinalize { // a new tx included in the block, add the hash to the unordered tx manager - d.txManager.Add(txHash, unorderedTx.GetTimeoutHeight()) + d.txManager.Add(txHash, ttl) } return next(ctx, tx, simulate) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index f6bbcd8aac48..779fcca21627 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -73,6 +73,7 @@ func (m *Manager) Start() { // Close must be called when a node gracefully shuts down. Typically, this should // be called in an application's Close() function, which is called by the server. +// Note, Start() must be called in order for Close() to not hang. // // It will free all necessary resources as well as writing all unexpired unordered // transactions along with their TTL values to file. diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go index 5b146345e624..58a9ad04c3ef 100644 --- a/x/auth/ante/unorderedtx/snapshotter.go +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -67,7 +67,11 @@ func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPay copy(txHash[:], payload[i:i+32]) ttl := binary.BigEndian.Uint64(payload[i+32 : i+40]) - s.m.Add(txHash, ttl) + + if height < ttl { + // only add unordered transactions that are still valid, i.e. unexpired + s.m.Add(txHash, ttl) + } i += 40 } diff --git a/x/auth/ante/unorderedtx/snapshotter_test.go b/x/auth/ante/unorderedtx/snapshotter_test.go index 7dc5f2181c71..23ed14fadbb8 100644 --- a/x/auth/ante/unorderedtx/snapshotter_test.go +++ b/x/auth/ante/unorderedtx/snapshotter_test.go @@ -7,12 +7,9 @@ import ( "github.com/stretchr/testify/require" ) -func TestSnapshotter_SnapshotExtension(t *testing.T) { +func TestSnapshotter(t *testing.T) { dataDir := t.TempDir() txm := unorderedtx.NewManager(dataDir) - txm.Start() - - defer txm.Close() // add a handful of unordered txs for i := 0; i < 100; i++ { @@ -29,7 +26,30 @@ func TestSnapshotter_SnapshotExtension(t *testing.T) { err := s.SnapshotExtension(50, w) require.NoError(t, err) require.NotEmpty(t, unorderedTxBz) -} -func TestSnapshotter_RestoreExtension(t *testing.T) { + pr := func() ([]byte, error) { + return unorderedTxBz, nil + } + + // restore with an invalid format which should result in an error + err = s.RestoreExtension(50, 2, pr) + require.Error(t, err) + + // restore with height > ttl which should result in no unordered txs synced + txm2 := unorderedtx.NewManager(dataDir) + s2 := unorderedtx.NewSnapshotter(txm2) + err = s2.RestoreExtension(200, unorderedtx.SnapshotFormat, pr) + require.NoError(t, err) + require.Empty(t, txm2.Size()) + + // restore with with height < ttl which should result in all unordered txs synced + txm3 := unorderedtx.NewManager(dataDir) + s3 := unorderedtx.NewSnapshotter(txm3) + err = s3.RestoreExtension(50, unorderedtx.SnapshotFormat, pr) + require.NoError(t, err) + require.Equal(t, 100, txm3.Size()) + + for i := 0; i < 100; i++ { + require.True(t, txm3.Contains([32]byte{byte(i)})) + } } From d70489163994f5c073d712323293dacf6bf3943e Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 18 Dec 2023 10:12:30 -0800 Subject: [PATCH 12/25] updates --- simapp/app_v2.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 172e6e7af408..77a26e22a60c 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -3,16 +3,19 @@ package simapp import ( + "fmt" "io" "os" "path/filepath" dbm "github.com/cosmos/cosmos-db" + "github.com/spf13/cast" "cosmossdk.io/depinject" "cosmossdk.io/log" storetypes "cosmossdk.io/store/types" "cosmossdk.io/x/auth" + "cosmossdk.io/x/auth/ante/unorderedtx" authkeeper "cosmossdk.io/x/auth/keeper" authsims "cosmossdk.io/x/auth/simulation" authtypes "cosmossdk.io/x/auth/types" @@ -34,6 +37,7 @@ import ( "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/runtime" @@ -64,6 +68,8 @@ type SimApp struct { txConfig client.TxConfig interfaceRegistry codectypes.InterfaceRegistry + UnorderedTxManager *unorderedtx.Manager + // keepers AuthKeeper authkeeper.AccountKeeper BankKeeper bankkeeper.Keeper @@ -256,6 +262,24 @@ func NewSimApp( // return app.App.InitChainer(ctx, req) // }) + dataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") + app.UnorderedTxManager = unorderedtx.NewManager(dataDir) + app.UnorderedTxManager.Start() + + if err := app.UnorderedTxManager.OnInit(); err != nil { + panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) + } + + // register custom snapshot extensions (if any) + // if manager := app.SnapshotManager(); manager != nil { + // err := manager.RegisterExtensions( + // unorderedtx.NewSnapshotter(app.UnorderedTxManager), + // ) + // if err != nil { + // panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + // } + // } + if err := app.Load(loadLatest); err != nil { panic(err) } @@ -263,6 +287,12 @@ func NewSimApp( return app } +// Close implements the Application interface and closes all necessary application +// resources. +func (app *SimApp) Close() error { + return app.UnorderedTxManager.Close() +} + // LegacyAmino returns SimApp's amino codec. // // NOTE: This is solely to be used for testing purposes as it may be desirable From ae621b827c9a8e274094b3dee1f219c93c1315d4 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 18 Dec 2023 10:13:13 -0800 Subject: [PATCH 13/25] updates --- simapp/app_v2.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 77a26e22a60c..adf6ebd801f6 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -262,8 +262,8 @@ func NewSimApp( // return app.App.InitChainer(ctx, req) // }) - dataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") - app.UnorderedTxManager = unorderedtx.NewManager(dataDir) + utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") + app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) app.UnorderedTxManager.Start() if err := app.UnorderedTxManager.OnInit(); err != nil { From 403bbbe9af0f92ef419fd90740f3c18f83df5d36 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 18 Dec 2023 10:20:35 -0800 Subject: [PATCH 14/25] updates --- simapp/app.go | 28 +++++++++++++++++++++ simapp/app_v2.go | 1 + x/auth/ante/unorderedtx/manager.go | 9 ++++++- x/auth/ante/unorderedtx/snapshotter_test.go | 3 ++- 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/simapp/app.go b/simapp/app.go index 4bffaaa09a7a..e19b943e2cd9 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -26,6 +26,7 @@ import ( "cosmossdk.io/x/accounts/testing/counter" "cosmossdk.io/x/auth" "cosmossdk.io/x/auth/ante" + "cosmossdk.io/x/auth/ante/unorderedtx" authcodec "cosmossdk.io/x/auth/codec" authkeeper "cosmossdk.io/x/auth/keeper" "cosmossdk.io/x/auth/posthandler" @@ -169,6 +170,8 @@ type SimApp struct { ModuleManager *module.Manager BasicModuleManager module.BasicManager + UnorderedTxManager *unorderedtx.Manager + // simulation manager sm *module.SimulationManager @@ -519,6 +522,25 @@ func NewSimApp( } app.sm = module.NewSimulationManagerFromAppModules(app.ModuleManager.Modules, overrideModules) + // create, start, and load the unordered tx manager + utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") + app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) + app.UnorderedTxManager.Start() + + if err := app.UnorderedTxManager.OnInit(); err != nil { + panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) + } + + // register custom snapshot extensions (if any) + // if manager := app.SnapshotManager(); manager != nil { + // err := manager.RegisterExtensions( + // unorderedtx.NewSnapshotter(app.UnorderedTxManager), + // ) + // if err != nil { + // panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + // } + // } + app.sm.RegisterStoreDecoders() // initialize stores @@ -600,6 +622,12 @@ func (app *SimApp) setPostHandler() { app.SetPostHandler(postHandler) } +// Close implements the Application interface and closes all necessary application +// resources. +func (app *SimApp) Close() error { + return app.UnorderedTxManager.Close() +} + // Name returns the name of the App func (app *SimApp) Name() string { return app.BaseApp.Name() } diff --git a/simapp/app_v2.go b/simapp/app_v2.go index adf6ebd801f6..c494ba420fd5 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -262,6 +262,7 @@ func NewSimApp( // return app.App.InitChainer(ctx, req) // }) + // create, start, and load the unordered tx manager utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) app.UnorderedTxManager.Start() diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 779fcca21627..f61b6a8fcba0 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -10,8 +10,11 @@ import ( "io" "os" "path/filepath" + "sort" "sync" "time" + + "golang.org/x/exp/maps" ) const ( @@ -158,7 +161,11 @@ func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) erro var buf bytes.Buffer w := bufio.NewWriter(&buf) - for txHash, ttl := range m.txHashes { + keys := maps.Keys(m.txHashes) + sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i][:], keys[j][:]) < 0 }) + + for _, txHash := range keys { + ttl := m.txHashes[txHash] if height > ttl { // skip expired txs that have yet to be purged continue diff --git a/x/auth/ante/unorderedtx/snapshotter_test.go b/x/auth/ante/unorderedtx/snapshotter_test.go index 23ed14fadbb8..1645fbb90677 100644 --- a/x/auth/ante/unorderedtx/snapshotter_test.go +++ b/x/auth/ante/unorderedtx/snapshotter_test.go @@ -3,8 +3,9 @@ package unorderedtx_test import ( "testing" - "cosmossdk.io/x/auth/ante/unorderedtx" "github.com/stretchr/testify/require" + + "cosmossdk.io/x/auth/ante/unorderedtx" ) func TestSnapshotter(t *testing.T) { From 232ef9944963a4af50d45546de7f4222bfae8116 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 19 Dec 2023 09:18:02 -0800 Subject: [PATCH 15/25] updates --- x/auth/ante/unordered.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x/auth/ante/unordered.go b/x/auth/ante/unordered.go index d3ce6d230b07..c110e63650ce 100644 --- a/x/auth/ante/unordered.go +++ b/x/auth/ante/unordered.go @@ -51,6 +51,9 @@ func (d *UnorderedTxDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate b if ttl == 0 { return ctx, errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "unordered transaction must have timeout_height set") } + if ttl < uint64(ctx.BlockHeight()) { + return ctx, errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "unordered transaction has a timeout_height that has already passed") + } if ttl > uint64(ctx.BlockHeight())+d.maxUnOrderedTTL { return ctx, errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "unordered tx ttl exceeds %d", d.maxUnOrderedTTL) } From 34f519e821eaf56b2f12c2bbe162727b059cb7d1 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 19 Dec 2023 09:46:06 -0800 Subject: [PATCH 16/25] updates --- UPGRADING.md | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++ simapp/ante.go | 3 +++ simapp/app.go | 1 + 3 files changed, 68 insertions(+) diff --git a/UPGRADING.md b/UPGRADING.md index 5e52d8b56ba6..4d0a2fe0e096 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -5,6 +5,70 @@ Note, always read the **SimApp** section for more information on application wir ## [Unreleased] +### Unordered Transactions + +The Cosmos SDK now supports unordered transactions. This means that transactions +can be executed in any order and doesn't require the client to deal with or manage +nonces. This also means the order of execution is not guaranteed. To enable unordered +transactions in your application: + +* Update the `App` constructor to create, load, and save the unordered transaction + manager. + + ```go + func NewApp(...) *App { + // ... + + // create, start, and load the unordered tx manager + utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") + app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) + app.UnorderedTxManager.Start() + + if err := app.UnorderedTxManager.OnInit(); err != nil { + panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) + } + } + ``` + +* Add the decorator to the existing AnteHandler chain, which should be as early + as possible. + + ```go + anteDecorators := []sdk.AnteDecorator{ + ante.NewSetUpContextDecorator(), + // ... + ante.NewUnorderedTxDecorator(unorderedtx.DefaultMaxUnOrderedTTL, app.UnorderedTxManager), + // ... + } + + return sdk.ChainAnteDecorators(anteDecorators...), nil + ``` + +* Create or update the App's `Close()` method to close the unordered tx manager. + Note, this is critical as it ensures the manager's state is written to file + such that when the node restarts, it can recover the state to provide replay + protection. + + ```go + func (app *App) Close() error { + // ... + + // close the unordered tx manager + if e := app.UnorderedTxManager.Close(); e != nil { + err = errors.Join(err, e) + } + + return err + } + ``` + +To submit an unordered transaction, the client must set the `unordered` flag to +`true` and ensure a reasonable `timeout_height` is set. The `timeout_height` is +used as a TTL for the transaction and is used to provide replay protection. See +[ADR-070](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-070-unordered-account.md) +for more details. + + ### Params * Params Migrations were removed. It is required to migrate to 0.50 prior to upgrading to .51. diff --git a/simapp/ante.go b/simapp/ante.go index 918244a81405..8ff82263566d 100644 --- a/simapp/ante.go +++ b/simapp/ante.go @@ -4,6 +4,7 @@ import ( "errors" "cosmossdk.io/x/auth/ante" + "cosmossdk.io/x/auth/ante/unorderedtx" circuitante "cosmossdk.io/x/circuit/ante" sdk "github.com/cosmos/cosmos-sdk/types" @@ -13,6 +14,7 @@ import ( type HandlerOptions struct { ante.HandlerOptions CircuitKeeper circuitante.CircuitBreaker + TxManager *unorderedtx.Manager } // NewAnteHandler returns an AnteHandler that checks and increments sequence @@ -37,6 +39,7 @@ func NewAnteHandler(options HandlerOptions) (sdk.AnteHandler, error) { ante.NewExtensionOptionsDecorator(options.ExtensionOptionChecker), ante.NewValidateBasicDecorator(), ante.NewTxTimeoutHeightDecorator(), + ante.NewUnorderedTxDecorator(unorderedtx.DefaultMaxUnOrderedTTL, options.TxManager), ante.NewValidateMemoDecorator(options.AccountKeeper), ante.NewConsumeGasForTxSizeDecorator(options.AccountKeeper), ante.NewDeductFeeDecorator(options.AccountKeeper, options.BankKeeper, options.FeegrantKeeper, options.TxFeeChecker), diff --git a/simapp/app.go b/simapp/app.go index e19b943e2cd9..ff28043a47a3 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -601,6 +601,7 @@ func (app *SimApp) setAnteHandler(txConfig client.TxConfig) { SigGasConsumer: ante.DefaultSigVerificationGasConsumer, }, &app.CircuitKeeper, + app.UnorderedTxManager, }, ) if err != nil { From 6e924cc28b32f4bfb5b65dd00a02bde242eb022c Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 19 Dec 2023 09:55:37 -0800 Subject: [PATCH 17/25] ADR++ --- .../architecture/adr-070-unordered-account.md | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/docs/architecture/adr-070-unordered-account.md b/docs/architecture/adr-070-unordered-account.md index ac63a4a40daa..635b8714db3b 100644 --- a/docs/architecture/adr-070-unordered-account.md +++ b/docs/architecture/adr-070-unordered-account.md @@ -278,14 +278,22 @@ func (d *DedupTxDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, Wire the `OnNewBlock` method of `UnorderedTxManager` into the BaseApp's ABCI `Commit` event. -### Start Up +### State Management -On start up, the node needs to re-fill the tx hash dictionary of `UnorderedTxManager` -by scanning `MaxUnOrderedTTL` number of historical blocks for existing un-expired -un-ordered transactions. +On start up, the node needs to ensure the TxManager's state contains all un-expired +transactions that have been committed to the chain. This is critical since if the +state is not properly initialized, the node will not reject duplicate transactions +and thus will not provide replay protection. -An alternative design is to store the tx hash dictionary in kv store, then no need -to warm up on start up. +We propose to write all un-expired unordered transactions from the TxManager's to +file on disk. On start up, the node will read this file and re-populate the TxManager's +map. The write to file will happen when the node gracefully shuts down on `Close()`. + +Note, this is not a perfect solution, in the context of store v1. With store v2, +we can omit explicit file handling altogether and simply write the all the transactions +to non-consensus state, i.e State Storage (SS). + +Alternatively, we can write all the transactions to consensus state. ## Consequences From b685c61c6a67f2bee5386a8a9bc788761fb74662 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 19 Dec 2023 10:00:54 -0800 Subject: [PATCH 18/25] updates --- UPGRADING.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/UPGRADING.md b/UPGRADING.md index 4d0a2fe0e096..a178ce3e1a41 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -46,8 +46,8 @@ transactions in your application: * Create or update the App's `Close()` method to close the unordered tx manager. Note, this is critical as it ensures the manager's state is written to file - such that when the node restarts, it can recover the state to provide replay - protection. + such that when the node restarts, it can recover the state to provide replay + protection. ```go func (app *App) Close() error { From 4e52ffb03e7e0ae5cc64277e9ad0bd43931606d9 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 19 Dec 2023 10:04:01 -0800 Subject: [PATCH 19/25] updates --- UPGRADING.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/UPGRADING.md b/UPGRADING.md index a178ce3e1a41..57cdb25d9dae 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -44,6 +44,18 @@ transactions in your application: return sdk.ChainAnteDecorators(anteDecorators...), nil ``` +* If the App has a SnapshotManager defined, you must also register the extension + for the TxManager. + + ```go + if manager := app.SnapshotManager(); manager != nil { + err := manager.RegisterExtensions(unorderedtx.NewSnapshotter(app.UnorderedTxManager)) + if err != nil { + panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + } + } + ``` + * Create or update the App's `Close()` method to close the unordered tx manager. Note, this is critical as it ensures the manager's state is written to file such that when the node restarts, it can recover the state to provide replay From 1356e5b678b3d3b552c6dadb49864b0d66b30dcf Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 19 Dec 2023 10:04:31 -0800 Subject: [PATCH 20/25] updates --- UPGRADING.md | 1 - 1 file changed, 1 deletion(-) diff --git a/UPGRADING.md b/UPGRADING.md index 57cdb25d9dae..770c18e494da 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -80,7 +80,6 @@ used as a TTL for the transaction and is used to provide replay protection. See [ADR-070](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-070-unordered-account.md) for more details. - ### Params * Params Migrations were removed. It is required to migrate to 0.50 prior to upgrading to .51. From 058569cc9a1226069bd4c7c237f4cf153b2f9272 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 2 Jan 2024 18:07:58 -0500 Subject: [PATCH 21/25] Update docs/architecture/adr-070-unordered-account.md Co-authored-by: yihuang --- docs/architecture/adr-070-unordered-account.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/architecture/adr-070-unordered-account.md b/docs/architecture/adr-070-unordered-account.md index 635b8714db3b..c2d6e382f13b 100644 --- a/docs/architecture/adr-070-unordered-account.md +++ b/docs/architecture/adr-070-unordered-account.md @@ -283,7 +283,7 @@ Wire the `OnNewBlock` method of `UnorderedTxManager` into the BaseApp's ABCI `Co On start up, the node needs to ensure the TxManager's state contains all un-expired transactions that have been committed to the chain. This is critical since if the state is not properly initialized, the node will not reject duplicate transactions -and thus will not provide replay protection. +and thus will not provide replay protection, and will likely get an app hash mismatch error. We propose to write all un-expired unordered transactions from the TxManager's to file on disk. On start up, the node will read this file and re-populate the TxManager's From ec10bc9614c961e8046fdeee7c3dfa70bf484a16 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 2 Jan 2024 15:11:05 -0800 Subject: [PATCH 22/25] updates --- x/auth/ante/unorderedtx/snapshotter.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go index 58a9ad04c3ef..f4b4021ebf72 100644 --- a/x/auth/ante/unorderedtx/snapshotter.go +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -56,9 +56,11 @@ func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPay // the payload should be the entire set of unordered transactions payload, err := payloadReader() if err != nil { - if !errors.Is(err, io.EOF) { - return err + if errors.Is(err, io.EOF) { + return io.ErrUnexpectedEOF } + + return err } var i int From 31cb216faa31d3f17c87edf71eff637f24ee24b2 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 2 Jan 2024 15:13:59 -0800 Subject: [PATCH 23/25] updates --- x/auth/ante/unorderedtx/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index f61b6a8fcba0..818a3eddb06f 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -130,7 +130,7 @@ func (m *Manager) OnInit() error { buf = make([]byte, 32+8) ) for { - n, err := r.Read(buf) + n, err := io.ReadFull(r, buf) if err != nil { if errors.Is(err, io.EOF) { break From 6673f2241489a11b36c5071dd8070dcd1292f26a Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 2 Jan 2024 15:15:38 -0800 Subject: [PATCH 24/25] updates --- simapp/app.go | 16 ++++++++-------- simapp/app_v2.go | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/simapp/app.go b/simapp/app.go index ff28043a47a3..7007d9a62efe 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -532,14 +532,14 @@ func NewSimApp( } // register custom snapshot extensions (if any) - // if manager := app.SnapshotManager(); manager != nil { - // err := manager.RegisterExtensions( - // unorderedtx.NewSnapshotter(app.UnorderedTxManager), - // ) - // if err != nil { - // panic(fmt.Errorf("failed to register snapshot extension: %s", err)) - // } - // } + if manager := app.SnapshotManager(); manager != nil { + err := manager.RegisterExtensions( + unorderedtx.NewSnapshotter(app.UnorderedTxManager), + ) + if err != nil { + panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + } + } app.sm.RegisterStoreDecoders() diff --git a/simapp/app_v2.go b/simapp/app_v2.go index c494ba420fd5..0c4ea6f63ac8 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -272,14 +272,14 @@ func NewSimApp( } // register custom snapshot extensions (if any) - // if manager := app.SnapshotManager(); manager != nil { - // err := manager.RegisterExtensions( - // unorderedtx.NewSnapshotter(app.UnorderedTxManager), - // ) - // if err != nil { - // panic(fmt.Errorf("failed to register snapshot extension: %s", err)) - // } - // } + if manager := app.SnapshotManager(); manager != nil { + err := manager.RegisterExtensions( + unorderedtx.NewSnapshotter(app.UnorderedTxManager), + ) + if err != nil { + panic(fmt.Errorf("failed to register snapshot extension: %s", err)) + } + } if err := app.Load(loadLatest); err != nil { panic(err) From 9e6e5d6fff19a5e77e32d331344d53083537c6b8 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 2 Jan 2024 15:20:46 -0800 Subject: [PATCH 25/25] updates --- x/auth/ante/unorderedtx/manager.go | 14 +++++++------- x/auth/ante/unorderedtx/snapshotter.go | 16 +++++++++++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 818a3eddb06f..14fbe018b83b 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -127,7 +127,7 @@ func (m *Manager) OnInit() error { var ( r = bufio.NewReader(f) - buf = make([]byte, 32+8) + buf = make([]byte, chunkSize) ) for { n, err := io.ReadFull(r, buf) @@ -143,9 +143,9 @@ func (m *Manager) OnInit() error { } var txHash TxHash - copy(txHash[:], buf[:32]) + copy(txHash[:], buf[:txHashSize]) - m.Add(txHash, binary.BigEndian.Uint64(buf[32:])) + m.Add(txHash, binary.BigEndian.Uint64(buf[txHashSize:])) } return nil @@ -274,12 +274,12 @@ func (m *Manager) batchReceive() (uint64, bool) { } func unorderedTxToBytes(txHash TxHash, ttl uint64) []byte { - chunk := make([]byte, 32+8) - copy(chunk[:32], txHash[:]) + chunk := make([]byte, chunkSize) + copy(chunk[:txHashSize], txHash[:]) - ttlBz := make([]byte, 8) + ttlBz := make([]byte, ttlSize) binary.BigEndian.PutUint64(ttlBz, ttl) - copy(chunk[32:], ttlBz) + copy(chunk[txHashSize:], ttlBz) return chunk } diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go index f4b4021ebf72..5941a11a6888 100644 --- a/x/auth/ante/unorderedtx/snapshotter.go +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -8,6 +8,12 @@ import ( snapshot "cosmossdk.io/store/snapshots/types" ) +const ( + txHashSize = 32 + ttlSize = 8 + chunkSize = txHashSize + ttlSize +) + var _ snapshot.ExtensionSnapshotter = &Snapshotter{} const ( @@ -63,19 +69,23 @@ func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPay return err } + if len(payload)%chunkSize != 0 { + return errors.New("invalid unordered txs payload length") + } + var i int for i < len(payload) { var txHash TxHash - copy(txHash[:], payload[i:i+32]) + copy(txHash[:], payload[i:i+txHashSize]) - ttl := binary.BigEndian.Uint64(payload[i+32 : i+40]) + ttl := binary.BigEndian.Uint64(payload[i+txHashSize : i+chunkSize]) if height < ttl { // only add unordered transactions that are still valid, i.e. unexpired s.m.Add(txHash, ttl) } - i += 40 + i += chunkSize } return nil