Skip to content

Commit 775fb83

Browse files
committed
General refactoring of txqueue and manager
Changes to TxQueue: - factored out into a separate module to ensure that only exported methods will be used - deleted EnqueuAsync as it is not used, it is always possible to restore it from git, if it will be ever needed - notifications moved to a manager, with the goal to simplify txqueue and separate concerns between manager and txqueue - simplified API for processing transactions After analyzing code I removed all redundant methods and currently all the processing can be done with Get and Done methods. - added inprogress map to store transactions that are taken for processing and removed Inprogress variable from transaction, the goal is to cleanup tx structure - removed Discard channel from transaction, transaction.Err can be used to understand if transaction was discarded or failed due to another error - transient errors stored as global variable of queue module for simplicity, it is unlikely that they will become dynamic Change to Manager: - simplified code that manages notifications, all handlers were removed and notifiers refactored to be simple functions that are called in two places, when transaction is queued and when manager finished waiting for transaction - notifications can be turned of by calling DisableNotifications on manager - made a function out of CreateTransaction method, as it can be used as a simple utility
1 parent 6567159 commit 775fb83

17 files changed

+432
-532
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ mock: ##@other Regenerate mocks
103103
mockgen -source=geth/mailservice/mailservice.go -destination=geth/mailservice/mailservice_mock.go -package=mailservice
104104
mockgen -source=geth/common/notification.go -destination=geth/common/notification_mock.go -package=common -imports fcm=github.com/NaySoftware/go-fcm
105105
mockgen -source=geth/notification/fcm/client.go -destination=geth/notification/fcm/client_mock.go -package=fcm -imports fcm=github.com/NaySoftware/go-fcm
106-
mockgen -source=geth/txqueue/fake/txservice.go -destination=geth/txqueue/fake/mock.go -package=fake
106+
mockgen -source=geth/transactions/fake/txservice.go -destination=geth/transactions/fake/mock.go -package=fake
107107

108108
test: test-unit-coverage ##@tests Run basic, short tests during development
109109

e2e/jail/jail_rpc_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/status-im/status-go/geth/common"
1515
"github.com/status-im/status-go/geth/params"
1616
"github.com/status-im/status-go/geth/signal"
17-
"github.com/status-im/status-go/geth/txqueue"
17+
"github.com/status-im/status-go/geth/transactions"
1818
. "github.com/status-im/status-go/testing"
1919
"github.com/stretchr/testify/suite"
2020
)
@@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() {
126126
unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope)
127127
s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent)
128128

