Skip to content

Commit bcf7d47

Browse files
committed
Result of tx processing returned as QueuedTxResult
Currently it is quite easy to introduce concurrency issues while working with transaction object. For example, race issue will exist every time while transaction is processed in a separate goroutine and caller will try to check for an error before event to Done channel is sent. This change removes all the data that is updated on transaction and leaves it with ID, Args and Context (which is not used at the moment).
1 parent 820a67a commit bcf7d47

9 files changed

+97
-110
lines changed

geth/api/backend.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string {
200200
}
201201

202202
// SendTransaction creates a new transaction and waits until it's complete.
203-
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) {
203+
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) {
204204
if ctx == nil {
205205
ctx = context.Background()
206206
}
207-
208207
tx := common.CreateTransaction(ctx, args)
209-
210208
if err := m.txQueueManager.QueueTransaction(tx); err != nil {
211-
return gethcommon.Hash{}, err
209+
return hash, err
212210
}
213-
214-
if err := m.txQueueManager.WaitForTransaction(tx); err != nil {
215-
return gethcommon.Hash{}, err
211+
rst := m.txQueueManager.WaitForTransaction(tx)
212+
if rst.Error != nil {
213+
return hash, rst.Error
216214
}
217-
218-
return tx.Hash, nil
215+
return rst.Hash, nil
219216
}
220217

221218
// CompleteTransaction instructs backend to complete sending of a given transaction

geth/common/types.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,9 @@ type QueuedTxID string
155155
// QueuedTx holds enough information to complete the queued transaction.
156156
type QueuedTx struct {
157157
ID QueuedTxID
158-
Hash common.Hash
159158
Context context.Context
160159
Args SendTxArgs
161-
Done chan struct{}
162-
Err error
160+
Result chan RawCompleteTransactionResult
163161
}
164162

165163
// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
@@ -203,7 +201,7 @@ type TxQueueManager interface {
203201
QueueTransaction(tx *QueuedTx) error
204202

205203
// WaitForTransactions blocks until transaction is completed, discarded or timed out.
206-
WaitForTransaction(tx *QueuedTx) error
204+
WaitForTransaction(tx *QueuedTx) RawCompleteTransactionResult
207205

208206
SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error)
209207

geth/common/types_mock.go

+2-12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

geth/common/utils.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) {
157157
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
158158
return &QueuedTx{
159159
ID: QueuedTxID(uuid.New()),
160-
Hash: common.Hash{},
161160
Context: ctx,
162161
Args: args,
163-
Done: make(chan struct{}, 1),
162+
Result: make(chan RawCompleteTransactionResult, 1),
164163
}
165164
}

geth/transactions/notifications.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ type ReturnSendTransactionEvent struct {
5858
}
5959

6060
// NotifyOnReturn returns handler that processes responses from internal tx manager
61-
func NotifyOnReturn(queuedTx *common.QueuedTx) {
62-
if queuedTx.Err == nil {
61+
func NotifyOnReturn(queuedTx *common.QueuedTx, err error) {
62+
if err == nil {
6363
return
6464
}
6565

@@ -75,8 +75,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) {
7575
ID: string(queuedTx.ID),
7676
Args: queuedTx.Args,
7777
MessageID: common.MessageIDFromContext(queuedTx.Context),
78-
ErrorMessage: queuedTx.Err.Error(),
79-
ErrorCode: sendTransactionErrorCode(queuedTx.Err),
78+
ErrorMessage: err.Error(),
79+
ErrorCode: sendTransactionErrorCode(err),
8080
},
8181
})
8282
}

geth/transactions/queue/queue.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,6 @@ func (q *TxQueue) Reset() {
133133
// Enqueue enqueues incoming transaction
134134
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
135135
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
136-
if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) {
137-
return ErrQueuedTxAlreadyProcessed
138-
}
139136

140137
log.Info("before enqueueTicker")
141138
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
@@ -205,18 +202,16 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er
205202

206203
func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
207204
delete(q.inprogress, tx.ID)
208-
tx.Err = err
209205
// hash is updated only if err is nil, but transaction is not removed from a queue
210206
if err == nil {
207+
q.transactions[tx.ID].Result <- common.RawCompleteTransactionResult{Hash: hash, Error: err}
211208
q.remove(tx.ID)
212-
tx.Hash = hash
213-
tx.Done <- struct{}{}
214209
return
215210
}
216211
_, transient := transientErrs[err.Error()]
217212
if !transient {
213+
q.transactions[tx.ID].Result <- common.RawCompleteTransactionResult{Error: err}
218214
q.remove(tx.ID)
219-
tx.Done <- struct{}{}
220215
}
221216
}
222217

geth/transactions/queue/queue_test.go

+11-19
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,6 @@ func (s *QueueTestSuite) TestGetTransaction() {
5252
}
5353
}
5454

