Skip to content

Commit

Permalink
Maintain local copy of the nonce for each used address
Browse files Browse the repository at this point in the history
If multiple concurrent transactions are sent from the same address
nonce for some of them will be the same. It will result in the
"known transaction error" and such transaction will be dropped
from a queue.

There is 2 alternatives in solving such problem:
- adding an auto-retry to transaction queue, which is very complex
  and error prone, this way we can actually retry transactions
  that are already processed by ethereum
- maintain nonce locally

Second approach is a straightforward. We keep asking for a nonce
from upstream, but if our local nonce is higher we steak to it.
Our local nonce is updated only if transaction succeeds, so there is
no way to send out of order transaction.
  • Loading branch information
dshulyak committed Jan 6, 2018
1 parent d75f6f0 commit 58a039f
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 102 deletions.
26 changes: 22 additions & 4 deletions geth/transactions/addrlock.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// copy of go-ethereum/internal/ethapi/addrlock.go

package transactions

import (
Expand All @@ -9,8 +7,9 @@ import (
)

type AddrLocker struct {
mu sync.Mutex
locks map[common.Address]*sync.Mutex
mu sync.Mutex
locks map[common.Address]*sync.Mutex
localNonce map[common.Address]uint64
}

// lock returns the lock of the given address.
Expand All @@ -37,3 +36,22 @@ func (l *AddrLocker) LockAddr(address common.Address) {
func (l *AddrLocker) UnlockAddr(address common.Address) {
l.lock(address).Unlock()
}

// RunUpdate needs a better name
func (l *AddrLocker) RunUpdate(
addr common.Address,
f func(localNonce uint64) (upstreamNonce uint64, err error),
) error {
l.LockAddr(addr)
defer l.UnlockAddr(addr)
if l.localNonce == nil {
l.localNonce = make(map[common.Address]uint64)
}
local := l.localNonce[addr]
nonce, err := f(local)
if err != nil {
return err
}
l.localNonce[addr] = nonce + 1
return nil
}
144 changes: 74 additions & 70 deletions geth/transactions/txqueue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/transactions/queue"
)

Expand Down Expand Up @@ -119,19 +120,23 @@ func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (ha
log.Warn("error getting a queued transaction", "err", err)
return hash, err
}
account, err := m.validateAccount(tx)
config, err := m.nodeManager.NodeConfig()
if err != nil {
return hash, err
}
account, err := m.validateAccount(config, tx, password)
if err != nil {
m.txDone(tx, hash, err)
return hash, err
}
// Send the transaction finally.
hash, err = m.completeTransaction(account, tx, password)
hash, err = m.completeTransaction(config, account, tx, password)
log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err)
m.txDone(tx, hash, err)
return hash, err
}

func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey, error) {
func (m *Manager) validateAccount(config *params.NodeConfig, tx *common.QueuedTx, password string) (*common.SelectedExtKey, error) {
selectedAccount, err := m.accountManager.SelectedAccount()
if err != nil {
log.Warn("failed to get a selected account", "err", err)
Expand All @@ -142,85 +147,84 @@ func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey,
log.Warn("queued transaction does not belong to the selected account", "err", queue.ErrInvalidCompleteTxSender)
return nil, queue.ErrInvalidCompleteTxSender
}
return selectedAccount, nil
}

func (m *Manager) completeTransaction(selectedAccount *common.SelectedExtKey, queuedTx *common.QueuedTx, password string) (hash gethcommon.Hash, err error) {
log.Info("complete transaction", "id", queuedTx.ID)
log.Info("verifying account password for transaction", "id", queuedTx.ID)
config, err := m.nodeManager.NodeConfig()
if err != nil {
return hash, err
}
_, err = m.accountManager.VerifyAccountPassword(config.KeyStoreDir, selectedAccount.Address.String(), password)
if err != nil {
log.Warn("failed to verify account", "account", selectedAccount.Address.String(), "error", err.Error())
return hash, err
return nil, err
}
return selectedAccount, nil
}

func (m *Manager) completeTransaction(config *params.NodeConfig, selectedAccount *common.SelectedExtKey, queuedTx *common.QueuedTx, password string) (hash gethcommon.Hash, err error) {
log.Info("complete transaction", "id", queuedTx.ID)
// update transaction with nonce, gas price and gas estimates
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
m.addrLock.LockAddr(queuedTx.Args.From)
defer m.addrLock.UnlockAddr(queuedTx.Args.From)
nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From)
if err != nil {
return hash, err
}
args := queuedTx.Args
gasPrice := (*big.Int)(args.GasPrice)
if args.GasPrice == nil {
err = m.addrLock.RunUpdate(queuedTx.Args.From, func(localNonce uint64) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From)
if err != nil {
return 0, err
}
if localNonce > nonce {
nonce = localNonce
}
args := queuedTx.Args
gasPrice := (*big.Int)(args.GasPrice)
if args.GasPrice == nil {
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx)
if err != nil {
return 0, err
}
}

chainID := big.NewInt(int64(config.NetworkID))
data := []byte(args.Data)
value := (*big.Int)(args.Value)
toAddr := gethcommon.Address{}
if args.To != nil {
toAddr = *args.To
}
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx)
gas, err := m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{
From: args.From,
To: args.To,
GasPrice: gasPrice,
Value: value,
Data: data,
})
if err != nil {
return hash, err
return 0, err
}
if gas.Cmp(big.NewInt(defaultGas)) == -1 {
log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas)
gas = big.NewInt(defaultGas)
}
}

