Skip to content

Commit d325ef7

Browse files
committed
Result of tx processing returned as QueuedTxResult
Currently it is quite easy to introduce concurrency issues while working with transaction object. For example, race issue will exist every time while transaction is processed in a separate goroutine and caller will try to check for an error before event to Done channel is sent. This change removes all the data that is updated on transaction and leaves it with ID, Args and Context (which is not used at the moment). Signed-off-by: Dmitry Shulyak <yashulyak@gmail.com>
1 parent 5383182 commit d325ef7

14 files changed

+176
-162
lines changed

e2e/transactions/transactions_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
495495
log.Info("transaction return event received", "id", event["id"].(string))
496496

497497
receivedErrMessage := event["error_message"].(string)
498-
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
498+
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
499499
s.Equal(receivedErrMessage, expectedErrMessage)
500500

501501
receivedErrCode := event["error_code"].(string)
@@ -511,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
511511
To: common.ToAddress(TestConfig.Account2.Address),
512512
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
513513
})
514-
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
514+
s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
515515

516516
select {
517517
case <-completeQueuedTransaction:
@@ -659,7 +659,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
659659
log.Info("transaction return event received", "id", event["id"].(string))
660660

661661
receivedErrMessage := event["error_message"].(string)
662-
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
662+
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
663663
s.Equal(receivedErrMessage, expectedErrMessage)
664664

665665
receivedErrCode := event["error_code"].(string)
@@ -681,7 +681,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
681681
To: common.ToAddress(TestConfig.Account2.Address),
682682
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
683683
})
684-
require.EqualError(err, queue.ErrQueuedTxDiscarded.Error())
684+
require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error())
685685
require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't")
686686
}
687687

geth/api/api.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (api *StatusAPI) CompleteTransaction(id common.QueuedTxID, password string)
172172
}
173173

174174
// CompleteTransactions instructs backend to complete sending of multiple transactions
175-
func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
175+
func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
176176
return api.b.txQueueManager.CompleteTransactions(ids, password)
177177
}
178178

geth/api/backend.go

+8-11
Original file line numberDiff line numberDiff line change
@@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string {
200200
}
201201

202202
// SendTransaction creates a new transaction and waits until it's complete.
203-
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) {
203+
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) {
204204
if ctx == nil {
205205
ctx = context.Background()
206206
}
207-
208207
tx := common.CreateTransaction(ctx, args)
209-
210-
if err := m.txQueueManager.QueueTransaction(tx); err != nil {
211-
return gethcommon.Hash{}, err
208+
if err = m.txQueueManager.QueueTransaction(tx); err != nil {
209+
return hash, err
212210
}
213-
214-
if err := m.txQueueManager.WaitForTransaction(tx); err != nil {
215-
return gethcommon.Hash{}, err
211+
rst := m.txQueueManager.WaitForTransaction(tx)
212+
if rst.Error != nil {
213+
return hash, rst.Error
216214
}
217-
218-
return tx.Hash, nil
215+
return rst.Hash, nil
219216
}
220217

221218
// CompleteTransaction instructs backend to complete sending of a given transaction
@@ -224,7 +221,7 @@ func (m *StatusBackend) CompleteTransaction(id common.QueuedTxID, password strin
224221
}
225222

226223
// CompleteTransactions instructs backend to complete sending of multiple transactions
227-
func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
224+
func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
228225
return m.txQueueManager.CompleteTransactions(ids, password)
229226
}
230227

geth/common/types.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ type AccountManager interface {
141141
AddressToDecryptedAccount(address, password string) (accounts.Account, *keystore.Key, error)
142142
}
143143

144-
// RawCompleteTransactionResult is a JSON returned from transaction complete function (used internally)
145-
type RawCompleteTransactionResult struct {
144+
// TransactionResult is a JSON returned from transaction complete function (used internally)
145+
type TransactionResult struct {
146146
Hash common.Hash
147147
Error error
148148
}
@@ -158,11 +158,9 @@ type QueuedTxID string
158158
// QueuedTx holds enough information to complete the queued transaction.
159159
type QueuedTx struct {
160160
ID QueuedTxID
161-
Hash common.Hash
162161
Context context.Context
163162
Args SendTxArgs
164-
Done chan struct{}
165-
Err error
163+
Result chan TransactionResult
166164
}
167165

168166
// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.

geth/common/utils.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) {
157157
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
158158
return &QueuedTx{
159159
ID: QueuedTxID(uuid.New()),
160-
Hash: common.Hash{},
161160
Context: ctx,
162161
Args: args,
163-
Done: make(chan struct{}),
162+
Result: make(chan TransactionResult, 1),
164163
}
165164
}

geth/transactions/errors.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package transactions
2+
3+
import "errors"
4+
5+
var (
6+
//ErrQueuedTxTimedOut - error transaction sending timed out
7+
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
8+
//ErrQueuedTxDiscarded - error transaction discarded
9+
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
10+
)

geth/transactions/fake/mock.go

+23-23
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

geth/transactions/fake/txservice.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
)
1212

