Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of TxQueue and Manager #530

Merged
merged 4 commits into from
Jan 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mock: ##@other Regenerate mocks
mockgen -source=geth/mailservice/mailservice.go -destination=geth/mailservice/mailservice_mock.go -package=mailservice
mockgen -source=geth/common/notification.go -destination=geth/common/notification_mock.go -package=common -imports fcm=github.com/NaySoftware/go-fcm
mockgen -source=geth/notification/fcm/client.go -destination=geth/notification/fcm/client_mock.go -package=fcm -imports fcm=github.com/NaySoftware/go-fcm
mockgen -source=geth/txqueue/fake/txservice.go -destination=geth/txqueue/fake/mock.go -package=fake
mockgen -source=geth/transactions/fake/txservice.go -destination=geth/transactions/fake/mock.go -package=fake

test: test-unit-coverage ##@tests Run basic, short tests during development

Expand Down
6 changes: 3 additions & 3 deletions e2e/jail/jail_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue"
"github.com/status-im/status-go/geth/transactions"
. "github.com/status-im/status-go/testing"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() {
unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent)

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"])

Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() {
s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))

Expand Down
79 changes: 43 additions & 36 deletions e2e/transactions/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"reflect"
"sync"
"testing"
"time"

Expand All @@ -18,7 +19,8 @@ import (
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/geth/transactions/queue"
. "github.com/status-im/status-go/testing"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -48,7 +50,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() {
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)

if sg.Type == txqueue.EventTransactionQueued {
if sg.Type == transactions.EventTransactionQueued {
event := sg.Event.(map[string]interface{})
txID := event["id"].(string)
txHash, err = s.Backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
Expand Down Expand Up @@ -100,7 +102,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() {
err := json.Unmarshal([]byte(rawSignal), &signalEnvelope)
s.NoError(err)

if signalEnvelope.Type == txqueue.EventTransactionQueued {
if signalEnvelope.Type == transactions.EventTransactionQueued {
event := signalEnvelope.Event.(map[string]interface{})
txID := event["id"].(string)

Expand Down Expand Up @@ -156,7 +158,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))

Expand All @@ -182,7 +184,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
)
s.EqualError(
err,
txqueue.ErrInvalidCompleteTxSender.Error(),
queue.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
)

Expand Down Expand Up @@ -247,7 +249,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))

Expand All @@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password)
s.EqualError(
err,
txqueue.ErrInvalidCompleteTxSender.Error(),
queue.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
)

Expand Down Expand Up @@ -330,7 +332,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent)

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))

Expand Down Expand Up @@ -387,7 +389,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be failed and completed on the second call)", "id", txID)
Expand All @@ -407,7 +409,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
close(completeQueuedTransaction)
}

if envelope.Type == txqueue.EventTransactionFailed {
if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))

Expand Down Expand Up @@ -466,7 +468,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID)
Expand All @@ -488,12 +490,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
close(completeQueuedTransaction)
}

if envelope.Type == txqueue.EventTransactionFailed {
if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))

receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)

receivedErrCode := event["error_code"].(string)
Expand All @@ -509,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")

select {
case <-completeQueuedTransaction:
Expand Down Expand Up @@ -543,7 +545,7 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
Expand Down Expand Up @@ -640,7 +642,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err)
if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID)
Expand All @@ -650,12 +652,12 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
txIDs <- txID
}

if envelope.Type == txqueue.EventTransactionFailed {
if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))

receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)

receivedErrCode := event["error_code"].(string)
Expand All @@ -675,7 +677,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error())
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error())

s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't")
}
Expand Down Expand Up @@ -747,7 +749,7 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() {
// try completing non-existing transaction
_, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password)
s.Error(err, "error expected and not received")
s.EqualError(err, txqueue.ErrQueuedTxIDNotFound.Error())
s.EqualError(err, queue.ErrQueuedTxIDNotFound.Error())
}

func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
Expand All @@ -756,6 +758,24 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {

backend := s.LightEthereumService().StatusBackend
s.NotNil(backend)
var m sync.Mutex
txCount := 0
txIDs := [queue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}

signal.SetDefaultNodeNotificationHandler(func(rawSignal string) {
var sg signal.Envelope
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)

if sg.Type == transactions.EventTransactionQueued {
event := sg.Event.(map[string]interface{})
txID := event["id"].(string)
m.Lock()
txIDs[txCount] = common.QueuedTxID(txID)
txCount++
m.Unlock()
}
})

// reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset()
Expand All @@ -764,36 +784,23 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
s.NoError(s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))

txQueue := s.Backend.TxQueueManager().TransactionQueue()
var i = 0
txIDs := [txqueue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
s.Backend.TxQueueManager().SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
log.Info("tx enqueued", "i", i+1, "queue size", txQueue.Count(), "id", queuedTx.ID)
txIDs[i] = queuedTx.ID
i++
})

