From 2c91b135a7d6a1e508f5d0e214ab2cdd8406ba3c Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 5 Jan 2018 22:58:17 +0200 Subject: [PATCH] Result of tx processing returned as QueuedTxResult Currently it is quite easy to introduce concurrency issues while working with transaction object. For example, race issue will exist every time while transaction is processed in a separate goroutine and caller will try to check for an error before event to Done channel is sent. Current change removes all the data that is updated on transaction and leaves it with ID, Args and Context (which is not used at the moment). Result of transaction will be sent to a channel that is returned when transaction is enqueued. QueuedTxResult has a Hash and Err fields. --- geth/api/backend.go | 12 ++--- geth/common/types.go | 12 +++-- geth/common/types_mock.go | 10 ++-- geth/common/utils.go | 2 - geth/transactions/notifications.go | 8 +-- geth/transactions/queue/queue.go | 25 +++++---- geth/transactions/queue/queue_test.go | 50 ++++++++---------- geth/transactions/txqueue_manager.go | 64 +++++++++++------------ geth/transactions/txqueue_manager_test.go | 54 +++++++++++-------- 9 files changed, 118 insertions(+), 119 deletions(-) diff --git a/geth/api/backend.go b/geth/api/backend.go index 6b27d72a381..f402e1830be 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -206,16 +206,14 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA } tx := common.CreateTransaction(ctx, args) + c := m.txQueueManager.QueueTransaction(tx) - if err := m.txQueueManager.QueueTransaction(tx); err != nil { - return gethcommon.Hash{}, err + rst := m.txQueueManager.WaitForTransaction(tx, c) + if rst.Err != nil { + return gethcommon.Hash{}, rst.Err } - if err := m.txQueueManager.WaitForTransaction(tx); err != nil { - return gethcommon.Hash{}, err - } - - return tx.Hash, nil + return rst.Hash, nil } // CompleteTransaction instructs backend to complete sending of a given transaction diff --git a/geth/common/types.go b/geth/common/types.go index 0ab5e87fe1f..3158c0a733e 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -183,12 +183,14 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { ID QueuedTxID - Hash common.Hash Context context.Context Args SendTxArgs - Done chan struct{} - Discard chan struct{} - Err error +} + +type QueuedTxResult struct { + Hash common.Hash + Err error + Tx QueuedTx } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -232,7 +234,7 @@ type TxQueueManager interface { QueueTransaction(tx *QueuedTx) error // WaitForTransactions blocks until transaction is completed, discarded or timed out. - WaitForTransaction(tx *QueuedTx) error + WaitForTransaction(tx *QueuedTx, c <-chan QueuedTxResult) QueuedTxResult SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) diff --git a/geth/common/types_mock.go b/geth/common/types_mock.go index 6bc4dd9c814..27b90a43b6b 100644 --- a/geth/common/types_mock.go +++ b/geth/common/types_mock.go @@ -522,15 +522,15 @@ func (mr *MockTxQueueManagerMockRecorder) QueueTransaction(tx interface{}) *gomo } // WaitForTransaction mocks base method -func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx) error { - ret := m.ctrl.Call(m, "WaitForTransaction", tx) - ret0, _ := ret[0].(error) +func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx, c <-chan QueuedTxResult) QueuedTxResult { + ret := m.ctrl.Call(m, "WaitForTransaction", tx, c) + ret0, _ := ret[0].(QueuedTxResult) return ret0 } // WaitForTransaction indicates an expected call of WaitForTransaction -func (mr *MockTxQueueManagerMockRecorder) WaitForTransaction(tx interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx) +func (mr *MockTxQueueManagerMockRecorder) WaitForTransaction(tx, c interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx, c) } // SendTransactionRPCHandler mocks base method diff --git a/geth/common/utils.go b/geth/common/utils.go index 9d01681810d..d7e98719197 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -157,9 +157,7 @@ func Fatalf(reason interface{}, args ...interface{}) { func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { return &QueuedTx{ ID: QueuedTxID(uuid.New()), - Hash: common.Hash{}, Context: ctx, Args: args, - Done: make(chan struct{}, 1), } } diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index d6c58a126c4..a7cf4bb30a6 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -58,8 +58,8 @@ type ReturnSendTransactionEvent struct { } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *common.QueuedTx) { - if queuedTx.Err == nil { +func NotifyOnReturn(queuedTx *common.QueuedTx, err error) { + if err == nil { return } @@ -75,8 +75,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) { ID: string(queuedTx.ID), Args: queuedTx.Args, MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: queuedTx.Err.Error(), - ErrorCode: sendTransactionErrorCode(queuedTx.Err), + ErrorMessage: err.Error(), + ErrorCode: sendTransactionErrorCode(err), }, }) } diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index 2cc09193530..d02dbb09cd6 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -44,9 +44,10 @@ type empty struct{} // TxQueue is capped container that holds pending transactions type TxQueue struct { - mu sync.RWMutex // to guard transactions map - transactions map[common.QueuedTxID]*common.QueuedTx - inprogress map[common.QueuedTxID]empty + mu sync.RWMutex // to guard transactions map + transactions map[common.QueuedTxID]*common.QueuedTx + inprogress map[common.QueuedTxID]empty + subscriptions map[common.QueuedTxID]chan common.QueuedTxResult // TODO don't use another goroutine for eviction evictableIDs chan common.QueuedTxID @@ -63,6 +64,7 @@ func NewQueue() *TxQueue { return &TxQueue{ transactions: make(map[common.QueuedTxID]*common.QueuedTx), inprogress: make(map[common.QueuedTxID]empty), + subscriptions: make(map[common.QueuedTxID]chan common.QueuedTxResult), evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO enqueueTicker: make(chan struct{}), } @@ -130,11 +132,8 @@ func (q *TxQueue) Reset() { } // Enqueue enqueues incoming transaction -func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { +func (q *TxQueue) Enqueue(tx *common.QueuedTx) <-chan common.QueuedTxResult { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - if (tx.Hash != gethcommon.Hash{}) { - return ErrQueuedTxAlreadyProcessed - } log.Info("before enqueueTicker") q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item @@ -144,11 +143,13 @@ func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { q.mu.Lock() q.transactions[tx.ID] = tx + c := make(chan common.QueuedTxResult, 1) + q.subscriptions[tx.ID] = c q.mu.Unlock() // notify handler log.Info("calling txEnqueueHandler") - return nil + return c } // Get returns transaction by transaction identifier @@ -176,6 +177,7 @@ func (q *TxQueue) Remove(id common.QueuedTxID) { func (q *TxQueue) remove(id common.QueuedTxID) { delete(q.transactions, id) delete(q.inprogress, id) + delete(q.subscriptions, id) } // Done removes transaction from queue if no error or error is not transient @@ -193,19 +195,16 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { delete(q.inprogress, tx.ID) - tx.Err = err // hash is updated only if err is nil if err == nil { + q.subscriptions[tx.ID] <- common.QueuedTxResult{Hash: hash, Err: err} q.remove(tx.ID) - tx.Hash = hash - tx.Err = err - tx.Done <- struct{}{} return } _, transient := transientErrs[err.Error()] if !transient { + q.subscriptions[tx.ID] <- common.QueuedTxResult{Err: err} q.remove(tx.ID) - tx.Done <- struct{}{} } } diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index 5f0f917e740..214c23aeea0 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -32,7 +32,7 @@ func (s *QueueTestSuite) TearDownTest() { func (s *QueueTestSuite) TestGetTransaction() { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) + s.queue.Enqueue(tx) enquedTx, err := s.queue.Get(tx.ID) s.NoError(err) s.Equal(tx, enquedTx) @@ -42,29 +42,22 @@ func (s *QueueTestSuite) TestGetTransaction() { s.Equal(ErrQueuedTxInProgress, err) } -func (s *QueueTestSuite) TestGetProcessedTransaction() { - // enqueue will fail if transaction with hash will be enqueued +func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) (*common.QueuedTx, <-chan common.QueuedTxResult) { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Hash = gethcommon.Hash{1} - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) -} - -func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { - tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) + c := s.queue.Enqueue(tx) s.NoError(s.queue.Done(tx.ID, hash, err)) - return tx + return tx, c } func (s *QueueTestSuite) TestDoneSuccess() { hash := gethcommon.Hash{1} - tx := s.testDone(hash, nil) - s.NoError(tx.Err) - s.Equal(hash, tx.Hash) - s.False(s.queue.Has(tx.ID)) + tx, c := s.testDone(hash, nil) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-c: + s.NoError(rst.Err) + s.Equal(hash, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -73,24 +66,23 @@ func (s *QueueTestSuite) TestDoneSuccess() { func (s *QueueTestSuite) TestDoneTransientError() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt - tx := s.testDone(hash, err) - s.Equal(keystore.ErrDecrypt, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) + tx, _ := s.testDone(hash, err) s.True(s.queue.Has(tx.ID)) + _, inp := s.queue.inprogress[tx.ID] + s.False(inp) } func (s *QueueTestSuite) TestDoneError() { hash := gethcommon.Hash{1} err := errors.New("test") - tx := s.testDone(hash, err) - s.Equal(err, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) - s.False(s.queue.Has(tx.ID)) + tx, c := s.testDone(hash, err) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-c: + s.Equal(err, rst.Err) + s.NotEqual(hash, rst.Hash) + s.Equal(gethcommon.Hash{}, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -99,7 +91,7 @@ func (s *QueueTestSuite) TestDoneError() { func (s QueueTestSuite) TestMultipleDone() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt - tx := s.testDone(hash, err) + tx, _ := s.testDone(hash, err) s.NoError(s.queue.Done(tx.ID, hash, nil)) s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout"))) } @@ -111,11 +103,11 @@ func (s *QueueTestSuite) TestEviction() { if first == nil { first = tx } - s.NoError(s.queue.Enqueue(tx)) + s.queue.Enqueue(tx) } s.Equal(DefaultTxQueueCap, s.queue.Count()) tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) + s.queue.Enqueue(tx) s.Equal(DefaultTxQueueCap, s.queue.Count()) s.False(s.queue.Has(first.ID)) } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index c973abdda02..f93a614481b 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -25,22 +25,24 @@ const ( // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { - nodeManager common.NodeManager - accountManager common.AccountManager - txQueue *queue.TxQueue - ethTxClient EthTransactor - addrLock *AddrLocker - notify bool + nodeManager common.NodeManager + accountManager common.AccountManager + txQueue *queue.TxQueue + ethTxClient EthTransactor + addrLock *AddrLocker + notify bool + sendCompletionTimeout time.Duration } // NewManager returns a new Manager. func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager { return &Manager{ - nodeManager: nodeManager, - accountManager: accountManager, - txQueue: queue.NewQueue(), - addrLock: &AddrLocker{}, - notify: true, + nodeManager: nodeManager, + accountManager: accountManager, + txQueue: queue.NewQueue(), + addrLock: &AddrLocker{}, + notify: true, + sendCompletionTimeout: DefaultTxSendCompletionTimeout * time.Second, } } @@ -69,40 +71,42 @@ func (m *Manager) TransactionQueue() common.TxQueue { } // QueueTransaction puts a transaction into the queue. -func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { +func (m *Manager) QueueTransaction(tx *common.QueuedTx) <-chan common.QueuedTxResult { to := "" if tx.Args.To != nil { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - err := m.txQueue.Enqueue(tx) + c := m.txQueue.Enqueue(tx) if m.notify { NotifyOnEnqueue(tx) } - return err + return c } func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, err) } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { +func (m *Manager) WaitForTransaction(tx *common.QueuedTx, c <-chan common.QueuedTxResult) common.QueuedTxResult { log.Info("wait for transaction", "id", tx.ID) // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out - select { - case <-tx.Done: - case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + for { + select { + case rst := <-c: + return rst + case <-time.After(m.sendCompletionTimeout): + m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + } } - return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. @@ -240,7 +244,7 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { } err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, queue.ErrQueuedTxDiscarded) } return err } @@ -265,19 +269,15 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { log.Info("SendTransactionRPCHandler called") - // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) - - if err := m.QueueTransaction(tx); err != nil { - return nil, err + c := m.QueueTransaction(tx) + rst := m.WaitForTransaction(tx, c) + if rst.Err != nil { + return nil, rst.Err } - - if err := m.WaitForTransaction(tx); err != nil { - return nil, err - } - - return tx.Hash.Hex(), nil + // handle empty hash + return rst.Hash.Hex(), nil } diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go index 3e0af0efdaa..5c781671ae1 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -5,6 +5,7 @@ import ( "math/big" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/keystore" gethcommon "github.com/ethereum/go-ethereum/common" @@ -98,8 +99,7 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + c := txQueueManager.QueueTransaction(tx) w := make(chan struct{}) go func() { @@ -108,10 +108,9 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx, c) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) <-w @@ -141,8 +140,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + c := txQueueManager.QueueTransaction(tx) var wg sync.WaitGroup var mu sync.Mutex @@ -166,10 +164,9 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { }() } - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx, c) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) @@ -195,10 +192,9 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) + _, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts @@ -230,10 +226,9 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, password) + _, err := txQueueManager.CompleteTransaction(tx.ID, password) s.Equal(err.Error(), keystore.ErrDecrypt.Error()) // Transaction should stay in the queue as mismatched accounts @@ -253,8 +248,7 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + c := txQueueManager.QueueTransaction(tx) w := make(chan struct{}) go func() { discardErr := txQueueManager.DiscardTransaction(tx.ID) @@ -262,11 +256,27 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.Equal(queue.ErrQueuedTxDiscarded, err) - // Check that error is assigned to the transaction. - s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) + rst := txQueueManager.WaitForTransaction(tx, c) + s.Equal(queue.ErrQueuedTxDiscarded, rst.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) <-w } + +func (s *TxQueueTestSuite) TestCompletionTimedOut() { + txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() + txQueueManager.sendCompletionTimeout = time.Nanosecond + + txQueueManager.Start() + defer txQueueManager.Stop() + + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + + c := txQueueManager.QueueTransaction(tx) + rst := txQueueManager.WaitForTransaction(tx, c) + s.Equal(queue.ErrQueuedTxTimedOut, rst.Err) +}