1313
// NewTestServer returns a mocked test server
14-
func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockFakePublicTransactionPoolAPI) {
14+
func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockPublicTransactionPoolAPI) {
1515
srv := rpc.NewServer()
16-
svc := NewMockFakePublicTransactionPoolAPI(ctrl)
16+
svc := NewMockPublicTransactionPoolAPI(ctrl)
1717
if err := srv.RegisterName("eth", svc); err != nil {
1818
panic(err)
1919
}

geth/transactions/notifications.go

+12-15
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package transactions
22

33
import (
4-
"strconv"
5-
64
"github.com/ethereum/go-ethereum/accounts/keystore"
75
"github.com/status-im/status-go/geth/common"
86
"github.com/status-im/status-go/geth/signal"
9-
"github.com/status-im/status-go/geth/transactions/queue"
107
)
118

129
const (
@@ -30,10 +27,10 @@ const (
3027
)
3128

3229
var txReturnCodes = map[error]int{
33-
nil: SendTransactionNoErrorCode,
34-
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
35-
queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
36-
queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
30+
nil: SendTransactionNoErrorCode,
31+
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
32+
ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
33+
ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
3734
}
3835

3936
// SendTransactionEvent is a signal sent on a send transaction request
@@ -61,17 +58,17 @@ type ReturnSendTransactionEvent struct {
6158
Args common.SendTxArgs `json:"args"`
6259
MessageID string `json:"message_id"`
6360
ErrorMessage string `json:"error_message"`
64-
ErrorCode string `json:"error_code"`
61+
ErrorCode int `json:"error_code,string"`
6562
}
6663

6764
// NotifyOnReturn returns handler that processes responses from internal tx manager
68-
func NotifyOnReturn(queuedTx *common.QueuedTx) {
69-
// discard notifications with empty tx
70-
if queuedTx == nil {
65+
func NotifyOnReturn(queuedTx *common.QueuedTx, err error) {
66+
// we don't want to notify a user if tx was sent successfully
67+
if err == nil {
7168
return
7269
}
73-
// we don't want to notify a user if tx sent successfully
74-
if queuedTx.Err == nil {
70+
// discard notifications with empty tx
71+
if queuedTx == nil {
7572
return
7673
}
7774
signal.Send(signal.Envelope{
@@ -80,8 +77,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) {
8077
ID: string(queuedTx.ID),
8178
Args: queuedTx.Args,
8279
MessageID: common.MessageIDFromContext(queuedTx.Context),
83-
ErrorMessage: queuedTx.Err.Error(),
84-
ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)),
80+
ErrorMessage: err.Error(),
81+
ErrorCode: sendTransactionErrorCode(err),
8582
},
8683
})
8784
}

geth/transactions/queue/queue.go

+14-17
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,12 @@ const (
1919
)
2020

2121
var (
22+
// ErrQueuedTxExist - transaction was already enqueued
23+
ErrQueuedTxExist = errors.New("transaction already exist in queue")
2224
//ErrQueuedTxIDNotFound - error transaction hash not found
2325
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
24-
//ErrQueuedTxTimedOut - error transaction sending timed out
25-
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
26-
//ErrQueuedTxDiscarded - error transaction discarded
27-
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
28-
//ErrQueuedTxInProgress - error transaction in progress
26+
//ErrQueuedTxInProgress - error transaction is in progress
2927
ErrQueuedTxInProgress = errors.New("transaction is in progress")
30-
//ErrQueuedTxAlreadyProcessed - error transaction has already processed
31-
ErrQueuedTxAlreadyProcessed = errors.New("transaction has been already processed")
3228
//ErrInvalidCompleteTxSender - error transaction with invalid sender
3329
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
3430
)
@@ -133,15 +129,18 @@ func (q *TxQueue) Reset() {
133129
// Enqueue enqueues incoming transaction
134130
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
135131
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
136-
if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) {
137-
return ErrQueuedTxAlreadyProcessed
132+
q.mu.RLock()
133+
if _, ok := q.transactions[tx.ID]; ok {
134+
q.mu.RUnlock()
135+
return ErrQueuedTxExist
138136
}
137+
q.mu.RUnlock()
139138

140-
log.Info("before enqueueTicker")
139+
// we can't hold a lock in this part
140+
log.Debug("notifying eviction loop")
141141
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
142-
log.Info("before evictableIDs")
143-
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
144-
log.Info("after evictableIDs")
142+
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
143+
log.Debug("notified eviction loop")
145144

146145
q.mu.Lock()
147146
q.transactions[tx.ID] = tx
@@ -204,17 +203,15 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er
204203

205204
func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
206205
delete(q.inprogress, tx.ID)
207-
tx.Err = err
208206
// hash is updated only if err is nil, but transaction is not removed from a queue
209207
if err == nil {
208+
q.transactions[tx.ID].Result <- common.TransactionResult{Hash: hash, Error: err}
210209
q.remove(tx.ID)
211-
tx.Hash = hash
212-
close(tx.Done)
213210
return
214211
}
215212
if _, transient := transientErrs[err.Error()]; !transient {
213+
q.transactions[tx.ID].Result <- common.TransactionResult{Error: err}
216214
q.remove(tx.ID)
217-
close(tx.Done)
218215
}
219216
}
220217

0 commit comments

Comments
 (0)