s.Zero(txQueue.Count(), "transaction count should be zero")

for j := 0; j < 10; j++ {
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
}
time.Sleep(2 * time.Second) // FIXME(tiabc): more reliable synchronization to ensure all transactions are enqueued

log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d",
i, txqueue.DefaultTxQueueCap, txQueue.Count()))

time.Sleep(2 * time.Second)
s.Equal(10, txQueue.Count(), "transaction count should be 10")

for i := 0; i < txqueue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
for i := 0; i < queue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
}
time.Sleep(5 * time.Second)

s.True(txQueue.Count() <= txqueue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", txqueue.DefaultTxQueueCap, txqueue.DefaultTxQueueCap-1, txQueue.Count())
s.True(txQueue.Count() <= queue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", queue.DefaultTxQueueCap, queue.DefaultTxQueueCap-1, txQueue.Count())

for _, txID := range txIDs {
txQueue.Remove(txID)
}

s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count())
}
12 changes: 3 additions & 9 deletions geth/api/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/status-im/status-go/geth/notification/fcm"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue"
"github.com/status-im/status-go/geth/transactions"
)

const (
Expand All @@ -38,7 +38,7 @@ func NewStatusBackend() *StatusBackend {

nodeManager := node.NewNodeManager()
accountManager := account.NewManager(nodeManager)
txQueueManager := txqueue.NewManager(nodeManager, accountManager)
txQueueManager := transactions.NewManager(nodeManager, accountManager)
jailManager := jail.New(nodeManager)
notificationManager := fcm.NewNotification(fcmServerKey)

Expand Down Expand Up @@ -205,7 +205,7 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA
ctx = context.Background()
}

tx := m.txQueueManager.CreateTransaction(ctx, args)
tx := common.CreateTransaction(ctx, args)

if err := m.txQueueManager.QueueTransaction(tx); err != nil {
return gethcommon.Hash{}, err
Expand Down Expand Up @@ -247,11 +247,5 @@ func (m *StatusBackend) registerHandlers() error {

rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler())
rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler)
m.txQueueManager.SetTransactionQueueHandler(m.txQueueManager.TransactionQueueHandler())
log.Info("Registered handler", "fn", "TransactionQueueHandler")

m.txQueueManager.SetTransactionReturnHandler(m.txQueueManager.TransactionReturnHandler())
log.Info("Registered handler", "fn", "TransactionReturnHandler")

return nil
}
38 changes: 6 additions & 32 deletions geth/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,12 @@ type QueuedTxID string

// QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct {
ID QueuedTxID
Hash common.Hash
Context context.Context
Args SendTxArgs
InProgress bool // true if transaction is being sent
Done chan struct{}
Discard chan struct{}
Err error
ID QueuedTxID
Hash common.Hash
Context context.Context
Args SendTxArgs
Done chan struct{}
Err error
}

// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
Expand All @@ -175,12 +173,6 @@ type SendTxArgs struct {
Nonce *hexutil.Uint64 `json:"nonce"`
}

// EnqueuedTxHandler is a function that receives queued/pending transactions, when they get queued
type EnqueuedTxHandler func(*QueuedTx)

// EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error)
type EnqueuedTxReturnHandler func(*QueuedTx, error)

// TxQueue is a queue of transactions.
type TxQueue interface {
// Remove removes a transaction from the queue.
Expand All @@ -207,32 +199,14 @@ type TxQueueManager interface {
// TransactionQueue returns a transaction queue.
TransactionQueue() TxQueue

// CreateTransactoin creates a new transaction.
CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx

// QueueTransaction adds a new transaction to the queue.
QueueTransaction(tx *QueuedTx) error

// WaitForTransactions blocks until transaction is completed, discarded or timed out.
WaitForTransaction(tx *QueuedTx) error

// NotifyOnQueuedTxReturn notifies a handler when a transaction returns.
NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error)

// TransactionQueueHandler returns handler that processes incoming tx queue requests
TransactionQueueHandler() func(queuedTx *QueuedTx)

// TODO(adam): might be not needed
SetTransactionQueueHandler(fn EnqueuedTxHandler)

// TODO(adam): might be not needed
SetTransactionReturnHandler(fn EnqueuedTxReturnHandler)

SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error)

// TransactionReturnHandler returns handler that processes responses from internal tx manager
TransactionReturnHandler() func(queuedTx *QueuedTx, err error)

// CompleteTransaction instructs backend to complete sending of a given transaction
CompleteTransaction(id QueuedTxID, password string) (common.Hash, error)

Expand Down
Loading