55-
func (s *QueueTestSuite) TestEnqueueProcessedTransaction() {
56-
// enqueue will fail if transaction with hash will be enqueued
57-
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
58-
tx.Hash = gethcommon.Hash{1}
59-
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx))
60-
}
61-
6255
func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx {
6356
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
6457
s.NoError(s.queue.Enqueue(tx))
@@ -69,12 +62,12 @@ func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.Queue
6962
func (s *QueueTestSuite) TestDoneSuccess() {
7063
hash := gethcommon.Hash{1}
7164
tx := s.testDone(hash, nil)
72-
s.NoError(tx.Err)
73-
s.Equal(hash, tx.Hash)
74-
s.False(s.queue.Has(tx.ID))
7565
// event is sent only if transaction was removed from a queue
7666
select {
77-
case <-tx.Done:
67+
case rst := <-tx.Result:
68+
s.NoError(rst.Error)
69+
s.Equal(hash, rst.Hash)
70+
s.False(s.queue.Has(tx.ID))
7871
default:
7972
s.Fail("No event was sent to Done channel")
8073
}
@@ -84,23 +77,22 @@ func (s *QueueTestSuite) TestDoneTransientError() {
8477
hash := gethcommon.Hash{1}
8578
err := keystore.ErrDecrypt
8679
tx := s.testDone(hash, err)
87-
s.Equal(keystore.ErrDecrypt, tx.Err)
88-
s.NotEqual(hash, tx.Hash)
89-
s.Equal(gethcommon.Hash{}, tx.Hash)
9080
s.True(s.queue.Has(tx.ID))
81+
_, inp := s.queue.inprogress[tx.ID]
82+
s.False(inp)
9183
}
9284

9385
func (s *QueueTestSuite) TestDoneError() {
9486
hash := gethcommon.Hash{1}
9587
err := errors.New("test")
9688
tx := s.testDone(hash, err)
97-
s.Equal(err, tx.Err)
98-
s.NotEqual(hash, tx.Hash)
99-
s.Equal(gethcommon.Hash{}, tx.Hash)
100-
s.False(s.queue.Has(tx.ID))
10189
// event is sent only if transaction was removed from a queue
10290
select {
103-
case <-tx.Done:
91+
case rst := <-tx.Result:
92+
s.Equal(err, rst.Error)
93+
s.NotEqual(hash, rst.Hash)
94+
s.Equal(gethcommon.Hash{}, rst.Hash)
95+
s.False(s.queue.Has(tx.ID))
10496
default:
10597
s.Fail("No event was sent to Done channel")
10698
}

geth/transactions/txqueue_manager.go

+32-28
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,24 @@ const (
2525

2626
// Manager provides means to manage internal Status Backend (injected into LES)
2727
type Manager struct {
28-
nodeManager common.NodeManager
29-
accountManager common.AccountManager
30-
txQueue *queue.TxQueue
31-
ethTxClient EthTransactor
32-
addrLock *AddrLocker
33-
notify bool
28+
nodeManager common.NodeManager
29+
accountManager common.AccountManager
30+
txQueue *queue.TxQueue
31+
ethTxClient EthTransactor
32+
addrLock *AddrLocker
33+
notify bool
34+
sendCompletionTimeout time.Duration
3435
}
3536

3637
// NewManager returns a new Manager.
3738
func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager {
3839
return &Manager{
39-
nodeManager: nodeManager,
40-
accountManager: accountManager,
41-
txQueue: queue.NewQueue(),
42-
addrLock: &AddrLocker{},
43-
notify: true,
40+
nodeManager: nodeManager,
41+
accountManager: accountManager,
42+
txQueue: queue.NewQueue(),
43+
addrLock: &AddrLocker{},
44+
notify: true,
45+
sendCompletionTimeout: DefaultTxSendCompletionTimeout * time.Second,
4446
}
4547
}
4648

@@ -75,34 +77,38 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error {
7577
to = tx.Args.To.Hex()
7678
}
7779
log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to)
78-
err := m.txQueue.Enqueue(tx)
80+
if err := m.txQueue.Enqueue(tx); err != nil {
81+
return err
82+
}
7983
if m.notify {
8084
NotifyOnEnqueue(tx)
8185
}
82-
return err
86+
return nil
8387
}
8488

8589
func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
8690
m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck
8791
if m.notify {
88-
NotifyOnReturn(tx)
92+
NotifyOnReturn(tx, err)
8993
}
9094
}
9195

9296
// WaitForTransaction adds a transaction to the queue and blocks
9397
// until it's completed, discarded or times out.
94-
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error {
98+
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) common.RawCompleteTransactionResult {
9599
log.Info("wait for transaction", "id", tx.ID)
96100
// now wait up until transaction is:
97101
// - completed (via CompleteQueuedTransaction),
98102
// - discarded (via DiscardQueuedTransaction)
99103
// - or times out
100-
select {
101-
case <-tx.Done:
102-
case <-time.After(DefaultTxSendCompletionTimeout * time.Second):
103-
m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut)
104+
for {
105+
select {
106+
case rst := <-tx.Result:
107+
return rst
108+
case <-time.After(m.sendCompletionTimeout):
109+
m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut)
110+
}
104111
}
105-
return tx.Err
106112
}
107113

108114
// CompleteTransaction instructs backend to complete sending of a given transaction.
@@ -242,7 +248,7 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error {
242248
}
243249
err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded)
244250
if m.notify {
245-
NotifyOnReturn(tx)
251+
NotifyOnReturn(tx, queue.ErrQueuedTxDiscarded)
246252
}
247253
return err
248254
}
@@ -267,19 +273,17 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued
267273
// It accepts one param which is a slice with a map of transaction params.
268274
func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
269275
log.Info("SendTransactionRPCHandler called")
270-
271276
// TODO(adam): it's a hack to parse arguments as common.RPCCall can do that.
272277
// We should refactor parsing these params to a separate struct.
273278
rpcCall := common.RPCCall{Params: args}
274279
tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs())
275-
276280
if err := m.QueueTransaction(tx); err != nil {
277281
return nil, err
278282
}
279-
280-
if err := m.WaitForTransaction(tx); err != nil {
281-
return nil, err
283+
rst := m.WaitForTransaction(tx)
284+
if rst.Error != nil {
285+
return nil, rst.Error
282286
}
283-
284-
return tx.Hash.Hex(), nil
287+
// handle empty hash
288+
return rst.Hash.Hex(), nil
285289
}

0 commit comments

Comments
 (0)