129-
if envelope.Type == txqueue.EventTransactionQueued {
129+
if envelope.Type == transactions.EventTransactionQueued {
130130
event := envelope.Event.(map[string]interface{})
131131
s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"])
132132

@@ -282,7 +282,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() {
282282
s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
283283
return
284284
}
285-
if envelope.Type == txqueue.EventTransactionQueued {
285+
if envelope.Type == transactions.EventTransactionQueued {
286286
event := envelope.Event.(map[string]interface{})
287287
s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))
288288

e2e/transactions/transactions_test.go

+45-35
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math/big"
88
"reflect"
9+
"sync"
910
"testing"
1011
"time"
1112

@@ -18,7 +19,8 @@ import (
1819
"github.com/status-im/status-go/geth/common"
1920
"github.com/status-im/status-go/geth/params"
2021
"github.com/status-im/status-go/geth/signal"
21-
"github.com/status-im/status-go/geth/txqueue"
22+
"github.com/status-im/status-go/geth/transactions"
23+
"github.com/status-im/status-go/geth/transactions/queue"
2224
. "github.com/status-im/status-go/testing"
2325
"github.com/stretchr/testify/suite"
2426
)
@@ -48,7 +50,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() {
4850
err := json.Unmarshal([]byte(rawSignal), &sg)
4951
s.NoError(err)
5052

51-
if sg.Type == txqueue.EventTransactionQueued {
53+
if sg.Type == transactions.EventTransactionQueued {
5254
event := sg.Event.(map[string]interface{})
5355
txID := event["id"].(string)
5456
txHash, err = s.Backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
@@ -100,7 +102,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() {
100102
err := json.Unmarshal([]byte(rawSignal), &signalEnvelope)
101103
s.NoError(err)
102104

103-
if signalEnvelope.Type == txqueue.EventTransactionQueued {
105+
if signalEnvelope.Type == transactions.EventTransactionQueued {
104106
event := signalEnvelope.Event.(map[string]interface{})
105107
txID := event["id"].(string)
106108

@@ -156,7 +158,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
156158
err = json.Unmarshal([]byte(jsonEvent), &envelope)
157159
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
158160

159-
if envelope.Type == txqueue.EventTransactionQueued {
161+
if envelope.Type == transactions.EventTransactionQueued {
160162
event := envelope.Event.(map[string]interface{})
161163
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
162164

@@ -182,7 +184,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
182184
)
183185
s.EqualError(
184186
err,
185-
txqueue.ErrInvalidCompleteTxSender.Error(),
187+
queue.ErrInvalidCompleteTxSender.Error(),
186188
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
187189
)
188190

@@ -247,7 +249,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
247249
err = json.Unmarshal([]byte(jsonEvent), &envelope)
248250
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
249251

250-
if envelope.Type == txqueue.EventTransactionQueued {
252+
if envelope.Type == transactions.EventTransactionQueued {
251253
event := envelope.Event.(map[string]interface{})
252254
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
253255

@@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
271273
common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password)
272274
s.EqualError(
273275
err,
274-
txqueue.ErrInvalidCompleteTxSender.Error(),
276+
queue.ErrInvalidCompleteTxSender.Error(),
275277
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
276278
)
277279

@@ -330,7 +332,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() {
330332
err = json.Unmarshal([]byte(jsonEvent), &envelope)
331333
s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent)
332334

333-
if envelope.Type == txqueue.EventTransactionQueued {
335+
if envelope.Type == transactions.EventTransactionQueued {
334336
event := envelope.Event.(map[string]interface{})
335337
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
336338

@@ -387,7 +389,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
387389
err := json.Unmarshal([]byte(jsonEvent), &envelope)
388390
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
389391

390-
if envelope.Type == txqueue.EventTransactionQueued {
392+
if envelope.Type == transactions.EventTransactionQueued {
391393
event := envelope.Event.(map[string]interface{})
392394
txID := common.QueuedTxID(event["id"].(string))
393395
log.Info("transaction queued (will be failed and completed on the second call)", "id", txID)
@@ -407,7 +409,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
407409
close(completeQueuedTransaction)
408410
}
409411

410-
if envelope.Type == txqueue.EventTransactionFailed {
412+
if envelope.Type == transactions.EventTransactionFailed {
411413
event := envelope.Event.(map[string]interface{})
412414
log.Info("transaction return event received", "id", event["id"].(string))
413415

@@ -466,7 +468,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
466468
err := json.Unmarshal([]byte(jsonEvent), &envelope)
467469
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
468470

469-
if envelope.Type == txqueue.EventTransactionQueued {
471+
if envelope.Type == transactions.EventTransactionQueued {
470472
event := envelope.Event.(map[string]interface{})
471473
txID := common.QueuedTxID(event["id"].(string))
472474
log.Info("transaction queued (will be discarded soon)", "id", txID)
@@ -488,12 +490,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
488490
close(completeQueuedTransaction)
489491
}
490492

491-
if envelope.Type == txqueue.EventTransactionFailed {
493+
if envelope.Type == transactions.EventTransactionFailed {
492494
event := envelope.Event.(map[string]interface{})
493495
log.Info("transaction return event received", "id", event["id"].(string))
494496

495497
receivedErrMessage := event["error_message"].(string)
496-
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
498+
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
497499
s.Equal(receivedErrMessage, expectedErrMessage)
498500

499501
receivedErrCode := event["error_code"].(string)
@@ -509,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
509511
To: common.ToAddress(TestConfig.Account2.Address),
510512
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
511513
})
512-
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
514+
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
513515

514516
select {
515517
case <-completeQueuedTransaction:
@@ -543,7 +545,7 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() {
543545
err := json.Unmarshal([]byte(jsonEvent), &envelope)
544546
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
545547

546-
if envelope.Type == txqueue.EventTransactionQueued {
548+
if envelope.Type == transactions.EventTransactionQueued {
547549
event := envelope.Event.(map[string]interface{})
548550
txID := common.QueuedTxID(event["id"].(string))
549551
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
@@ -640,7 +642,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
640642
var envelope signal.Envelope
641643
err := json.Unmarshal([]byte(jsonEvent), &envelope)
642644
s.NoError(err)
643-
if envelope.Type == txqueue.EventTransactionQueued {
645+
if envelope.Type == transactions.EventTransactionQueued {
644646
event := envelope.Event.(map[string]interface{})
645647
txID := common.QueuedTxID(event["id"].(string))
646648
log.Info("transaction queued (will be discarded soon)", "id", txID)
@@ -650,12 +652,12 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
650652
txIDs <- txID
651653
}
652654

653-
if envelope.Type == txqueue.EventTransactionFailed {
655+
if envelope.Type == transactions.EventTransactionFailed {
654656
event := envelope.Event.(map[string]interface{})
655657
log.Info("transaction return event received", "id", event["id"].(string))
656658

657659
receivedErrMessage := event["error_message"].(string)
658-
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
660+
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
659661
s.Equal(receivedErrMessage, expectedErrMessage)
660662

661663
receivedErrCode := event["error_code"].(string)
@@ -675,7 +677,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
675677
To: common.ToAddress(TestConfig.Account2.Address),
676678
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
677679
})
678-
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error())
680+
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error())
679681

680682
s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't")
681683
}
@@ -747,7 +749,7 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() {
747749
// try completing non-existing transaction
748750
_, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password)
749751
s.Error(err, "error expected and not received")
750-
s.EqualError(err, txqueue.ErrQueuedTxIDNotFound.Error())
752+
s.EqualError(err, queue.ErrQueuedTxIDNotFound.Error())
751753
}
752754

753755
func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
@@ -756,6 +758,24 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
756758

757759
backend := s.LightEthereumService().StatusBackend
758760
s.NotNil(backend)
761+
var m sync.Mutex
762+
txCount := 0
763+
txIDs := [queue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
764+
765+
signal.SetDefaultNodeNotificationHandler(func(rawSignal string) {
766+
var sg signal.Envelope
767+
err := json.Unmarshal([]byte(rawSignal), &sg)
768+
s.NoError(err)
769+
770+
if sg.Type == transactions.EventTransactionQueued {
771+
event := sg.Event.(map[string]interface{})
772+
txID := event["id"].(string)
773+
m.Lock()
774+
txIDs[txCount] = common.QueuedTxID(txID)
775+
txCount++
776+
m.Unlock()
777+
}
778+
})
759779

760780
// reset queue
761781
s.Backend.TxQueueManager().TransactionQueue().Reset()
@@ -764,36 +784,26 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
764784
s.NoError(s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
765785

766786
txQueue := s.Backend.TxQueueManager().TransactionQueue()
767-
var i = 0
768-
txIDs := [txqueue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
769-
s.Backend.TxQueueManager().SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
770-
log.Info("tx enqueued", "i", i+1, "queue size", txQueue.Count(), "id", queuedTx.ID)
771-
txIDs[i] = queuedTx.ID
772-
i++
773-
})
774-
775787
s.Zero(txQueue.Count(), "transaction count should be zero")
776788

777789
for j := 0; j < 10; j++ {
778790
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
779791
}
780-
time.Sleep(2 * time.Second) // FIXME(tiabc): more reliable synchronization to ensure all transactions are enqueued
781-
782-
log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d",
783-
i, txqueue.DefaultTxQueueCap, txQueue.Count()))
792+
time.Sleep(2 * time.Second)
784793

794+
log.Info(fmt.Sprintf("Number of transactions sent: %d. Queue size (shouldn't be more than %d): %d",
795+
txCount, queue.DefaultTxQueueCap, txQueue.Count()))
785796
s.Equal(10, txQueue.Count(), "transaction count should be 10")
786797

787-
for i := 0; i < txqueue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
798+
for i := 0; i < queue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
788799
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
789800
}
790801
time.Sleep(5 * time.Second)
791802

792-
s.True(txQueue.Count() <= txqueue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", txqueue.DefaultTxQueueCap, txqueue.DefaultTxQueueCap-1, txQueue.Count())
803+
s.True(txQueue.Count() <= queue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", queue.DefaultTxQueueCap, queue.DefaultTxQueueCap-1, txQueue.Count())
793804

794805
for _, txID := range txIDs {
795806
txQueue.Remove(txID)
796807
}
797-
798808
s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count())
799809
}

geth/api/backend.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/status-im/status-go/geth/notification/fcm"
1414
"github.com/status-im/status-go/geth/params"
1515
"github.com/status-im/status-go/geth/signal"
16-
"github.com/status-im/status-go/geth/txqueue"
16+
"github.com/status-im/status-go/geth/transactions"
1717
)
1818

1919
const (
@@ -38,7 +38,7 @@ func NewStatusBackend() *StatusBackend {
3838

3939
nodeManager := node.NewNodeManager()
4040
accountManager := account.NewManager(nodeManager)
41-
txQueueManager := txqueue.NewManager(nodeManager, accountManager)
41+
txQueueManager := transactions.NewManager(nodeManager, accountManager)
4242
jailManager := jail.New(nodeManager)
4343
notificationManager := fcm.NewNotification(fcmServerKey)
4444

@@ -205,7 +205,7 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA
205205
ctx = context.Background()
206206
}
207207

208-
tx := m.txQueueManager.CreateTransaction(ctx, args)
208+
tx := common.CreateTransaction(ctx, args)
209209

210210
if err := m.txQueueManager.QueueTransaction(tx); err != nil {
211211
return gethcommon.Hash{}, err
@@ -247,11 +247,5 @@ func (m *StatusBackend) registerHandlers() error {
247247

248248
rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler())
249249
rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler)
250-
m.txQueueManager.SetTransactionQueueHandler(m.txQueueManager.TransactionQueueHandler())
251-
log.Info("Registered handler", "fn", "TransactionQueueHandler")
252-
253-
m.txQueueManager.SetTransactionReturnHandler(m.txQueueManager.TransactionReturnHandler())
254-
log.Info("Registered handler", "fn", "TransactionReturnHandler")
255-
256250
return nil
257251
}

0 commit comments

Comments
 (0)