chainID := big.NewInt(int64(config.NetworkID))
data := []byte(args.Data)
value := (*big.Int)(args.Value)
toAddr := gethcommon.Address{}
if args.To != nil {
toAddr = *args.To
}
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
gas, err := m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{
From: args.From,
To: args.To,
GasPrice: gasPrice,
Value: value,
Data: data,
log.Info(
"preparing raw transaction",
"from", args.From.Hex(),
"to", toAddr.Hex(),
"gas", gas,
"gasPrice", gasPrice,
"value", value,
)
tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey)
if err != nil {
return 0, err
}
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil {
return 0, err
}
hash = signedTx.Hash()
return nonce, nil
})
if err != nil {
return hash, err
}
if gas.Cmp(big.NewInt(defaultGas)) == -1 {
log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas)
gas = big.NewInt(defaultGas)
}

log.Info(
"preparing raw transaction",
"from", args.From.Hex(),
"to", toAddr.Hex(),
"gas", gas,
"gasPrice", gasPrice,
"value", value,
)
tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey)
if err != nil {
return hash, err
}
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil {
return hash, err
}
return signedTx.Hash(), nil
return hash, nil
}

// CompleteTransactions instructs backend to complete sending of multiple transactions
Expand Down
96 changes: 76 additions & 20 deletions geth/transactions/txqueue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ func (s *TxQueueTestSuite) TearDownTest() {
s.client.Close()
}

func (s *TxQueueTestSuite) setupTransactionPoolAPI(account *common.SelectedExtKey, nonce hexutil.Uint64, gas hexutil.Big, txErr error) {
s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Return(&nonce, nil)
s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Return(big.NewInt(10), nil)
s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Return(&gas, nil)
s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), gomock.Any()).Return(gethcommon.Hash{}, txErr)
func (s *TxQueueTestSuite) setupTransactionPoolAPI(txCount int, account *common.SelectedExtKey, nonce hexutil.Uint64, gas hexutil.Big, txErr error) {
s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Times(txCount).Return(&nonce, nil)
s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Times(txCount).Return(big.NewInt(10), nil)
s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Times(txCount).Return(&gas, nil)
s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), gomock.Any()).Times(txCount).Return(gethcommon.Hash{}, txErr)
}

func (s *TxQueueTestSuite) setupStatusBackend(account *common.SelectedExtKey, password string) {
func (s *TxQueueTestSuite) setupStatusBackend(txCount int, account *common.SelectedExtKey, password string, passErr error) {
nodeConfig, nodeErr := params.NewNodeConfig("/tmp", params.RopstenNetworkID, true)
s.nodeManagerMock.EXPECT().NodeConfig().Return(nodeConfig, nodeErr)
s.accountManagerMock.EXPECT().SelectedAccount().Return(account, nil)
s.accountManagerMock.EXPECT().VerifyAccountPassword(nodeConfig.KeyStoreDir, account.Address.String(), password).Return(
nil, nil)
s.nodeManagerMock.EXPECT().NodeConfig().Times(txCount).Return(nodeConfig, nodeErr)
s.accountManagerMock.EXPECT().SelectedAccount().Times(txCount).Return(account, nil)
s.accountManagerMock.EXPECT().VerifyAccountPassword(nodeConfig.KeyStoreDir, account.Address.String(), password).Times(txCount).Return(
nil, passErr)
}

func (s *TxQueueTestSuite) TestCompleteTransaction() {
Expand All @@ -83,11 +83,11 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() {
Address: common.FromAddress(TestConfig.Account1.Address),
AccountKey: &keystore.Key{PrivateKey: key},
}
s.setupStatusBackend(account, password)
s.setupStatusBackend(1, account, password, nil)

nonce := hexutil.Uint64(10)
gas := hexutil.Big(*big.NewInt(defaultGas + 1))
s.setupTransactionPoolAPI(account, nonce, gas, nil)
s.setupTransactionPoolAPI(1, account, nonce, gas, nil)

txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)

Expand Down Expand Up @@ -123,11 +123,11 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
Address: common.FromAddress(TestConfig.Account1.Address),
AccountKey: &keystore.Key{PrivateKey: key},
}
s.setupStatusBackend(account, password)
s.setupStatusBackend(1, account, password, nil)

