From 2b21438ae3e52a284dc6d4708f2c547a8d324250 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Sat, 6 Jan 2018 13:19:40 +0200 Subject: [PATCH] Maintain local copy of the nonce for each used address If multiple concurrent transactions are sent from the same address nonce for some of them will be the same. It will result in the "known transaction error" and such transaction will be dropped from a queue. There is 2 alternatives in solving such problem: - adding an auto-retry to transaction queue, which is very complex and error prone, this way we can actually retry transactions that are already processed by ethereum - maintain nonce locally Second approach is a straightforward. We keep asking for a nonce from upstream, but if our local nonce is higher we steak to it. Our local nonce is updated only if transaction succeeds, so there is no way to send out of order transaction. --- geth/transactions/addrlock.go | 26 +++- geth/transactions/txqueue_manager.go | 144 +++++++++--------- geth/transactions/txqueue_manager_test.go | 96 +++++++++--- .../go-ethereum/internal/ethapi/api.go | 8 - 4 files changed, 172 insertions(+), 102 deletions(-) diff --git a/geth/transactions/addrlock.go b/geth/transactions/addrlock.go index 77da1f85e48..765e9467f9d 100644 --- a/geth/transactions/addrlock.go +++ b/geth/transactions/addrlock.go @@ -1,5 +1,3 @@ -// copy of go-ethereum/internal/ethapi/addrlock.go - package transactions import ( @@ -9,8 +7,9 @@ import ( ) type AddrLocker struct { - mu sync.Mutex - locks map[common.Address]*sync.Mutex + mu sync.Mutex + locks map[common.Address]*sync.Mutex + localNonce map[common.Address]uint64 } // lock returns the lock of the given address. @@ -37,3 +36,22 @@ func (l *AddrLocker) LockAddr(address common.Address) { func (l *AddrLocker) UnlockAddr(address common.Address) { l.lock(address).Unlock() } + +// RunUpdate needs a better name +func (l *AddrLocker) RunUpdate( + addr common.Address, + f func(localNonce uint64) (upstreamNonce uint64, err error), +) error { + l.LockAddr(addr) + defer l.UnlockAddr(addr) + if l.localNonce == nil { + l.localNonce = make(map[common.Address]uint64) + } + local := l.localNonce[addr] + nonce, err := f(local) + if err != nil { + return err + } + l.localNonce[addr] = nonce + 1 + return nil +} diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index f93a614481b..e134c2eed5f 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/log" + "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/transactions/queue" ) @@ -119,19 +120,23 @@ func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (ha log.Warn("error getting a queued transaction", "err", err) return hash, err } - account, err := m.validateAccount(tx) + config, err := m.nodeManager.NodeConfig() + if err != nil { + return hash, err + } + account, err := m.validateAccount(config, tx, password) if err != nil { m.txDone(tx, hash, err) return hash, err } // Send the transaction finally. - hash, err = m.completeTransaction(account, tx, password) + hash, err = m.completeTransaction(config, account, tx) log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err) m.txDone(tx, hash, err) return hash, err } -func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey, error) { +func (m *Manager) validateAccount(config *params.NodeConfig, tx *common.QueuedTx, password string) (*common.SelectedExtKey, error) { selectedAccount, err := m.accountManager.SelectedAccount() if err != nil { log.Warn("failed to get a selected account", "err", err) @@ -142,85 +147,84 @@ func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey, log.Warn("queued transaction does not belong to the selected account", "err", queue.ErrInvalidCompleteTxSender) return nil, queue.ErrInvalidCompleteTxSender } - return selectedAccount, nil -} - -func (m *Manager) completeTransaction(selectedAccount *common.SelectedExtKey, queuedTx *common.QueuedTx, password string) (hash gethcommon.Hash, err error) { - log.Info("complete transaction", "id", queuedTx.ID) - log.Info("verifying account password for transaction", "id", queuedTx.ID) - config, err := m.nodeManager.NodeConfig() - if err != nil { - return hash, err - } _, err = m.accountManager.VerifyAccountPassword(config.KeyStoreDir, selectedAccount.Address.String(), password) if err != nil { log.Warn("failed to verify account", "account", selectedAccount.Address.String(), "error", err.Error()) - return hash, err + return nil, err } + return selectedAccount, nil +} +func (m *Manager) completeTransaction(config *params.NodeConfig, selectedAccount *common.SelectedExtKey, queuedTx *common.QueuedTx) (hash gethcommon.Hash, err error) { + log.Info("complete transaction", "id", queuedTx.ID) // update transaction with nonce, gas price and gas estimates - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) - defer cancel() - m.addrLock.LockAddr(queuedTx.Args.From) - defer m.addrLock.UnlockAddr(queuedTx.Args.From) - nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) - if err != nil { - return hash, err - } - args := queuedTx.Args - gasPrice := (*big.Int)(args.GasPrice) - if args.GasPrice == nil { + err = m.addrLock.RunUpdate(queuedTx.Args.From, func(localNonce uint64) (uint64, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) + if err != nil { + return 0, err + } + if localNonce > nonce { + nonce = localNonce + } + args := queuedTx.Args + gasPrice := (*big.Int)(args.GasPrice) + if args.GasPrice == nil { + ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) + if err != nil { + return 0, err + } + } + + chainID := big.NewInt(int64(config.NetworkID)) + data := []byte(args.Data) + value := (*big.Int)(args.Value) + toAddr := gethcommon.Address{} + if args.To != nil { + toAddr = *args.To + } ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) defer cancel() - gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) + gas, err := m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{ + From: args.From, + To: args.To, + GasPrice: gasPrice, + Value: value, + Data: data, + }) if err != nil { - return hash, err + return 0, err + } + if gas.Cmp(big.NewInt(defaultGas)) == -1 { + log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) + gas = big.NewInt(defaultGas) } - } - chainID := big.NewInt(int64(config.NetworkID)) - data := []byte(args.Data) - value := (*big.Int)(args.Value) - toAddr := gethcommon.Address{} - if args.To != nil { - toAddr = *args.To - } - ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) - defer cancel() - gas, err := m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{ - From: args.From, - To: args.To, - GasPrice: gasPrice, - Value: value, - Data: data, + log.Info( + "preparing raw transaction", + "from", args.From.Hex(), + "to", toAddr.Hex(), + "gas", gas, + "gasPrice", gasPrice, + "value", value, + ) + tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) + if err != nil { + return 0, err + } + ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { + return 0, err + } + hash = signedTx.Hash() + return nonce, nil }) - if err != nil { - return hash, err - } - if gas.Cmp(big.NewInt(defaultGas)) == -1 { - log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) - gas = big.NewInt(defaultGas) - } - - log.Info( - "preparing raw transaction", - "from", args.From.Hex(), - "to", toAddr.Hex(), - "gas", gas, - "gasPrice", gasPrice, - "value", value, - ) - tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data) - signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) - if err != nil { - return hash, err - } - ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) - defer cancel() - if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { - return hash, err - } - return signedTx.Hash(), nil + return hash, nil } // CompleteTransactions instructs backend to complete sending of multiple transactions diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go index 5c781671ae1..ffef5443f02 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -61,19 +61,19 @@ func (s *TxQueueTestSuite) TearDownTest() { s.client.Close() } -func (s *TxQueueTestSuite) setupTransactionPoolAPI(account *common.SelectedExtKey, nonce hexutil.Uint64, gas hexutil.Big, txErr error) { - s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Return(&nonce, nil) - s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Return(big.NewInt(10), nil) - s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Return(&gas, nil) - s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), gomock.Any()).Return(gethcommon.Hash{}, txErr) +func (s *TxQueueTestSuite) setupTransactionPoolAPI(txCount int, account *common.SelectedExtKey, nonce hexutil.Uint64, gas hexutil.Big, txErr error) { + s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Times(txCount).Return(&nonce, nil) + s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Times(txCount).Return(big.NewInt(10), nil) + s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Times(txCount).Return(&gas, nil) + s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), gomock.Any()).Times(txCount).Return(gethcommon.Hash{}, txErr) } -func (s *TxQueueTestSuite) setupStatusBackend(account *common.SelectedExtKey, password string) { +func (s *TxQueueTestSuite) setupStatusBackend(txCount int, account *common.SelectedExtKey, password string, passErr error) { nodeConfig, nodeErr := params.NewNodeConfig("/tmp", params.RopstenNetworkID, true) - s.nodeManagerMock.EXPECT().NodeConfig().Return(nodeConfig, nodeErr) - s.accountManagerMock.EXPECT().SelectedAccount().Return(account, nil) - s.accountManagerMock.EXPECT().VerifyAccountPassword(nodeConfig.KeyStoreDir, account.Address.String(), password).Return( - nil, nil) + s.nodeManagerMock.EXPECT().NodeConfig().Times(txCount).Return(nodeConfig, nodeErr) + s.accountManagerMock.EXPECT().SelectedAccount().Times(txCount).Return(account, nil) + s.accountManagerMock.EXPECT().VerifyAccountPassword(nodeConfig.KeyStoreDir, account.Address.String(), password).Times(txCount).Return( + nil, passErr) } func (s *TxQueueTestSuite) TestCompleteTransaction() { @@ -83,11 +83,11 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { Address: common.FromAddress(TestConfig.Account1.Address), AccountKey: &keystore.Key{PrivateKey: key}, } - s.setupStatusBackend(account, password) + s.setupStatusBackend(1, account, password, nil) nonce := hexutil.Uint64(10) gas := hexutil.Big(*big.NewInt(defaultGas + 1)) - s.setupTransactionPoolAPI(account, nonce, gas, nil) + s.setupTransactionPoolAPI(1, account, nonce, gas, nil) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) @@ -123,11 +123,11 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { Address: common.FromAddress(TestConfig.Account1.Address), AccountKey: &keystore.Key{PrivateKey: key}, } - s.setupStatusBackend(account, password) + s.setupStatusBackend(1, account, password, nil) nonce := hexutil.Uint64(10) gas := hexutil.Big(*big.NewInt(defaultGas + 1)) - s.setupTransactionPoolAPI(account, nonce, gas, nil) + s.setupTransactionPoolAPI(1, account, nonce, gas, nil) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) txQueueManager.DisableNotificactions() @@ -177,6 +177,8 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { } func (s *TxQueueTestSuite) TestAccountMismatch() { + nodeConfig, nodeErr := params.NewNodeConfig("/tmp", params.RopstenNetworkID, true) + s.nodeManagerMock.EXPECT().NodeConfig().Return(nodeConfig, nodeErr) s.accountManagerMock.EXPECT().SelectedAccount().Return(&common.SelectedExtKey{ Address: common.FromAddress(TestConfig.Account2.Address), }, nil) @@ -209,11 +211,7 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { Address: common.FromAddress(TestConfig.Account1.Address), AccountKey: &keystore.Key{PrivateKey: key}, } - s.setupStatusBackend(account, password) - - nonce := hexutil.Uint64(10) - gas := hexutil.Big(*big.NewInt(defaultGas + 1)) - s.setupTransactionPoolAPI(account, nonce, gas, keystore.ErrDecrypt) + s.setupStatusBackend(1, account, password, keystore.ErrDecrypt) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) txQueueManager.DisableNotificactions() @@ -227,7 +225,6 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { }) txQueueManager.QueueTransaction(tx) - _, err := txQueueManager.CompleteTransaction(tx.ID, password) s.Equal(err.Error(), keystore.ErrDecrypt.Error()) @@ -280,3 +277,62 @@ func (s *TxQueueTestSuite) TestCompletionTimedOut() { rst := txQueueManager.WaitForTransaction(tx, c) s.Equal(queue.ErrQueuedTxTimedOut, rst.Err) } + +// TestLocalNonce verifies that local nonce will be used unless +// upstream nonce is updated and higher than a local +// in test we will run 3 transaction with nonce zero returned by upstream +// node, after each call local nonce will be incremented +// then, we return higher nonce, as if another node was used to send 2 transactions +// upstream nonce will be equal to 5, we update our local counter to 5+1 +func (s *TxQueueTestSuite) TestLocalNone() { + txCount := 3 + password := TestConfig.Account1.Password + key, _ := crypto.GenerateKey() + account := &common.SelectedExtKey{ + Address: common.FromAddress(TestConfig.Account1.Address), + AccountKey: &keystore.Key{PrivateKey: key}, + } + s.setupStatusBackend(txCount+1, account, password, nil) + + nonce := hexutil.Uint64(0) + gas := hexutil.Big(*big.NewInt(defaultGas + 1)) + s.setupTransactionPoolAPI(txCount, account, nonce, gas, nil) + + manager := NewManager(s.nodeManagerMock, s.accountManagerMock) + manager.DisableNotificactions() + + manager.Start() + defer manager.Stop() + + for i := 0; i < txCount; i++ { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + + c := manager.QueueTransaction(tx) + hash, err := manager.CompleteTransaction(tx.ID, password) + rst := manager.WaitForTransaction(tx, c) + // simple sanity checks + s.NoError(err) + s.Equal(rst.Err, err) + s.Equal(rst.Hash, hash) + s.Equal(uint64(i)+1, manager.addrLock.localNonce[tx.Args.From]) + } + nonce = hexutil.Uint64(5) + s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Return(&nonce, nil) + s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Return(big.NewInt(10), nil) + s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Return(&gas, nil) + s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), gomock.Any()).Return(gethcommon.Hash{}, nil) + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + c := manager.QueueTransaction(tx) + hash, err := manager.CompleteTransaction(tx.ID, password) + rst := manager.WaitForTransaction(tx, c) + s.NoError(err) + s.Equal(rst.Err, err) + s.Equal(rst.Hash, hash) + s.Equal(uint64(nonce)+1, manager.addrLock.localNonce[tx.Args.From]) +} diff --git a/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go b/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go index 6e12e50077c..af7a5e4baa6 100644 --- a/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go +++ b/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go @@ -956,14 +956,6 @@ func (s *PublicTransactionPoolAPI) GetRawTransactionByBlockHashAndIndex(ctx cont // GetTransactionCount returns the number of transactions the given address has sent for the given block number func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) { - // go-ethereum issue https://github.com/ethereum/go-ethereum/issues/2880 - if blockNr == rpc.PendingBlockNumber { - nonce, err := s.b.GetPoolNonce(ctx, address) - if err != nil { - return nil, err - } - return (*hexutil.Uint64)(&nonce), nil - } state, _, err := s.b.StateAndHeaderByNumber(ctx, blockNr) if state == nil || err != nil { return nil, err