From 0cc12138745e2b0372d039f2cc3b1e968bf9a421 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 3 Jan 2018 17:26:46 +0200 Subject: [PATCH 1/4] General refactoring of txqueue and manager Changes to TxQueue: - factored out into a separate module to ensure that only exported methods will be used - deleted EnqueuAsync as it is not used, it is always possible to restore it from git, if it will be ever needed - notifications moved to a manager, with the goal to simplify txqueue and separate concerns between manager and txqueue - simplified API for processing transactions After analyzing code I removed all redundant methods and currently all the processing can be done with Get and Done methods. - added inprogress map to store transactions that are taken for processing and removed Inprogress variable from transaction, the goal is to cleanup tx structure - removed Discard channel from transaction, transaction.Err can be used to understand if transaction was discarded or failed due to another error - transient errors stored as global variable of queue module for simplicity, it is unlikely that they will become dynamic Change to Manager: - simplified code that manages notifications, all handlers were removed and notifiers refactored to be simple functions that are called in two places, when transaction is queued and when manager finished waiting for transaction - notifications can be turned of by calling DisableNotifications on manager - made a function out of CreateTransaction method, as it can be used as a simple utility --- Makefile | 2 +- e2e/jail/jail_rpc_test.go | 6 +- e2e/transactions/transactions_test.go | 80 +++--- geth/api/backend.go | 12 +- geth/common/types.go | 38 +-- geth/common/types_mock.go | 76 +---- geth/common/utils.go | 12 + geth/{txqueue => transactions}/addrlock.go | 2 +- geth/{txqueue => transactions}/ethtxclient.go | 2 +- geth/{txqueue => transactions}/fake/mock.go | 2 +- .../fake/txservice.go | 0 geth/transactions/notifications.go | 89 ++++++ .../queue/queue.go} | 185 +++++-------- geth/transactions/queue/queue_test.go | 131 +++++++++ geth/{txqueue => transactions/queue}/utils.go | 2 +- .../txqueue_manager.go | 260 +++++------------- .../txqueue_manager_test.go | 89 ++---- lib/utils.go | 37 +-- 18 files changed, 470 insertions(+), 555 deletions(-) rename geth/{txqueue => transactions}/addrlock.go (98%) rename geth/{txqueue => transactions}/ethtxclient.go (99%) rename geth/{txqueue => transactions}/fake/mock.go (98%) rename geth/{txqueue => transactions}/fake/txservice.go (100%) create mode 100644 geth/transactions/notifications.go rename geth/{txqueue/txqueue.go => transactions/queue/queue.go} (56%) create mode 100644 geth/transactions/queue/queue_test.go rename geth/{txqueue => transactions/queue}/utils.go (97%) rename geth/{txqueue => transactions}/txqueue_manager.go (52%) rename geth/{txqueue => transactions}/txqueue_manager_test.go (75%) diff --git a/Makefile b/Makefile index 43d17db4d05..b848d490115 100644 --- a/Makefile +++ b/Makefile @@ -111,7 +111,7 @@ mock: ##@other Regenerate mocks mockgen -source=geth/mailservice/mailservice.go -destination=geth/mailservice/mailservice_mock.go -package=mailservice mockgen -source=geth/common/notification.go -destination=geth/common/notification_mock.go -package=common -imports fcm=github.com/NaySoftware/go-fcm mockgen -source=geth/notification/fcm/client.go -destination=geth/notification/fcm/client_mock.go -package=fcm -imports fcm=github.com/NaySoftware/go-fcm - mockgen -source=geth/txqueue/fake/txservice.go -destination=geth/txqueue/fake/mock.go -package=fake + mockgen -source=geth/transactions/fake/txservice.go -destination=geth/transactions/fake/mock.go -package=fake test: test-unit-coverage ##@tests Run basic, short tests during development diff --git a/e2e/jail/jail_rpc_test.go b/e2e/jail/jail_rpc_test.go index 2b9f99d87f9..96c7c4ed046 100644 --- a/e2e/jail/jail_rpc_test.go +++ b/e2e/jail/jail_rpc_test.go @@ -14,7 +14,7 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" . "github.com/status-im/status-go/testing" "github.com/stretchr/testify/suite" ) @@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() { unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"]) @@ -284,7 +284,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() { s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) diff --git a/e2e/transactions/transactions_test.go b/e2e/transactions/transactions_test.go index b3533d37dc0..740d8b9eef4 100644 --- a/e2e/transactions/transactions_test.go +++ b/e2e/transactions/transactions_test.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "reflect" + "sync" "testing" "time" @@ -18,7 +19,8 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/geth/transactions/queue" . "github.com/status-im/status-go/testing" "github.com/stretchr/testify/suite" ) @@ -48,7 +50,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() { err := json.Unmarshal([]byte(rawSignal), &sg) s.NoError(err) - if sg.Type == txqueue.EventTransactionQueued { + if sg.Type == transactions.EventTransactionQueued { event := sg.Event.(map[string]interface{}) txID := event["id"].(string) txHash, err = s.Backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password) @@ -100,7 +102,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() { err := json.Unmarshal([]byte(rawSignal), &signalEnvelope) s.NoError(err) - if signalEnvelope.Type == txqueue.EventTransactionQueued { + if signalEnvelope.Type == transactions.EventTransactionQueued { event := signalEnvelope.Event.(map[string]interface{}) txID := event["id"].(string) @@ -156,7 +158,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -182,7 +184,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() { ) s.EqualError( err, - txqueue.ErrInvalidCompleteTxSender.Error(), + queue.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -247,7 +249,7 @@ func (s *TransactionsTestSuite) TestSendEther() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) TestSendEther() { common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password) s.EqualError( err, - txqueue.ErrInvalidCompleteTxSender.Error(), + queue.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -330,7 +332,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -387,7 +389,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be failed and completed on the second call)", "id", txID) @@ -407,7 +409,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { close(completeQueuedTransaction) } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) @@ -466,7 +468,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) @@ -488,12 +490,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { close(completeQueuedTransaction) } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -509,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") + s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") select { case <-completeQueuedTransaction: @@ -543,7 +545,7 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID) @@ -640,7 +642,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { var envelope signal.Envelope err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) @@ -650,12 +652,12 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { txIDs <- txID } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -675,7 +677,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error()) + s.EqualError(err, queue.ErrQueuedTxDiscarded.Error()) s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't") } @@ -747,7 +749,7 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() { // try completing non-existing transaction _, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password) s.Error(err, "error expected and not received") - s.EqualError(err, txqueue.ErrQueuedTxIDNotFound.Error()) + s.EqualError(err, queue.ErrQueuedTxIDNotFound.Error()) } func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { @@ -756,6 +758,24 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { backend := s.LightEthereumService().StatusBackend s.NotNil(backend) + var m sync.Mutex + txCount := 0 + txIDs := [queue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{} + + signal.SetDefaultNodeNotificationHandler(func(rawSignal string) { + var sg signal.Envelope + err := json.Unmarshal([]byte(rawSignal), &sg) + s.NoError(err) + + if sg.Type == transactions.EventTransactionQueued { + event := sg.Event.(map[string]interface{}) + txID := event["id"].(string) + m.Lock() + txIDs[txCount] = common.QueuedTxID(txID) + txCount++ + m.Unlock() + } + }) // reset queue s.Backend.TxQueueManager().TransactionQueue().Reset() @@ -764,36 +784,26 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { s.NoError(s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) txQueue := s.Backend.TxQueueManager().TransactionQueue() - var i = 0 - txIDs := [txqueue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{} - s.Backend.TxQueueManager().SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - log.Info("tx enqueued", "i", i+1, "queue size", txQueue.Count(), "id", queuedTx.ID) - txIDs[i] = queuedTx.ID - i++ - }) - s.Zero(txQueue.Count(), "transaction count should be zero") for j := 0; j < 10; j++ { go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck } - time.Sleep(2 * time.Second) // FIXME(tiabc): more reliable synchronization to ensure all transactions are enqueued - - log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d", - i, txqueue.DefaultTxQueueCap, txQueue.Count())) + time.Sleep(2 * time.Second) + log.Info(fmt.Sprintf("Number of transactions sent: %d. Queue size (shouldn't be more than %d): %d", + txCount, queue.DefaultTxQueueCap, txQueue.Count())) s.Equal(10, txQueue.Count(), "transaction count should be 10") - for i := 0; i < txqueue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines + for i := 0; i < queue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck } time.Sleep(5 * time.Second) - s.True(txQueue.Count() <= txqueue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", txqueue.DefaultTxQueueCap, txqueue.DefaultTxQueueCap-1, txQueue.Count()) + s.True(txQueue.Count() <= queue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", queue.DefaultTxQueueCap, queue.DefaultTxQueueCap-1, txQueue.Count()) for _, txID := range txIDs { txQueue.Remove(txID) } - s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count()) } diff --git a/geth/api/backend.go b/geth/api/backend.go index fdc61930cdb..6b27d72a381 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -13,7 +13,7 @@ import ( "github.com/status-im/status-go/geth/notification/fcm" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" ) const ( @@ -38,7 +38,7 @@ func NewStatusBackend() *StatusBackend { nodeManager := node.NewNodeManager() accountManager := account.NewManager(nodeManager) - txQueueManager := txqueue.NewManager(nodeManager, accountManager) + txQueueManager := transactions.NewManager(nodeManager, accountManager) jailManager := jail.New(nodeManager) notificationManager := fcm.NewNotification(fcmServerKey) @@ -205,7 +205,7 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA ctx = context.Background() } - tx := m.txQueueManager.CreateTransaction(ctx, args) + tx := common.CreateTransaction(ctx, args) if err := m.txQueueManager.QueueTransaction(tx); err != nil { return gethcommon.Hash{}, err @@ -247,11 +247,5 @@ func (m *StatusBackend) registerHandlers() error { rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler()) rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler) - m.txQueueManager.SetTransactionQueueHandler(m.txQueueManager.TransactionQueueHandler()) - log.Info("Registered handler", "fn", "TransactionQueueHandler") - - m.txQueueManager.SetTransactionReturnHandler(m.txQueueManager.TransactionReturnHandler()) - log.Info("Registered handler", "fn", "TransactionReturnHandler") - return nil } diff --git a/geth/common/types.go b/geth/common/types.go index 2ceae371899..e81800fefdb 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -154,14 +154,12 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { - ID QueuedTxID - Hash common.Hash - Context context.Context - Args SendTxArgs - InProgress bool // true if transaction is being sent - Done chan struct{} - Discard chan struct{} - Err error + ID QueuedTxID + Hash common.Hash + Context context.Context + Args SendTxArgs + Done chan struct{} + Err error } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -175,12 +173,6 @@ type SendTxArgs struct { Nonce *hexutil.Uint64 `json:"nonce"` } -// EnqueuedTxHandler is a function that receives queued/pending transactions, when they get queued -type EnqueuedTxHandler func(*QueuedTx) - -// EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error) -type EnqueuedTxReturnHandler func(*QueuedTx, error) - // TxQueue is a queue of transactions. type TxQueue interface { // Remove removes a transaction from the queue. @@ -207,32 +199,14 @@ type TxQueueManager interface { // TransactionQueue returns a transaction queue. TransactionQueue() TxQueue - // CreateTransactoin creates a new transaction. - CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx - // QueueTransaction adds a new transaction to the queue. QueueTransaction(tx *QueuedTx) error // WaitForTransactions blocks until transaction is completed, discarded or timed out. WaitForTransaction(tx *QueuedTx) error - // NotifyOnQueuedTxReturn notifies a handler when a transaction returns. - NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error) - - // TransactionQueueHandler returns handler that processes incoming tx queue requests - TransactionQueueHandler() func(queuedTx *QueuedTx) - - // TODO(adam): might be not needed - SetTransactionQueueHandler(fn EnqueuedTxHandler) - - // TODO(adam): might be not needed - SetTransactionReturnHandler(fn EnqueuedTxReturnHandler) - SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) - // TransactionReturnHandler returns handler that processes responses from internal tx manager - TransactionReturnHandler() func(queuedTx *QueuedTx, err error) - // CompleteTransaction instructs backend to complete sending of a given transaction CompleteTransaction(id QueuedTxID, password string) (common.Hash, error) diff --git a/geth/common/types_mock.go b/geth/common/types_mock.go index eb00e8fb0f9..6bc4dd9c814 100644 --- a/geth/common/types_mock.go +++ b/geth/common/types_mock.go @@ -509,18 +509,6 @@ func (mr *MockTxQueueManagerMockRecorder) TransactionQueue() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionQueue", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionQueue)) } -// CreateTransaction mocks base method -func (m *MockTxQueueManager) CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { - ret := m.ctrl.Call(m, "CreateTransaction", ctx, args) - ret0, _ := ret[0].(*QueuedTx) - return ret0 -} - -// CreateTransaction indicates an expected call of CreateTransaction -func (mr *MockTxQueueManagerMockRecorder) CreateTransaction(ctx, args interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).CreateTransaction), ctx, args) -} - // QueueTransaction mocks base method func (m *MockTxQueueManager) QueueTransaction(tx *QueuedTx) error { ret := m.ctrl.Call(m, "QueueTransaction", tx) @@ -545,48 +533,6 @@ func (mr *MockTxQueueManagerMockRecorder) WaitForTransaction(tx interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx) } -// NotifyOnQueuedTxReturn mocks base method -func (m *MockTxQueueManager) NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error) { - m.ctrl.Call(m, "NotifyOnQueuedTxReturn", queuedTx, err) -} - -// NotifyOnQueuedTxReturn indicates an expected call of NotifyOnQueuedTxReturn -func (mr *MockTxQueueManagerMockRecorder) NotifyOnQueuedTxReturn(queuedTx, err interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyOnQueuedTxReturn", reflect.TypeOf((*MockTxQueueManager)(nil).NotifyOnQueuedTxReturn), queuedTx, err) -} - -// TransactionQueueHandler mocks base method -func (m *MockTxQueueManager) TransactionQueueHandler() func(*QueuedTx) { - ret := m.ctrl.Call(m, "TransactionQueueHandler") - ret0, _ := ret[0].(func(*QueuedTx)) - return ret0 -} - -// TransactionQueueHandler indicates an expected call of TransactionQueueHandler -func (mr *MockTxQueueManagerMockRecorder) TransactionQueueHandler() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionQueueHandler", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionQueueHandler)) -} - -// SetTransactionQueueHandler mocks base method -func (m *MockTxQueueManager) SetTransactionQueueHandler(fn EnqueuedTxHandler) { - m.ctrl.Call(m, "SetTransactionQueueHandler", fn) -} - -// SetTransactionQueueHandler indicates an expected call of SetTransactionQueueHandler -func (mr *MockTxQueueManagerMockRecorder) SetTransactionQueueHandler(fn interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionQueueHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionQueueHandler), fn) -} - -// SetTransactionReturnHandler mocks base method -func (m *MockTxQueueManager) SetTransactionReturnHandler(fn EnqueuedTxReturnHandler) { - m.ctrl.Call(m, "SetTransactionReturnHandler", fn) -} - -// SetTransactionReturnHandler indicates an expected call of SetTransactionReturnHandler -func (mr *MockTxQueueManagerMockRecorder) SetTransactionReturnHandler(fn interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionReturnHandler), fn) -} - // SendTransactionRPCHandler mocks base method func (m *MockTxQueueManager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { varargs := []interface{}{ctx} @@ -605,18 +551,6 @@ func (mr *MockTxQueueManagerMockRecorder) SendTransactionRPCHandler(ctx interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTransactionRPCHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SendTransactionRPCHandler), varargs...) } -// TransactionReturnHandler mocks base method -func (m *MockTxQueueManager) TransactionReturnHandler() func(*QueuedTx, error) { - ret := m.ctrl.Call(m, "TransactionReturnHandler") - ret0, _ := ret[0].(func(*QueuedTx, error)) - return ret0 -} - -// TransactionReturnHandler indicates an expected call of TransactionReturnHandler -func (mr *MockTxQueueManagerMockRecorder) TransactionReturnHandler() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionReturnHandler)) -} - // CompleteTransaction mocks base method func (m *MockTxQueueManager) CompleteTransaction(id QueuedTxID, password string) (common.Hash, error) { ret := m.ctrl.Call(m, "CompleteTransaction", id, password) @@ -666,6 +600,16 @@ func (mr *MockTxQueueManagerMockRecorder) DiscardTransactions(ids interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscardTransactions", reflect.TypeOf((*MockTxQueueManager)(nil).DiscardTransactions), ids) } +// DisableNotificactions mocks base method +func (m *MockTxQueueManager) DisableNotificactions() { + m.ctrl.Call(m, "DisableNotificactions") +} + +// DisableNotificactions indicates an expected call of DisableNotificactions +func (mr *MockTxQueueManagerMockRecorder) DisableNotificactions() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableNotificactions", reflect.TypeOf((*MockTxQueueManager)(nil).DisableNotificactions)) +} + // MockJailCell is a mock of JailCell interface type MockJailCell struct { ctrl *gomock.Controller diff --git a/geth/common/utils.go b/geth/common/utils.go index 8bedfd2af3a..01702d44644 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" + "github.com/pborman/uuid" "github.com/status-im/status-go/geth/log" "github.com/status-im/status-go/static" ) @@ -151,3 +152,14 @@ func Fatalf(reason interface{}, args ...interface{}) { os.Exit(1) } + +// CreateTransaction returns a transaction object. +func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { + return &QueuedTx{ + ID: QueuedTxID(uuid.New()), + Hash: common.Hash{}, + Context: ctx, + Args: args, + Done: make(chan struct{}, 1), + } +} diff --git a/geth/txqueue/addrlock.go b/geth/transactions/addrlock.go similarity index 98% rename from geth/txqueue/addrlock.go rename to geth/transactions/addrlock.go index dd08c7f4343..69a7a922963 100644 --- a/geth/txqueue/addrlock.go +++ b/geth/transactions/addrlock.go @@ -1,6 +1,6 @@ // copy of go-ethereum/internal/ethapi/addrlock.go -package txqueue +package transactions import ( "sync" diff --git a/geth/txqueue/ethtxclient.go b/geth/transactions/ethtxclient.go similarity index 99% rename from geth/txqueue/ethtxclient.go rename to geth/transactions/ethtxclient.go index baf240e96da..0f49e72902c 100644 --- a/geth/txqueue/ethtxclient.go +++ b/geth/transactions/ethtxclient.go @@ -1,4 +1,4 @@ -package txqueue +package transactions import ( "context" diff --git a/geth/txqueue/fake/mock.go b/geth/transactions/fake/mock.go similarity index 98% rename from geth/txqueue/fake/mock.go rename to geth/transactions/fake/mock.go index 0e002858ec1..6b4218978d1 100644 --- a/geth/txqueue/fake/mock.go +++ b/geth/transactions/fake/mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: geth/txqueue/fake/txservice.go +// Source: geth/transactions/fake/txservice.go // Package fake is a generated GoMock package. package fake diff --git a/geth/txqueue/fake/txservice.go b/geth/transactions/fake/txservice.go similarity index 100% rename from geth/txqueue/fake/txservice.go rename to geth/transactions/fake/txservice.go diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go new file mode 100644 index 00000000000..d6c58a126c4 --- /dev/null +++ b/geth/transactions/notifications.go @@ -0,0 +1,89 @@ +package transactions + +import ( + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/status-im/status-go/geth/common" + "github.com/status-im/status-go/geth/log" + "github.com/status-im/status-go/geth/signal" + "github.com/status-im/status-go/geth/transactions/queue" +) + +const ( + // EventTransactionQueued is triggered when send transaction request is queued + EventTransactionQueued = "transaction.queued" + // EventTransactionFailed is triggered when send transaction request fails + EventTransactionFailed = "transaction.failed" + + SendTransactionNoErrorCode = "0" + SendTransactionDefaultErrorCode = "1" + SendTransactionPasswordErrorCode = "2" + SendTransactionTimeoutErrorCode = "3" + SendTransactionDiscardedErrorCode = "4" +) + +var txReturnCodes = map[error]string{ // deliberately strings, in case more meaningful codes are to be returned + nil: SendTransactionNoErrorCode, + keystore.ErrDecrypt: SendTransactionPasswordErrorCode, + queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, + queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, +} + +// SendTransactionEvent is a signal sent on a send transaction request +type SendTransactionEvent struct { + ID string `json:"id"` + Args common.SendTxArgs `json:"args"` + MessageID string `json:"message_id"` +} + +// NotifyOnEnqueue returns handler that processes incoming tx queue requests +func NotifyOnEnqueue(queuedTx *common.QueuedTx) { + log.Info("calling TransactionQueueHandler") + signal.Send(signal.Envelope{ + Type: EventTransactionQueued, + Event: SendTransactionEvent{ + ID: string(queuedTx.ID), + Args: queuedTx.Args, + MessageID: common.MessageIDFromContext(queuedTx.Context), + }, + }) +} + +// ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned +type ReturnSendTransactionEvent struct { + ID string `json:"id"` + Args common.SendTxArgs `json:"args"` + MessageID string `json:"message_id"` + ErrorMessage string `json:"error_message"` + ErrorCode string `json:"error_code"` +} + +// NotifyOnReturn returns handler that processes responses from internal tx manager +func NotifyOnReturn(queuedTx *common.QueuedTx) { + if queuedTx.Err == nil { + return + } + + // discard notifications with empty tx + if queuedTx == nil { + return + } + + // error occurred, signal up to application + signal.Send(signal.Envelope{ + Type: EventTransactionFailed, + Event: ReturnSendTransactionEvent{ + ID: string(queuedTx.ID), + Args: queuedTx.Args, + MessageID: common.MessageIDFromContext(queuedTx.Context), + ErrorMessage: queuedTx.Err.Error(), + ErrorCode: sendTransactionErrorCode(queuedTx.Err), + }, + }) +} + +func sendTransactionErrorCode(err error) string { + if code, ok := txReturnCodes[err]; ok { + return code + } + return SendTxDefaultErrorCode +} diff --git a/geth/txqueue/txqueue.go b/geth/transactions/queue/queue.go similarity index 56% rename from geth/txqueue/txqueue.go rename to geth/transactions/queue/queue.go index 05fd4826a9f..45223e04e10 100644 --- a/geth/txqueue/txqueue.go +++ b/geth/transactions/queue/queue.go @@ -1,4 +1,4 @@ -package txqueue +package queue import ( "errors" @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/geth/account" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/log" ) @@ -15,10 +16,6 @@ import ( const ( // DefaultTxQueueCap defines how many items can be queued. DefaultTxQueueCap = int(35) - // DefaultTxSendQueueCap defines how many items can be passed to sendTransaction() w/o blocking. - DefaultTxSendQueueCap = int(70) - // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). - DefaultTxSendCompletionTimeout = 300 ) var ( @@ -36,33 +33,39 @@ var ( ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") ) +// remove from queue on any error (except for transient ones) and propagate +// defined as map[string]bool because errors from ethclient returned wrapped as jsonError +var transientErrs = map[string]bool{ + keystore.ErrDecrypt.Error(): true, // wrong password + ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account + account.ErrNoAccountSelected.Error(): true, // account not selected +} + +type empty struct{} + // TxQueue is capped container that holds pending transactions type TxQueue struct { - transactions map[common.QueuedTxID]*common.QueuedTx - mu sync.RWMutex // to guard transactions map + mu sync.RWMutex // to guard transactions map + transactions map[common.QueuedTxID]*common.QueuedTx + inprogress map[common.QueuedTxID]empty + + // TODO(dshulyak) research why eviction is done in separate goroutine evictableIDs chan common.QueuedTxID enqueueTicker chan struct{} - incomingPool chan *common.QueuedTx // when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc) stopped chan struct{} stoppedGroup sync.WaitGroup // to make sure that all routines are stopped - - // when items are enqueued notify subscriber - txEnqueueHandler common.EnqueuedTxHandler - - // when tx is returned (either successfully or with error) notify subscriber - txReturnHandler common.EnqueuedTxReturnHandler } // NewTransactionQueue make new transaction queue -func NewTransactionQueue() *TxQueue { +func NewQueue() *TxQueue { log.Info("initializing transaction queue") return &TxQueue{ transactions: make(map[common.QueuedTxID]*common.QueuedTx), + inprogress: make(map[common.QueuedTxID]empty), evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO enqueueTicker: make(chan struct{}), - incomingPool: make(chan *common.QueuedTx, DefaultTxSendQueueCap), } } @@ -75,10 +78,8 @@ func (q *TxQueue) Start() { } q.stopped = make(chan struct{}) - q.stoppedGroup.Add(2) - + q.stoppedGroup.Add(1) go q.evictionLoop() - go q.enqueueLoop() } // Stop stops transaction enqueue and eviction loops @@ -100,7 +101,7 @@ func (q *TxQueue) Stop() { func (q *TxQueue) evictionLoop() { defer HaltOnPanic() evict := func() { - if len(q.transactions) >= DefaultTxQueueCap { // eviction is required to accommodate another/last item + if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item q.Remove(<-q.evictableIDs) } } @@ -119,26 +120,6 @@ func (q *TxQueue) evictionLoop() { } } -// enqueueLoop process incoming enqueue requests -func (q *TxQueue) enqueueLoop() { - defer HaltOnPanic() - - // enqueue incoming transactions - for { - select { - case queuedTx := <-q.incomingPool: - log.Info("transaction enqueued requested", "tx", queuedTx.ID) - err := q.Enqueue(queuedTx) - log.Warn("transaction enqueued error", "tx", err) - log.Info("transaction enqueued", "tx", queuedTx.ID) - case <-q.stopped: - log.Info("transaction queue's enqueue loop stopped") - q.stoppedGroup.Done() - return - } - } -} - // Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one func (q *TxQueue) Reset() { q.mu.Lock() @@ -146,22 +127,14 @@ func (q *TxQueue) Reset() { q.transactions = make(map[common.QueuedTxID]*common.QueuedTx) q.evictableIDs = make(chan common.QueuedTxID, DefaultTxQueueCap) -} - -// EnqueueAsync enqueues incoming transaction in async manner, returns as soon as possible -func (q *TxQueue) EnqueueAsync(tx *common.QueuedTx) error { - q.incomingPool <- tx - - return nil + q.inprogress = make(map[common.QueuedTxID]empty) } // Enqueue enqueues incoming transaction func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - - if q.txEnqueueHandler == nil { //discard, until handler is provided - log.Info("there is no txEnqueueHandler") - return nil + if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) { + return ErrQueuedTxAlreadyProcessed } log.Info("before enqueueTicker") @@ -176,8 +149,6 @@ func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { // notify handler log.Info("calling txEnqueueHandler") - q.txEnqueueHandler(tx) - return nil } @@ -189,7 +160,21 @@ func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) { if tx, ok := q.transactions[id]; ok { return tx, nil } + return nil, ErrQueuedTxIDNotFound +} + +// LockInprogress returns transcation and locks it as inprogress +func (q *TxQueue) LockInprogress(id common.QueuedTxID) (*common.QueuedTx, error) { + q.mu.Lock() + defer q.mu.Unlock() + if tx, ok := q.transactions[id]; ok { + if _, inprogress := q.inprogress[id]; inprogress { + return tx, ErrQueuedTxInProgress + } + q.inprogress[id] = empty{} + return tx, nil + } return nil, ErrQueuedTxIDNotFound } @@ -197,42 +182,48 @@ func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) { func (q *TxQueue) Remove(id common.QueuedTxID) { q.mu.Lock() defer q.mu.Unlock() + q.remove(id) +} +func (q *TxQueue) remove(id common.QueuedTxID) { delete(q.transactions, id) + delete(q.inprogress, id) } -// StartProcessing marks a transaction as in progress. It's thread-safe and -// prevents from processing the same transaction multiple times. -func (q *TxQueue) StartProcessing(tx *common.QueuedTx) error { +// Done removes transaction from queue if no error or error is not transient +// and notify subscribers +func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) error { q.mu.Lock() defer q.mu.Unlock() - - if tx.Hash != (gethcommon.Hash{}) || tx.Err != nil { - return ErrQueuedTxAlreadyProcessed + if tx, ok := q.transactions[id]; !ok { + return ErrQueuedTxIDNotFound + } else { + q.done(tx, hash, err) } - - if tx.InProgress { - return ErrQueuedTxInProgress - } - - tx.InProgress = true - return nil } -// StopProcessing removes the "InProgress" flag from the transaction. -func (q *TxQueue) StopProcessing(tx *common.QueuedTx) { - q.mu.Lock() - defer q.mu.Unlock() - - tx.InProgress = false +func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { + delete(q.inprogress, tx.ID) + tx.Err = err + // hash is updated only if err is nil, but transaction is not removed from a queue + if err == nil { + q.remove(tx.ID) + tx.Hash = hash + tx.Done <- struct{}{} + return + } + _, transient := transientErrs[err.Error()] + if !transient { + q.remove(tx.ID) + tx.Done <- struct{}{} + } } // Count returns number of currently queued transactions func (q *TxQueue) Count() int { q.mu.RLock() defer q.mu.RUnlock() - return len(q.transactions) } @@ -240,54 +231,6 @@ func (q *TxQueue) Count() int { func (q *TxQueue) Has(id common.QueuedTxID) bool { q.mu.RLock() defer q.mu.RUnlock() - _, ok := q.transactions[id] - return ok } - -// SetEnqueueHandler sets callback handler, that is triggered on enqueue operation -func (q *TxQueue) SetEnqueueHandler(fn common.EnqueuedTxHandler) { - q.txEnqueueHandler = fn -} - -// SetTxReturnHandler sets callback handler, that is triggered when transaction is finished executing -func (q *TxQueue) SetTxReturnHandler(fn common.EnqueuedTxReturnHandler) { - q.txReturnHandler = fn -} - -// NotifyOnQueuedTxReturn is invoked when transaction is ready to return -// Transaction can be in error state, or executed successfully at this point. -func (q *TxQueue) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) { - if q == nil { - return - } - - // discard, if transaction is not found - if queuedTx == nil { - return - } - - // on success, remove item from the queue and stop propagating - if err == nil { - q.Remove(queuedTx.ID) - return - } - - // error occurred, send upward notification - if q.txReturnHandler == nil { // discard, until handler is provided - return - } - - // remove from queue on any error (except for transient ones) and propagate - transientErrs := map[error]bool{ - keystore.ErrDecrypt: true, // wrong password - ErrInvalidCompleteTxSender: true, // completing tx create from another account - } - if !transientErrs[err] { // remove only on unrecoverable errors - q.Remove(queuedTx.ID) - } - - // notify handler - q.txReturnHandler(queuedTx, err) -} diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go new file mode 100644 index 00000000000..1292f1483a9 --- /dev/null +++ b/geth/transactions/queue/queue_test.go @@ -0,0 +1,131 @@ +package queue + +import ( + "context" + "errors" + "testing" + + "github.com/ethereum/go-ethereum/accounts/keystore" + gethcommon "github.com/ethereum/go-ethereum/common" + + "github.com/status-im/status-go/geth/common" + "github.com/stretchr/testify/suite" +) + +func TestQueueTestSuite(t *testing.T) { + suite.Run(t, new(QueueTestSuite)) +} + +type QueueTestSuite struct { + suite.Suite + queue *TxQueue +} + +func (s *QueueTestSuite) SetupTest() { + s.queue = NewQueue() + s.queue.Start() +} + +func (s *QueueTestSuite) TearDownTest() { + s.queue.Stop() +} + +func (s *QueueTestSuite) TestLockInprogressTransaction() { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + enquedTx, err := s.queue.LockInprogress(tx.ID) + s.NoError(err) + s.Equal(tx, enquedTx) + + // verify that tx was marked as being inprogress + _, err = s.queue.LockInprogress(tx.ID) + s.Equal(ErrQueuedTxInProgress, err) +} + +func (s *QueueTestSuite) TestGetTransaction() { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + for i := 2; i > 0; i-- { + enquedTx, err := s.queue.Get(tx.ID) + s.NoError(err) + s.Equal(tx, enquedTx) + } +} + +func (s *QueueTestSuite) TestEnqueueProcessedTransaction() { + // enqueue will fail if transaction with hash will be enqueued + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + tx.Hash = gethcommon.Hash{1} + s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) +} + +func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + s.NoError(s.queue.Done(tx.ID, hash, err)) + return tx +} + +func (s *QueueTestSuite) TestDoneSuccess() { + hash := gethcommon.Hash{1} + tx := s.testDone(hash, nil) + s.NoError(tx.Err) + s.Equal(hash, tx.Hash) + s.False(s.queue.Has(tx.ID)) + // event is sent only if transaction was removed from a queue + select { + case <-tx.Done: + default: + s.Fail("No event was sent to Done channel") + } +} + +func (s *QueueTestSuite) TestDoneTransientError() { + hash := gethcommon.Hash{1} + err := keystore.ErrDecrypt + tx := s.testDone(hash, err) + s.Equal(keystore.ErrDecrypt, tx.Err) + s.NotEqual(hash, tx.Hash) + s.Equal(gethcommon.Hash{}, tx.Hash) + s.True(s.queue.Has(tx.ID)) +} + +func (s *QueueTestSuite) TestDoneError() { + hash := gethcommon.Hash{1} + err := errors.New("test") + tx := s.testDone(hash, err) + s.Equal(err, tx.Err) + s.NotEqual(hash, tx.Hash) + s.Equal(gethcommon.Hash{}, tx.Hash) + s.False(s.queue.Has(tx.ID)) + // event is sent only if transaction was removed from a queue + select { + case <-tx.Done: + default: + s.Fail("No event was sent to Done channel") + } +} + +func (s QueueTestSuite) TestMultipleDone() { + hash := gethcommon.Hash{1} + err := keystore.ErrDecrypt + tx := s.testDone(hash, err) + s.NoError(s.queue.Done(tx.ID, hash, nil)) + s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout"))) +} + +func (s *QueueTestSuite) TestEviction() { + var first *common.QueuedTx + for i := 0; i < DefaultTxQueueCap; i++ { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + if first == nil { + first = tx + } + s.NoError(s.queue.Enqueue(tx)) + } + s.Equal(DefaultTxQueueCap, s.queue.Count()) + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + s.Equal(DefaultTxQueueCap, s.queue.Count()) + s.False(s.queue.Has(first.ID)) +} diff --git a/geth/txqueue/utils.go b/geth/transactions/queue/utils.go similarity index 97% rename from geth/txqueue/utils.go rename to geth/transactions/queue/utils.go index a1b858c4f3d..46659e33007 100644 --- a/geth/txqueue/utils.go +++ b/geth/transactions/queue/utils.go @@ -1,4 +1,4 @@ -package txqueue +package queue import ( "errors" diff --git a/geth/txqueue/txqueue_manager.go b/geth/transactions/txqueue_manager.go similarity index 52% rename from geth/txqueue/txqueue_manager.go rename to geth/transactions/txqueue_manager.go index 4e359fbec6a..1e25d21acf4 100644 --- a/geth/txqueue/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -1,4 +1,4 @@ -package txqueue +package transactions import ( "context" @@ -6,53 +6,31 @@ import ( "time" ethereum "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/keystore" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/pborman/uuid" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/log" - "github.com/status-im/status-go/geth/signal" + "github.com/status-im/status-go/geth/transactions/queue" ) const ( - // EventTransactionQueued is triggered when send transaction request is queued - EventTransactionQueued = "transaction.queued" - - // EventTransactionFailed is triggered when send transaction request fails - EventTransactionFailed = "transaction.failed" - // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. SendTxDefaultErrorCode = SendTransactionDefaultErrorCode + // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). + DefaultTxSendCompletionTimeout = 300 - defaultGas = 90000 - + defaultGas = 90000 defaultTimeout = time.Minute ) -// Send transaction response codes -const ( - SendTransactionNoErrorCode = "0" - SendTransactionDefaultErrorCode = "1" - SendTransactionPasswordErrorCode = "2" - SendTransactionTimeoutErrorCode = "3" - SendTransactionDiscardedErrorCode = "4" -) - -var txReturnCodes = map[error]string{ // deliberately strings, in case more meaningful codes are to be returned - nil: SendTransactionNoErrorCode, - keystore.ErrDecrypt: SendTransactionPasswordErrorCode, - ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, - ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, -} - // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { nodeManager common.NodeManager accountManager common.AccountManager - txQueue *TxQueue + txQueue *queue.TxQueue ethTxClient EthTransactor addrLock *AddrLocker + notify bool } // NewManager returns a new Manager. @@ -60,11 +38,18 @@ func NewManager(nodeManager common.NodeManager, accountManager common.AccountMan return &Manager{ nodeManager: nodeManager, accountManager: accountManager, - txQueue: NewTransactionQueue(), + txQueue: queue.NewQueue(), addrLock: &AddrLocker{}, + notify: true, } } +// DisableNotifications turns off notifications on enqueue and return of tx. +// it is not thread safe and must be called only before manager is started. +func (m *Manager) DisableNotificactions() { + m.notify = false +} + // Start starts accepting new transactions into the queue. func (m *Manager) Start() { log.Info("start Manager") @@ -83,18 +68,6 @@ func (m *Manager) TransactionQueue() common.TxQueue { return m.txQueue } -// CreateTransaction returns a transaction object. -func (m *Manager) CreateTransaction(ctx context.Context, args common.SendTxArgs) *common.QueuedTx { - return &common.QueuedTx{ - ID: common.QueuedTxID(uuid.New()), - Hash: gethcommon.Hash{}, - Context: ctx, - Args: args, - Done: make(chan struct{}, 1), - Discard: make(chan struct{}, 1), - } -} - // QueueTransaction puts a transaction into the queue. func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to := "" @@ -102,108 +75,91 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) + err := m.txQueue.Enqueue(tx) + if m.notify { + NotifyOnEnqueue(tx) + } + return err +} - return m.txQueue.Enqueue(tx) +func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { + m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck + if m.notify { + NotifyOnReturn(tx) + } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { log.Info("wait for transaction", "id", tx.ID) - // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out select { case <-tx.Done: - m.NotifyOnQueuedTxReturn(tx, tx.Err) - return tx.Err - case <-tx.Discard: - m.NotifyOnQueuedTxReturn(tx, ErrQueuedTxDiscarded) - return ErrQueuedTxDiscarded case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.NotifyOnQueuedTxReturn(tx, ErrQueuedTxTimedOut) - return ErrQueuedTxTimedOut + m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) } -} - -// NotifyOnQueuedTxReturn calls a handler when a transaction resolves. -func (m *Manager) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) { - m.txQueue.NotifyOnQueuedTxReturn(queuedTx, err) + return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. // TODO(adam): investigate a possible bug that calling this method multiple times with the same Transaction ID // results in sending multiple transactions. -func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) { +func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (hash gethcommon.Hash, err error) { log.Info("complete transaction", "id", id) - - queuedTx, err := m.txQueue.Get(id) + tx, err := m.txQueue.LockInprogress(id) if err != nil { - log.Warn("could not get a queued transaction", "err", err) - return gethcommon.Hash{}, err + log.Warn("error getting a queued transaction", "err", err) + return hash, err } - - err = m.txQueue.StartProcessing(queuedTx) + account, err := m.validateAccount(tx) if err != nil { - return gethcommon.Hash{}, err + m.txDone(tx, hash, err) + return hash, err } - defer m.txQueue.StopProcessing(queuedTx) + // Send the transaction finally. + hash, err = m.completeTransaction(tx, account, password) + log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err) + m.txDone(tx, hash, err) + return hash, err +} +func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey, error) { selectedAccount, err := m.accountManager.SelectedAccount() if err != nil { log.Warn("failed to get a selected account", "err", err) - return gethcommon.Hash{}, err + return nil, err } - // make sure that only account which created the tx can complete it - if queuedTx.Args.From.Hex() != selectedAccount.Address.Hex() { - log.Warn("queued transaction does not belong to the selected account", "err", ErrInvalidCompleteTxSender) - m.NotifyOnQueuedTxReturn(queuedTx, ErrInvalidCompleteTxSender) - return gethcommon.Hash{}, ErrInvalidCompleteTxSender + if tx.Args.From.Hex() != selectedAccount.Address.Hex() { + log.Warn("queued transaction does not belong to the selected account", "err", queue.ErrInvalidCompleteTxSender) + return nil, queue.ErrInvalidCompleteTxSender } - // Send the transaction finally. - hash, err := m.completeTransaction(queuedTx, selectedAccount, password) - - // when incorrect sender tries to complete the account, - // notify and keep tx in queue (so that correct sender can complete) - if err == keystore.ErrDecrypt { - log.Warn("failed to complete transaction", "err", err) - m.NotifyOnQueuedTxReturn(queuedTx, err) - return hash, err - } - - log.Info("finally completed transaction", "id", queuedTx.ID, "hash", hash, "err", err) - - queuedTx.Hash = hash - queuedTx.Err = err - queuedTx.Done <- struct{}{} - - return hash, err + return selectedAccount, nil } -func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount *common.SelectedExtKey, password string) (gethcommon.Hash, error) { +func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount *common.SelectedExtKey, password string) (hash gethcommon.Hash, err error) { log.Info("complete transaction", "id", queuedTx.ID) - var emptyHash gethcommon.Hash + log.Info("verifying account password for transaction", "id", queuedTx.ID) config, err := m.nodeManager.NodeConfig() if err != nil { - return emptyHash, err + return hash, err } _, err = m.accountManager.VerifyAccountPassword(config.KeyStoreDir, selectedAccount.Address.String(), password) if err != nil { log.Warn("failed to verify account", "account", selectedAccount.Address.String(), "error", err.Error()) - return emptyHash, err + return hash, err } - - // update transaction with nonce, gas price and gas estimates - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) - defer cancel() m.addrLock.LockAddr(queuedTx.Args.From) defer m.addrLock.UnlockAddr(queuedTx.Args.From) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) if err != nil { - return emptyHash, err + return hash, err } args := queuedTx.Args gasPrice := (*big.Int)(args.GasPrice) @@ -212,7 +168,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount defer cancel() gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) if err != nil { - return emptyHash, err + return hash, err } } @@ -223,6 +179,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount if args.To != nil { toAddr = *args.To } + gas := (*big.Int)(args.Gas) if args.Gas == nil { ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) @@ -235,7 +192,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount Data: data, }) if err != nil { - return emptyHash, err + return hash, err } if gas.Cmp(big.NewInt(defaultGas)) == -1 { log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) @@ -254,12 +211,12 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data) signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) if err != nil { - return emptyHash, err + return hash, err } ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) defer cancel() if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { - return emptyHash, err + return hash, err } return signedTx.Hash(), nil } @@ -267,7 +224,6 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount // CompleteTransactions instructs backend to complete sending of multiple transactions func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult) - for _, txID := range ids { txHash, txErr := m.CompleteTransaction(txID, password) results[txID] = common.RawCompleteTransactionResult{ @@ -275,25 +231,20 @@ func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) Error: txErr, } } - return results } // DiscardTransaction discards a given transaction from transaction queue func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { - queuedTx, err := m.txQueue.Get(id) + tx, err := m.txQueue.Get(id) if err != nil { return err } - - // remove from queue, before notifying SendTransaction - m.txQueue.Remove(queuedTx.ID) - - // allow SendTransaction to return - queuedTx.Err = ErrQueuedTxDiscarded - queuedTx.Discard <- struct{}{} // sendTransaction() waits on this, notify so that it can return - - return nil + err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) + if m.notify { + NotifyOnReturn(tx) + } + return err } // DiscardTransactions discards given multiple transactions from transaction queue @@ -312,84 +263,6 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued return results } -// SendTransactionEvent is a signal sent on a send transaction request -type SendTransactionEvent struct { - ID string `json:"id"` - Args common.SendTxArgs `json:"args"` - MessageID string `json:"message_id"` -} - -// TransactionQueueHandler returns handler that processes incoming tx queue requests -func (m *Manager) TransactionQueueHandler() func(queuedTx *common.QueuedTx) { - return func(queuedTx *common.QueuedTx) { - log.Info("calling TransactionQueueHandler") - signal.Send(signal.Envelope{ - Type: EventTransactionQueued, - Event: SendTransactionEvent{ - ID: string(queuedTx.ID), - Args: queuedTx.Args, - MessageID: common.MessageIDFromContext(queuedTx.Context), - }, - }) - } -} - -// SetTransactionQueueHandler sets a handler that will be called -// when a new transaction is enqueued. -func (m *Manager) SetTransactionQueueHandler(fn common.EnqueuedTxHandler) { - m.txQueue.SetEnqueueHandler(fn) -} - -// ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned -type ReturnSendTransactionEvent struct { - ID string `json:"id"` - Args common.SendTxArgs `json:"args"` - MessageID string `json:"message_id"` - ErrorMessage string `json:"error_message"` - ErrorCode string `json:"error_code"` -} - -// TransactionReturnHandler returns handler that processes responses from internal tx manager -func (m *Manager) TransactionReturnHandler() func(queuedTx *common.QueuedTx, err error) { - return func(queuedTx *common.QueuedTx, err error) { - if err == nil { - return - } - - // discard notifications with empty tx - if queuedTx == nil { - return - } - - // error occurred, signal up to application - signal.Send(signal.Envelope{ - Type: EventTransactionFailed, - Event: ReturnSendTransactionEvent{ - ID: string(queuedTx.ID), - Args: queuedTx.Args, - MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: err.Error(), - ErrorCode: m.sendTransactionErrorCode(err), - }, - }) - } -} - -func (m *Manager) sendTransactionErrorCode(err error) string { - if code, ok := txReturnCodes[err]; ok { - return code - } - - return SendTxDefaultErrorCode -} - -// SetTransactionReturnHandler sets a handler that will be called -// when a transaction is about to return or when a recoverable error occurred. -// Recoverable error is, for instance, wrong password. -func (m *Manager) SetTransactionReturnHandler(fn common.EnqueuedTxReturnHandler) { - m.txQueue.SetTxReturnHandler(fn) -} - // SendTransactionRPCHandler is a handler for eth_sendTransaction method. // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { @@ -398,8 +271,7 @@ func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interfa // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} - - tx := m.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) + tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) if err := m.QueueTransaction(tx); err != nil { return nil, err diff --git a/geth/txqueue/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go similarity index 75% rename from geth/txqueue/txqueue_manager_test.go rename to geth/transactions/txqueue_manager_test.go index c4294a76fa0..3fa4af31430 100644 --- a/geth/txqueue/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -1,4 +1,4 @@ -package txqueue +package transactions import ( "context" @@ -18,7 +18,8 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/rpc" - "github.com/status-im/status-go/geth/txqueue/fake" + "github.com/status-im/status-go/geth/transactions/fake" + "github.com/status-im/status-go/geth/transactions/queue" . "github.com/status-im/status-go/testing" ) @@ -93,21 +94,10 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.NoError(err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) @@ -142,25 +132,15 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { s.setupTransactionPoolAPI(account, nonce, gas, nil) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) - + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.NoError(err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) @@ -179,7 +159,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { mu.Lock() if err == nil { completedTx++ - } else if err == ErrQueuedTxInProgress { + } else if err == queue.ErrQueuedTxInProgress { inprogressTx++ } else { s.Fail("tx failed with unexpected error: ", err.Error()) @@ -207,33 +187,21 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { }, nil) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - // Missmatched address is a recoverable error, that's why - // the return handler is called. - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.Equal(ErrInvalidCompleteTxSender, err) - s.Nil(tx.Err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) - s.Equal(err, ErrInvalidCompleteTxSender) + s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts // is a recoverable error. @@ -250,28 +218,15 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { s.setupStatusBackend(account, password, keystore.ErrDecrypt) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) - + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - // Missmatched address is a revocable error, that's why - // the return handler is called. - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.Equal(keystore.ErrDecrypt, err) - s.Nil(tx.Err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) @@ -285,39 +240,29 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { func (s *TxQueueTestSuite) TestDiscardTransaction() { txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.Equal(ErrQueuedTxDiscarded, err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) w := make(chan struct{}) go func() { - err := txQueueManager.DiscardTransaction(tx.ID) - s.NoError(err) + s.NoError(txQueueManager.DiscardTransaction(tx.ID)) close(w) }() err = txQueueManager.WaitForTransaction(tx) - s.Equal(ErrQueuedTxDiscarded, err) + s.Equal(queue.ErrQueuedTxDiscarded, err) // Check that error is assigned to the transaction. - s.Equal(ErrQueuedTxDiscarded, tx.Err) + s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) diff --git a/lib/utils.go b/lib/utils.go index fde588c17ba..c8dd7a56a22 100644 --- a/lib/utils.go +++ b/lib/utils.go @@ -34,7 +34,8 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/geth/transactions/queue" "github.com/status-im/status-go/static" . "github.com/status-im/status-go/testing" //nolint: golint ) @@ -793,7 +794,7 @@ func testCompleteTransaction(t *testing.T) bool { t.Errorf("cannot unmarshal event's JSON: %s. Error %q", jsonEvent, err) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) @@ -871,7 +872,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID) @@ -918,7 +919,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc } results := resultsStruct.Results - if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() { + if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != queue.ErrQueuedTxIDNotFound.Error() { t.Errorf("cannot complete txs: %v", results) return } @@ -1004,7 +1005,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) @@ -1029,7 +1030,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo // try completing discarded transaction _, err := statusAPI.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password) - if err != txqueue.ErrQueuedTxIDNotFound { + if err != queue.ErrQueuedTxIDNotFound { t.Error("expects tx not found, but call to CompleteTransaction succeeded") return } @@ -1043,19 +1044,19 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo completeQueuedTransaction <- struct{}{} // so that timeout is aborted } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode { + if receivedErrCode != transactions.SendTransactionDiscardedErrorCode { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1070,7 +1071,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != txqueue.ErrQueuedTxDiscarded { + if err != queue.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return false } @@ -1117,7 +1118,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) @@ -1130,19 +1131,19 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl txIDs <- txID } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode { + if receivedErrCode != transactions.SendTransactionDiscardedErrorCode { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1161,7 +1162,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != txqueue.ErrQueuedTxDiscarded { + if err != queue.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return } @@ -1192,7 +1193,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl } discardResults := discardResultsStruct.Results - if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() { + if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != queue.ErrQueuedTxIDNotFound.Error() { t.Errorf("cannot discard txs: %v", discardResults) return } @@ -1214,7 +1215,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("tx id not set in result: expected id is %s", txID) return } - if txResult.Error != txqueue.ErrQueuedTxIDNotFound.Error() { + if txResult.Error != queue.ErrQueuedTxIDNotFound.Error() { t.Errorf("invalid error for %s", txResult.Hash) return } @@ -1431,7 +1432,7 @@ func startTestNode(t *testing.T) <-chan struct{} { return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { } if envelope.Type == signal.EventNodeStarted { t.Log("Node started, but we wait till it be ready") From 9db329ad07757032d5621b789bc2ce8bff913463 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 23 Jan 2018 13:05:23 +0200 Subject: [PATCH 2/4] Address review comments --- e2e/transactions/transactions_test.go | 3 --- geth/common/utils.go | 2 +- geth/transactions/notifications.go | 32 +++++++++++++-------------- geth/transactions/queue/queue.go | 11 +++++---- geth/transactions/queue/queue_test.go | 7 ++++-- geth/transactions/txqueue_manager.go | 2 +- lib/utils.go | 4 ++-- 7 files changed, 30 insertions(+), 31 deletions(-) diff --git a/e2e/transactions/transactions_test.go b/e2e/transactions/transactions_test.go index 740d8b9eef4..ea56a025c57 100644 --- a/e2e/transactions/transactions_test.go +++ b/e2e/transactions/transactions_test.go @@ -790,9 +790,6 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck } time.Sleep(2 * time.Second) - - log.Info(fmt.Sprintf("Number of transactions sent: %d. Queue size (shouldn't be more than %d): %d", - txCount, queue.DefaultTxQueueCap, txQueue.Count())) s.Equal(10, txQueue.Count(), "transaction count should be 10") for i := 0; i < queue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines diff --git a/geth/common/utils.go b/geth/common/utils.go index 01702d44644..c4ad1d152b9 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -160,6 +160,6 @@ func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { Hash: common.Hash{}, Context: ctx, Args: args, - Done: make(chan struct{}, 1), + Done: make(chan struct{}), } } diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index d6c58a126c4..dc1ad93da56 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -1,9 +1,10 @@ package transactions import ( + "strconv" + "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/status-im/status-go/geth/common" - "github.com/status-im/status-go/geth/log" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions/queue" ) @@ -13,15 +14,17 @@ const ( EventTransactionQueued = "transaction.queued" // EventTransactionFailed is triggered when send transaction request fails EventTransactionFailed = "transaction.failed" +) - SendTransactionNoErrorCode = "0" - SendTransactionDefaultErrorCode = "1" - SendTransactionPasswordErrorCode = "2" - SendTransactionTimeoutErrorCode = "3" - SendTransactionDiscardedErrorCode = "4" +const ( + SendTransactionNoErrorCode = iota + SendTransactionDefaultErrorCode + SendTransactionPasswordErrorCode + SendTransactionTimeoutErrorCode + SendTransactionDiscardedErrorCode ) -var txReturnCodes = map[error]string{ // deliberately strings, in case more meaningful codes are to be returned +var txReturnCodes = map[error]int{ nil: SendTransactionNoErrorCode, keystore.ErrDecrypt: SendTransactionPasswordErrorCode, queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, @@ -37,7 +40,6 @@ type SendTransactionEvent struct { // NotifyOnEnqueue returns handler that processes incoming tx queue requests func NotifyOnEnqueue(queuedTx *common.QueuedTx) { - log.Info("calling TransactionQueueHandler") signal.Send(signal.Envelope{ Type: EventTransactionQueued, Event: SendTransactionEvent{ @@ -59,16 +61,14 @@ type ReturnSendTransactionEvent struct { // NotifyOnReturn returns handler that processes responses from internal tx manager func NotifyOnReturn(queuedTx *common.QueuedTx) { - if queuedTx.Err == nil { - return - } - // discard notifications with empty tx if queuedTx == nil { return } - - // error occurred, signal up to application + // we don't want to notify a user if tx sent successfully + if queuedTx.Err == nil { + return + } signal.Send(signal.Envelope{ Type: EventTransactionFailed, Event: ReturnSendTransactionEvent{ @@ -76,12 +76,12 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) { Args: queuedTx.Args, MessageID: common.MessageIDFromContext(queuedTx.Context), ErrorMessage: queuedTx.Err.Error(), - ErrorCode: sendTransactionErrorCode(queuedTx.Err), + ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)), }, }) } -func sendTransactionErrorCode(err error) string { +func sendTransactionErrorCode(err error) int { if code, ok := txReturnCodes[err]; ok { return code } diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index 45223e04e10..38f6810648a 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -59,7 +59,7 @@ type TxQueue struct { } // NewTransactionQueue make new transaction queue -func NewQueue() *TxQueue { +func New() *TxQueue { log.Info("initializing transaction queue") return &TxQueue{ transactions: make(map[common.QueuedTxID]*common.QueuedTx), @@ -163,7 +163,7 @@ func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) { return nil, ErrQueuedTxIDNotFound } -// LockInprogress returns transcation and locks it as inprogress +// LockInprogress returns transation and locks it as inprogress func (q *TxQueue) LockInprogress(id common.QueuedTxID) (*common.QueuedTx, error) { q.mu.Lock() defer q.mu.Unlock() @@ -210,13 +210,12 @@ func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { if err == nil { q.remove(tx.ID) tx.Hash = hash - tx.Done <- struct{}{} + close(tx.Done) return } - _, transient := transientErrs[err.Error()] - if !transient { + if _, transient := transientErrs[err.Error()]; !transient { q.remove(tx.ID) - tx.Done <- struct{}{} + close(tx.Done) } } diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index 1292f1483a9..60abd89d1c2 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -22,7 +22,7 @@ type QueueTestSuite struct { } func (s *QueueTestSuite) SetupTest() { - s.queue = NewQueue() + s.queue = New() s.queue.Start() } @@ -57,6 +57,10 @@ func (s *QueueTestSuite) TestEnqueueProcessedTransaction() { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) tx.Hash = gethcommon.Hash{1} s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) + + tx = common.CreateTransaction(context.Background(), common.SendTxArgs{}) + tx.Err = errors.New("error") + s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) } func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { @@ -85,7 +89,6 @@ func (s *QueueTestSuite) TestDoneTransientError() { err := keystore.ErrDecrypt tx := s.testDone(hash, err) s.Equal(keystore.ErrDecrypt, tx.Err) - s.NotEqual(hash, tx.Hash) s.Equal(gethcommon.Hash{}, tx.Hash) s.True(s.queue.Has(tx.ID)) } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index 1e25d21acf4..c1a6b6b69e7 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -38,7 +38,7 @@ func NewManager(nodeManager common.NodeManager, accountManager common.AccountMan return &Manager{ nodeManager: nodeManager, accountManager: accountManager, - txQueue: queue.NewQueue(), + txQueue: queue.New(), addrLock: &AddrLocker{}, notify: true, } diff --git a/lib/utils.go b/lib/utils.go index c8dd7a56a22..216b5b0d584 100644 --- a/lib/utils.go +++ b/lib/utils.go @@ -1056,7 +1056,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo } receivedErrCode := event["error_code"].(string) - if receivedErrCode != transactions.SendTransactionDiscardedErrorCode { + if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1143,7 +1143,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl } receivedErrCode := event["error_code"].(string) - if receivedErrCode != transactions.SendTransactionDiscardedErrorCode { + if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } From e7171b552e0ee51c16f99e0625420f9854e43ec0 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 24 Jan 2018 13:12:46 +0200 Subject: [PATCH 3/4] Lint warnings --- geth/transactions/notifications.go | 5 +++++ geth/transactions/queue/queue.go | 10 +++++----- geth/transactions/txqueue_manager.go | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index dc1ad93da56..18dc08b1fa3 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -17,10 +17,15 @@ const ( ) const ( + // SendTransactionNoErrorCode is sent when no error occured. SendTransactionNoErrorCode = iota + // SendTransactionDefaultErrorCode is every case when there is no special tx return code. SendTransactionDefaultErrorCode + // SendTransactionPasswordErrorCode is sent when account failed verification. SendTransactionPasswordErrorCode + // SendTransactionTimeoutErrorCode is sent when tx is timed out. SendTransactionTimeoutErrorCode + // SendTransactionDiscardedErrorCode is sent when tx was discarded. SendTransactionDiscardedErrorCode ) diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index 38f6810648a..b1543fd76b5 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -58,7 +58,7 @@ type TxQueue struct { stoppedGroup sync.WaitGroup // to make sure that all routines are stopped } -// NewTransactionQueue make new transaction queue +// New creates a transaction queue. func New() *TxQueue { log.Info("initializing transaction queue") return &TxQueue{ @@ -163,7 +163,7 @@ func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) { return nil, ErrQueuedTxIDNotFound } -// LockInprogress returns transation and locks it as inprogress +// LockInprogress returns transaction and locks it as inprogress func (q *TxQueue) LockInprogress(id common.QueuedTxID) (*common.QueuedTx, error) { q.mu.Lock() defer q.mu.Unlock() @@ -195,11 +195,11 @@ func (q *TxQueue) remove(id common.QueuedTxID) { func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) error { q.mu.Lock() defer q.mu.Unlock() - if tx, ok := q.transactions[id]; !ok { + tx, ok := q.transactions[id] + if !ok { return ErrQueuedTxIDNotFound - } else { - q.done(tx, hash, err) } + q.done(tx, hash, err) return nil } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index c1a6b6b69e7..a1a9b1ebaeb 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -44,8 +44,8 @@ func NewManager(nodeManager common.NodeManager, accountManager common.AccountMan } } -// DisableNotifications turns off notifications on enqueue and return of tx. -// it is not thread safe and must be called only before manager is started. +// DisableNotificactions turns off notifications on enqueue and return of tx. +// It is not thread safe and must be called only before manager is started. func (m *Manager) DisableNotificactions() { m.notify = false } From 052999cc9edf9db8eb2b3aae8771451e65a31393 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 24 Jan 2018 13:33:30 +0200 Subject: [PATCH 4/4] Rework lock inprogress method --- geth/transactions/notifications.go | 2 +- geth/transactions/queue/queue.go | 13 ++++++------- geth/transactions/queue/queue_test.go | 6 +++--- geth/transactions/txqueue_manager.go | 8 +++++--- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index 18dc08b1fa3..d307951ffeb 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -17,7 +17,7 @@ const ( ) const ( - // SendTransactionNoErrorCode is sent when no error occured. + // SendTransactionNoErrorCode is sent when no error occurred. SendTransactionNoErrorCode = iota // SendTransactionDefaultErrorCode is every case when there is no special tx return code. SendTransactionDefaultErrorCode diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index b1543fd76b5..eea69544263 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -163,19 +163,18 @@ func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) { return nil, ErrQueuedTxIDNotFound } -// LockInprogress returns transaction and locks it as inprogress -func (q *TxQueue) LockInprogress(id common.QueuedTxID) (*common.QueuedTx, error) { +// LockInprogress returns error if transaction is already inprogress. +func (q *TxQueue) LockInprogress(id common.QueuedTxID) error { q.mu.Lock() defer q.mu.Unlock() - - if tx, ok := q.transactions[id]; ok { + if _, ok := q.transactions[id]; ok { if _, inprogress := q.inprogress[id]; inprogress { - return tx, ErrQueuedTxInProgress + return ErrQueuedTxInProgress } q.inprogress[id] = empty{} - return tx, nil + return nil } - return nil, ErrQueuedTxIDNotFound + return ErrQueuedTxIDNotFound } // Remove removes transaction by transaction identifier diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index 60abd89d1c2..e156e6b9129 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -33,13 +33,13 @@ func (s *QueueTestSuite) TearDownTest() { func (s *QueueTestSuite) TestLockInprogressTransaction() { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) s.NoError(s.queue.Enqueue(tx)) - enquedTx, err := s.queue.LockInprogress(tx.ID) + enquedTx, err := s.queue.Get(tx.ID) s.NoError(err) + s.NoError(s.queue.LockInprogress(tx.ID)) s.Equal(tx, enquedTx) // verify that tx was marked as being inprogress - _, err = s.queue.LockInprogress(tx.ID) - s.Equal(ErrQueuedTxInProgress, err) + s.Equal(ErrQueuedTxInProgress, s.queue.LockInprogress(tx.ID)) } func (s *QueueTestSuite) TestGetTransaction() { diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index a1a9b1ebaeb..bb1ade9c450 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -106,15 +106,17 @@ func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { } // CompleteTransaction instructs backend to complete sending of a given transaction. -// TODO(adam): investigate a possible bug that calling this method multiple times with the same Transaction ID -// results in sending multiple transactions. func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (hash gethcommon.Hash, err error) { log.Info("complete transaction", "id", id) - tx, err := m.txQueue.LockInprogress(id) + tx, err := m.txQueue.Get(id) if err != nil { log.Warn("error getting a queued transaction", "err", err) return hash, err } + if err := m.txQueue.LockInprogress(id); err != nil { + log.Warn("can't process transaction", "err", err) + return hash, err + } account, err := m.validateAccount(tx) if err != nil { m.txDone(tx, hash, err)