nonce := hexutil.Uint64(10)
gas := hexutil.Big(*big.NewInt(defaultGas + 1))
s.setupTransactionPoolAPI(account, nonce, gas, nil)
s.setupTransactionPoolAPI(1, account, nonce, gas, nil)

txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
txQueueManager.DisableNotificactions()
Expand Down Expand Up @@ -177,6 +177,8 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
}

func (s *TxQueueTestSuite) TestAccountMismatch() {
nodeConfig, nodeErr := params.NewNodeConfig("/tmp", params.RopstenNetworkID, true)
s.nodeManagerMock.EXPECT().NodeConfig().Return(nodeConfig, nodeErr)
s.accountManagerMock.EXPECT().SelectedAccount().Return(&common.SelectedExtKey{
Address: common.FromAddress(TestConfig.Account2.Address),
}, nil)
Expand Down Expand Up @@ -209,11 +211,7 @@ func (s *TxQueueTestSuite) TestInvalidPassword() {
Address: common.FromAddress(TestConfig.Account1.Address),
AccountKey: &keystore.Key{PrivateKey: key},
}
s.setupStatusBackend(account, password)

nonce := hexutil.Uint64(10)
gas := hexutil.Big(*big.NewInt(defaultGas + 1))
s.setupTransactionPoolAPI(account, nonce, gas, keystore.ErrDecrypt)
s.setupStatusBackend(1, account, password, keystore.ErrDecrypt)

txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
txQueueManager.DisableNotificactions()
Expand All @@ -227,7 +225,6 @@ func (s *TxQueueTestSuite) TestInvalidPassword() {
})

txQueueManager.QueueTransaction(tx)

_, err := txQueueManager.CompleteTransaction(tx.ID, password)
s.Equal(err.Error(), keystore.ErrDecrypt.Error())

Expand Down Expand Up @@ -280,3 +277,62 @@ func (s *TxQueueTestSuite) TestCompletionTimedOut() {
rst := txQueueManager.WaitForTransaction(tx, c)
s.Equal(queue.ErrQueuedTxTimedOut, rst.Err)
}

// TestLocalNonce verifies that local nonce will be used unless
// upstream nonce is updated and higher than a local
// in test we will run 3 transaction with nonce zero returned by upstream
// node, after each call local nonce will be incremented
// then, we return higher nonce, as if another node was used to send 2 transactions
// upstream nonce will be equal to 5, we update our local counter to 5+1
func (s *TxQueueTestSuite) TestLocalNone() {
txCount := 3
password := TestConfig.Account1.Password
key, _ := crypto.GenerateKey()
account := &common.SelectedExtKey{
Address: common.FromAddress(TestConfig.Account1.Address),
AccountKey: &keystore.Key{PrivateKey: key},
}
s.setupStatusBackend(txCount+1, account, password, nil)

nonce := hexutil.Uint64(0)
gas := hexutil.Big(*big.NewInt(defaultGas + 1))
s.setupTransactionPoolAPI(txCount, account, nonce, gas, nil)

manager := NewManager(s.nodeManagerMock, s.accountManagerMock)
manager.DisableNotificactions()

manager.Start()
defer manager.Stop()

for i := 0; i < txCount; i++ {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address),
})

c := manager.QueueTransaction(tx)
hash, err := manager.CompleteTransaction(tx.ID, password)
rst := manager.WaitForTransaction(tx, c)
// simple sanity checks
s.NoError(err)
s.Equal(rst.Err, err)
s.Equal(rst.Hash, hash)
s.Equal(uint64(i)+1, manager.addrLock.localNonce[tx.Args.From])
}
nonce = hexutil.Uint64(5)
s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Return(&nonce, nil)
s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Return(big.NewInt(10), nil)
s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Return(&gas, nil)
s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), gomock.Any()).Return(gethcommon.Hash{}, nil)
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address),
})
c := manager.QueueTransaction(tx)
hash, err := manager.CompleteTransaction(tx.ID, password)
rst := manager.WaitForTransaction(tx, c)
s.NoError(err)
s.Equal(rst.Err, err)
s.Equal(rst.Hash, hash)
s.Equal(uint64(nonce)+1, manager.addrLock.localNonce[tx.Args.From])
}
8 changes: 0 additions & 8 deletions vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 58a039f

Please sign in to comment.