-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: [ADR-070] Unordered Transactions (1/2) #18641
Conversation
WalkthroughThe overall change introduces support for unordered transactions in the Cosmos SDK, which allows transactions to be processed out of sequence without nonce management by the client. This is achieved through new flags, updates to the transaction factory, and modifications to the AnteHandler decorators for signature verification and transaction management. New tests are added to ensure the integrity of the feature, and documentation is updated to guide through the upgrade process and usage. Changes
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit's AI:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
This is great stuff @alexanderbez Does this remove idempotency? If I sign a transfer and broadcast it 2 times, will it be sent twice? |
This is a great question. Assuming the transaction is identical, it would be misappropriate use to send an identical tx twice (e.g. a double spend via MITM attack). So what will happen is that CometBFT will reject the duplicate tx. From the SDK's perspective, even if the 2nd tx made it in somehow to the node, the app's mempool should reject it as well. Finally, as a measure of last resort, This will be clearly documented. |
@julienrbrt I've updated some tx related logic to support a new TxBody field -- unordered, e.g. |
client/v2 still uses client/tx for now, so nothing to change there :) |
Ok @tac0turtle @yihuang, this PR is at a point where the core logic is more or less complete (AnteHandler + map/manager implementation). The last critical bit left is how to handle node restarts, i.e. ensuring the map is durable. As I see it there are two viable options: Option A: Seed map from consensus block store
Option B: Seed map from application state, i.e. relying on store
@yihuang let me know if I'm missing other options? |
return ctx, errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "unordered tx ttl exceeds %d", d.maxUnOrderedTTL) | ||
} | ||
|
||
txHash := sha256.Sum256(ctx.TxBytes()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a better way to reuse the tx hash calculated by the cometbft?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, we do not have this information exposed on the types.Context
. I don't even think CometBFT provides it actually.
for { | ||
select { | ||
case <-ctx.Done(): | ||
return latestHeight, true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems when this timeout happens, it could returns 0, true
, which leads to the call of expiredTxs
with 0
as parameter, which will expires all tx hashes right?
I don't think we need this timeout context here at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which leads to the call of expiredTxs with 0 as parameter, which will expires all tx hashes right
If expiredTxs
is called with 0
, it'll essentially be a no-op, returning an empty []TxHash
. I.e. it wont purge anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only one minor suggestion for code changes.
@yihuang I responded to the comments. Pls let me know what you think :) |
Co-authored-by: yihuang <huang@crypto.com>
* Update the `App` constructor to create, load, and save the unordered transaction | ||
manager. | ||
|
||
```go | ||
func NewApp(...) *App { | ||
// ... | ||
|
||
// create, start, and load the unordered tx manager | ||
utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") | ||
app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) | ||
app.UnorderedTxManager.Start() | ||
|
||
if err := app.UnorderedTxManager.OnInit(); err != nil { | ||
panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure that the UnorderedTxManager
is properly initialized in the App
constructor, as failure to do so will result in a panic. This is a critical step for the unordered transactions feature to function correctly.
utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") | ||
app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The review comment is correct in stating that the UnorderedTxManager
is initialized with a data directory and that there should be error handling for potential I/O issues. The script output shows that the NewManager
function in x/auth/ante/unorderedtx/manager.go
does attempt to create the directory if it does not exist. However, it does not handle errors that may occur during the directory creation, as the error from os.Mkdir
is ignored. This could lead to unhandled I/O errors if the directory creation fails.
- The directory creation is attempted but not properly error-handled in
NewManager
. - Potential I/O issues during directory creation are not addressed.
Analysis chain
The UnorderedTxManager
is being initialized with a data directory derived from the application options. Ensure that the directory is created if it does not exist and that error handling is in place for potential I/O issues.
Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verify that the data directory for UnorderedTxManager is created and handled correctly.
rg --type go 'NewManager' --files-with-matches
Length of output: 568
Script:
#!/bin/bash
# Check the implementation of NewManager in unorderedtx/manager.go for directory creation and error handling.
cat x/auth/ante/unorderedtx/manager.go
Length of output: 6834
// Manager contains the tx hash dictionary for duplicates checking, and expire | ||
// them when block production progresses. | ||
type Manager struct { | ||
// blockCh defines a channel to receive newly committed block heights | ||
blockCh chan uint64 | ||
// doneCh allows us to ensure the purgeLoop has gracefully terminated prior to closing | ||
doneCh chan struct{} | ||
|
||
// dataDir defines the directory to store unexpired unordered transactions | ||
// | ||
// XXX: Note, ideally we avoid the need to store unexpired unordered transactions | ||
// directly to file. However, store v1 does not allow such a primitive. But, | ||
// once store v2 is fully integrated, we can remove manual file handling and | ||
// store the unexpired unordered transactions directly to SS. | ||
// | ||
// Ref: https://github.com/cosmos/cosmos-sdk/issues/18467 | ||
dataDir string | ||
|
||
mu sync.RWMutex | ||
// txHashes defines a map from tx hash -> TTL value, which is used for duplicate | ||
// checking and replay protection, as well as purging the map when the TTL is | ||
// expired. | ||
txHashes map[TxHash]uint64 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Manager
struct is defined with fields for managing unordered transactions. It includes a channel for block heights, a done channel, a data directory, and a map for transaction hashes with their TTL. The struct seems well-structured for its purpose. However, the comment on lines 42-45 mentions a future improvement related to store v2. This should be tracked as a TODO to ensure it is not forgotten when store v2 is integrated.
func (m *Manager) Start() { | ||
go m.purgeLoop() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Start
method starts the purge loop in a separate goroutine. This is a potential source of non-determinism and should be carefully managed to ensure that the goroutine is properly synchronized with the rest of the application.
func (m *Manager) Add(txHash TxHash, ttl uint64) { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
m.txHashes[txHash] = ttl | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Add
method adds a transaction hash with its TTL to the map. The previous comment from coderabbitai[bot]
suggests enforcing a maximum TTL. This should be implemented to prevent TTL values that exceed the system's limits.
func (m *Manager) Add(txHash TxHash, ttl uint64) {
m.mu.Lock()
defer m.mu.Unlock()
+ if ttl > DefaultMaxUnOrderedTTL {
+ ttl = DefaultMaxUnOrderedTTL
+ }
m.txHashes[txHash] = ttl
}
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func (m *Manager) Add(txHash TxHash, ttl uint64) { | |
m.mu.Lock() | |
defer m.mu.Unlock() | |
m.txHashes[txHash] = ttl | |
} | |
func (m *Manager) Add(txHash TxHash, ttl uint64) { | |
m.mu.Lock() | |
defer m.mu.Unlock() | |
if ttl > DefaultMaxUnOrderedTTL { | |
ttl = DefaultMaxUnOrderedTTL | |
} | |
m.txHashes[txHash] = ttl | |
} |
// restore with height > ttl which should result in no unordered txs synced | ||
txm2 := unorderedtx.NewManager(dataDir) | ||
s2 := unorderedtx.NewSnapshotter(txm2) | ||
err = s2.RestoreExtension(200, unorderedtx.SnapshotFormat, pr) | ||
require.NoError(t, err) | ||
require.Empty(t, txm2.Size()) | ||
|
||
// restore with with height < ttl which should result in all unordered txs synced | ||
txm3 := unorderedtx.NewManager(dataDir) | ||
s3 := unorderedtx.NewSnapshotter(txm3) | ||
err = s3.RestoreExtension(50, unorderedtx.SnapshotFormat, pr) | ||
require.NoError(t, err) | ||
require.Equal(t, 100, txm3.Size()) | ||
|
||
for i := 0; i < 100; i++ { | ||
require.True(t, txm3.Contains([32]byte{byte(i)})) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test checks the scenario where the height is greater than the TTL, expecting no unordered transactions to be synced. This is a good test for boundary conditions, but it would be beneficial to also include a test where the height is exactly equal to the TTL to ensure that edge case is handled correctly.
// Add a test case where height == ttl
func TestUnorderedTxManager_SimpleSize(t *testing.T) { | ||
txm := unorderedtx.NewManager(t.TempDir()) | ||
defer func() { | ||
require.NoError(t, txm.Close()) | ||
}() | ||
|
||
txm.Start() | ||
|
||
txm.Add([32]byte{0xFF}, 100) | ||
txm.Add([32]byte{0xAA}, 100) | ||
txm.Add([32]byte{0xCC}, 100) | ||
|
||
require.Equal(t, 3, txm.Size()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test TestUnorderedTxManager_SimpleSize
checks the size of the manager after adding transactions. Ensure that the test covers scenarios where transactions are removed or expired to verify that the size is updated correctly.
// Add tests for removal and expiration of transactions.
func TestUnorderedTxManager_SimpleContains(t *testing.T) { | ||
txm := unorderedtx.NewManager(t.TempDir()) | ||
defer func() { | ||
require.NoError(t, txm.Close()) | ||
}() | ||
|
||
txm.Start() | ||
|
||
for i := 0; i < 10; i++ { | ||
txHash := [32]byte{byte(i)} | ||
txm.Add(txHash, 100) | ||
require.True(t, txm.Contains(txHash)) | ||
} | ||
|
||
for i := 10; i < 20; i++ { | ||
txHash := [32]byte{byte(i)} | ||
require.False(t, txm.Contains(txHash)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test TestUnorderedTxManager_SimpleContains
correctly checks if the manager contains certain transactions. It would be beneficial to also test the behavior when transactions are removed or have expired.
// Add tests for checking contains after transactions are removed or expired.
func TestUnorderedTxManager_CloseInit(t *testing.T) { | ||
dataDir := t.TempDir() | ||
txm := unorderedtx.NewManager(dataDir) | ||
txm.Start() | ||
|
||
// add a handful of unordered txs | ||
for i := 0; i < 100; i++ { | ||
txm.Add([32]byte{byte(i)}, 100) | ||
} | ||
|
||
// close the manager, which should flush all unexpired txs to file | ||
require.NoError(t, txm.Close()) | ||
|
||
// create a new manager, start it | ||
txm2 := unorderedtx.NewManager(dataDir) | ||
defer func() { | ||
require.NoError(t, txm2.Close()) | ||
}() | ||
|
||
// start and execute OnInit, which should load the unexpired txs from file | ||
txm2.Start() | ||
require.NoError(t, txm2.OnInit()) | ||
require.Equal(t, 100, txm2.Size()) | ||
|
||
for i := 0; i < 100; i++ { | ||
require.True(t, txm2.Contains([32]byte{byte(i)})) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test TestUnorderedTxManager_CloseInit
checks the persistence of transactions across manager instances. Ensure that the test covers scenarios where transactions expire between manager instances.
// Add tests for transaction expiration between manager instances.
func TestUnorderedTxManager_Flow(t *testing.T) { | ||
txm := unorderedtx.NewManager(t.TempDir()) | ||
defer func() { | ||
require.NoError(t, txm.Close()) | ||
}() | ||
|
||
txm.Start() | ||
|
||
// Seed the manager with a txs, some of which should eventually be purged and | ||
// the others will remain. Txs with TTL less than or equal to 50 should be purged. | ||
for i := 1; i <= 100; i++ { | ||
txHash := [32]byte{byte(i)} | ||
|
||
if i <= 50 { | ||
txm.Add(txHash, uint64(i)) | ||
} else { | ||
txm.Add(txHash, 100) | ||
} | ||
} | ||
|
||
// start a goroutine that mimics new blocks being made every 500ms | ||
doneBlockCh := make(chan bool) | ||
go func() { | ||
ticker := time.NewTicker(time.Millisecond * 500) | ||
defer ticker.Stop() | ||
|
||
var ( | ||
height uint64 = 1 | ||
i = 101 | ||
) | ||
for range ticker.C { | ||
txm.OnNewBlock(height) | ||
height++ | ||
|
||
if height > 51 { | ||
doneBlockCh <- true | ||
return | ||
} else { | ||
txm.Add([32]byte{byte(i)}, 50) | ||
} | ||
} | ||
}() | ||
|
||
// Eventually all the txs that should be expired by block 50 should be purged. | ||
// The remaining txs should remain. | ||
require.Eventually( | ||
t, | ||
func() bool { | ||
return txm.Size() == 50 | ||
}, | ||
2*time.Minute, | ||
5*time.Second, | ||
) | ||
|
||
<-doneBlockCh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test TestUnorderedTxManager_Flow
simulates a more complex scenario with transactions being added and purged over time. Ensure that the test covers edge cases such as transactions expiring on the boundary of the TTL.
// Add edge case tests for transactions expiring on the boundary of the TTL.
func DefaultSigVerificationGasConsumer(meter storetypes.GasMeter, sig signing.SignatureV2, params types.Params) error { | ||
pubkey := sig.PubKey | ||
|
||
switch pubkey := pubkey.(type) { | ||
case *ed25519.PubKey: | ||
meter.ConsumeGas(params.SigVerifyCostED25519, "ante verify: ed25519") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous comment by coderabbitai
regarding the handling of ED25519 public keys is still valid. The code should be updated to reflect the commented intention of not returning an error for ED25519 public keys.
- return errorsmod.Wrap(sdkerrors.ErrInvalidPubKey, "ED25519 public keys are unsupported")
+ // return errorsmod.Wrap(sdkerrors.ErrInvalidPubKey, "ED25519 public keys are unsupported")
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func DefaultSigVerificationGasConsumer(meter storetypes.GasMeter, sig signing.SignatureV2, params types.Params) error { | |
pubkey := sig.PubKey | |
switch pubkey := pubkey.(type) { | |
case *ed25519.PubKey: | |
meter.ConsumeGas(params.SigVerifyCostED25519, "ante verify: ed25519") | |
func DefaultSigVerificationGasConsumer(meter storetypes.GasMeter, sig signing.SignatureV2, params types.Params) error { | |
pubkey := sig.PubKey | |
switch pubkey := pubkey.(type) { | |
case *ed25519.PubKey: | |
meter.ConsumeGas(params.SigVerifyCostED25519, "ante verify: ed25519") | |
// return errorsmod.Wrap(sdkerrors.ErrInvalidPubKey, "ED25519 public keys are unsupported") |
There is a test case in |
@@ -290,7 +290,7 @@ message TestUpdatedTxBody { | |||
repeated google.protobuf.Any messages = 1; | |||
string memo = 2; | |||
int64 timeout_height = 3; | |||
uint64 some_new_field = 4; | |||
uint64 some_new_field = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing the field number of an existing field can break backward compatibility as serialized data will not be correctly understood by the new schema. Ensure that this change is intentional and that all consumers of this protobuf are updated accordingly.
@@ -290,7 +290,7 @@ message TestUpdatedTxBody { | |||
repeated google.protobuf.Any messages = 1; | |||
string memo = 2; | |||
int64 timeout_height = 3; | |||
uint64 some_new_field = 4; | |||
uint64 some_new_field = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@facundomedica why does this fix the issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's because a new field was added with tag 4 to TxBody
which in TestUpdatedTxBody
is some_new_field
.
From what I understand, the rejecting of unknown fields happen by checking the tag numbers, so TxBody
didn't have a tag 4 but because TestUpdatedTxBody
did have it it was erroring (as expected). Now a field with tag 4 is expected, so to trigger the error of an unexpected field we have to modify the proto message used for this test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good find!! could you add a comment to not confuse future readers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a small comment on the test case 👌
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this was such a footgun -- never would've found out. Thanks!
Description
ref: #13009
ref: #18553
This PR introduces the implementation outlined in ADR-070 -- unordered transactions.
Note, this PR is one of two total PRs, which primarily deals with the core business logic such as the AnteHandler decorators and the manager(map) implementation.
A 2nd PR will be made that addresses state management of the manager (map). A proposal for such an implementation can be found here: #18739
Author Checklist
All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.
I have...
!
in the type prefix if API or client breaking changeCHANGELOG.md
Reviewers Checklist
All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.
I have...