-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathtransactionSubmission.go
393 lines (362 loc) · 13 KB
/
transactionSubmission.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
package aptos
import (
"fmt"
"sync"
"sync/atomic"
"github.com/aptos-labs/aptos-go-sdk/api"
)
// TransactionSubmissionType enumerates the kinds of transaction a
// [TransactionBuildPayload] can ask the build pipeline to construct.
type TransactionSubmissionType uint8

const (
	// TransactionSubmissionTypeSingle is a single-signer transaction: no multi-agent and no fee payer.
	TransactionSubmissionTypeSingle TransactionSubmissionType = iota
	// TransactionSubmissionTypeMultiAgent is a multi-agent or fee payer transaction.
	TransactionSubmissionTypeMultiAgent
)
// TransactionBuildPayload is one unit of work read from the payloads channel
// by BuildTransactions.
type TransactionBuildPayload struct {
	// Id is an identifier carried alongside the payload; presumably chosen by
	// the caller to match responses to requests (confirm against callers).
	Id uint64
	// Type selects which build path BuildTransactions uses for Inner.
	Type TransactionSubmissionType
	Inner TransactionPayload // The actual transaction payload
	Options []any // This is a placeholder to allow future optional arguments
}
// TransactionBuildResponse is the result BuildTransactions emits on its
// responses channel.
type TransactionBuildResponse struct {
	// Id identifies the payload this response belongs to.
	Id uint64
	// Response is the built raw transaction; set when the build succeeded.
	Response RawTransactionImpl
	// Err is the build (or initialization) error; set when the build failed.
	Err error
}
// TransactionSubmissionRequest is one signed transaction queued for
// SubmitTransactions or BatchSubmitTransactions.
type TransactionSubmissionRequest struct {
	// Id is copied into the matching TransactionSubmissionResponse.
	Id uint64
	// SignedTxn is the transaction handed to the node for submission.
	SignedTxn *SignedTransaction
}
// TransactionSubmissionResponse reports the outcome of building, signing or
// submitting a single transaction.
type TransactionSubmissionResponse struct {
	// Id is the identifier of the request this response belongs to.
	Id uint64
	// Response is the node's reply; SubmitTransactions sets it on success.
	Response *api.SubmitTransactionResponse
	// Err is the failure reason; when it is non-nil the transaction was not accepted.
	Err error
}
// SequenceNumberTracker hands out sequence numbers from an atomic counter.
//
// SequenceNumber always holds the next value that Increment will return.
type SequenceNumberTracker struct {
	SequenceNumber atomic.Uint64
}

// Increment reserves the current sequence number and advances the counter by
// one. It returns the value held before the increment.
func (snt *SequenceNumberTracker) Increment() uint64 {
	// Add returns the post-increment value; subtracting one yields the value
	// this call reserved. This is the same fetch-and-add the previous
	// compare-and-swap retry loop implemented by hand.
	return snt.SequenceNumber.Add(1) - 1
}

// Update overwrites the counter with next and returns the value it replaced.
func (snt *SequenceNumberTracker) Update(next uint64) uint64 {
	return snt.SequenceNumber.Swap(next)
}
// BuildTransactions consumes [TransactionBuildPayload] values from payloads and emits
// [TransactionBuildResponse] values on responses by delegating to the [NodeClient]
// method of the same name. It blocks until that method returns, so callers that
// want a background worker should invoke it with `go`.
//
// Build failures are delivered on the responses channel, not through the return
// values. The return values are kept only so the signature stays compatible and
// are always nil.
func (client *Client) BuildTransactions(sender AccountAddress, payloads chan TransactionBuildPayload, responses chan TransactionBuildResponse, setSequenceNumber chan uint64, options ...any) (*RawTransaction, error) {
	// Delegate to the node client. The previous body called this same method,
	// which recursed without bound and never did any work.
	client.nodeClient.BuildTransactions(sender, payloads, responses, setSequenceNumber, options...)
	return nil, nil
}
// BuildTransactions processes [TransactionBuildPayload] values from payloads and emits
// one [TransactionBuildResponse] on responses for every payload of a known type.
// It blocks until payloads is closed, so it is normally run in its own goroutine.
//
// The sender's sequence number is fetched once at startup. Each built transaction
// then takes the next number from a local counter. A value received on
// setSequenceNumber replaces that counter at any time.
//
// Every per-payload response carries the Id of the payload it was built from. If
// the initial account lookup fails, a single response with only Err set is sent.
// responses is closed when the function returns.
func (rc *NodeClient) BuildTransactions(sender AccountAddress, payloads chan TransactionBuildPayload, responses chan TransactionBuildResponse, setSequenceNumber chan uint64, options ...any) {
	// The output channel is closed on every exit path.
	defer close(responses)

	// Initialize state
	account, err := rc.Account(sender)
	if err != nil {
		responses <- TransactionBuildResponse{Err: err}
		return
	}
	sequenceNumber, err := account.SequenceNumber()
	if err != nil {
		responses <- TransactionBuildResponse{Err: err}
		return
	}

	snt := &SequenceNumberTracker{}
	snt.SequenceNumber.Store(sequenceNumber)

	// Work on a private copy of the options with one extra trailing slot for the
	// per-transaction sequence number. Appending to the variadic slice directly
	// could write into the caller's backing array.
	optionsLast := len(options)
	buildOptions := make([]any, optionsLast+1)
	copy(buildOptions, options)

	// send publishes the outcome of one build, tagged with the payload's Id so
	// the consumer can correlate it with the request.
	send := func(id uint64, txn RawTransactionImpl, buildErr error) {
		if buildErr != nil {
			responses <- TransactionBuildResponse{Id: id, Err: buildErr}
			return
		}
		responses <- TransactionBuildResponse{Id: id, Response: txn}
	}

	for {
		select {
		case payload, ok := <-payloads:
			// Stop once the input channel has been closed
			if !ok {
				return
			}

			switch payload.Type {
			case TransactionSubmissionTypeSingle:
				buildOptions[optionsLast] = SequenceNumber(snt.Increment())
				txn, buildErr := rc.BuildTransaction(sender, payload.Inner, buildOptions...)
				send(payload.Id, txn, buildErr)
			case TransactionSubmissionTypeMultiAgent:
				buildOptions[optionsLast] = SequenceNumber(snt.Increment())
				txn, buildErr := rc.BuildTransactionMultiAgent(sender, payload.Inner, buildOptions...)
				send(payload.Id, txn, buildErr)
			default:
				// Skip the payload: unknown types produce no response.
			}
		case newSequenceNumber := <-setSequenceNumber:
			// This can be used to update the sequence number at anytime
			snt.Update(newSequenceNumber)
			// TODO: We should periodically handle reconciliation of the sequence numbers, but this needs to know submission as well
		}
	}
}
// SubmitTransactions forwards to the [NodeClient] method of the same name, which
// submits each signed transaction read from requests and writes one result per
// request to responses.
// The output chan `responses` is closed once the input chan `requests` is closed and drained.
func (client *Client) SubmitTransactions(requests chan TransactionSubmissionRequest, responses chan TransactionSubmissionResponse) {
	submit := client.nodeClient.SubmitTransactions
	submit(requests, responses)
}
// SubmitTransactions reads signed transactions from requests, submits them one
// at a time, and writes exactly one [TransactionSubmissionResponse] per request
// to responses. Each response carries the request's Id and either the node's
// reply or the submission error.
// The output chan `responses` is closed once the input chan `requests` is closed and drained.
func (rc *NodeClient) SubmitTransactions(requests chan TransactionSubmissionRequest, responses chan TransactionSubmissionResponse) {
	defer close(responses)

	for req := range requests {
		// Start from a response that only knows its Id, then fill in the outcome.
		outcome := TransactionSubmissionResponse{Id: req.Id}
		if submitted, err := rc.SubmitTransaction(req.SignedTxn); err != nil {
			outcome.Err = err
		} else {
			outcome.Response = submitted
		}
		responses <- outcome
	}
}
// BatchSubmitTransactions reads signed transactions from requests, submits them
// in batches of up to 20, and writes exactly one [TransactionSubmissionResponse]
// per request to responses.
//
// For each submitted batch:
//   - if the batch call itself fails, every transaction in it is reported with that error;
//   - otherwise a transaction listed in the reply's TransactionFailures is reported
//     with its failure message, and every other transaction is reported with a nil
//     Err and a nil Response (no per-transaction reply is attached here).
//
// A final batch smaller than 20 is submitted when requests is closed.
// The output chan `responses` is closed once the input chan `requests` is closed and drained.
func (rc *NodeClient) BatchSubmitTransactions(requests chan TransactionSubmissionRequest, responses chan TransactionSubmissionResponse) {
	defer close(responses)

	// Number of transactions collected before a batch is submitted.
	const batchSize = 20
	inputs := make([]*SignedTransaction, 0, batchSize)
	ids := make([]uint64, 0, batchSize)

	// flush submits the pending batch, reports one result per transaction, and
	// then empties the buffers (keeping their capacity) for the next batch.
	flush := func() {
		if len(inputs) == 0 {
			return
		}
		count := uint32(len(ids))
		response, err := rc.BatchSubmitTransaction(inputs)

		if err != nil {
			// Error, means all failed
			for j := uint32(0); j < count; j++ {
				responses <- TransactionSubmissionResponse{Id: ids[j], Err: err}
			}
		} else {
			// Partial failure, means we need to send errors for those that failed
			// and responses for those that succeeded
			for j := uint32(0); j < count; j++ {
				failed := -1
				for k := 0; k < len(response.TransactionFailures); k++ {
					if response.TransactionFailures[k].TransactionIndex == j {
						failed = k
						break
					}
				}
				if failed >= 0 {
					responses <- TransactionSubmissionResponse{Id: ids[j], Err: fmt.Errorf("transaction failed: %s", response.TransactionFailures[failed].Error.Message)}
				} else {
					responses <- TransactionSubmissionResponse{Id: ids[j], Response: nil}
				}
			}
		}

		inputs = inputs[:0]
		ids = ids[:0]
	}

	for request := range requests {
		// Collect a full batch before submitting
		// TODO: Handle a timeout or something associated for it
		inputs = append(inputs, request.SignedTxn)
		ids = append(ids, request.Id)
		if len(inputs) == batchSize {
			flush()
		}
	}

	// The input is closed: submit whatever is left so no request goes unanswered.
	flush()
}
// BuildSignAndSubmitTransactions builds, signs and submits transactions for a single
// [TransactionSigner] by forwarding to the [NodeClient] method of the same name.
// The call blocks while the pipeline runs; use `go` to run it as a background worker.
// The output chan `responses` is closed on completion of the input chan `payloads`.
func (client *Client) BuildSignAndSubmitTransactions(
	sender TransactionSigner,
	payloads chan TransactionBuildPayload,
	responses chan TransactionSubmissionResponse,
	buildOptions ...any,
) {
	run := client.nodeClient.BuildSignAndSubmitTransactions
	run(sender, payloads, responses, buildOptions...)
}
// BuildSignAndSubmitTransactions builds, signs and submits transactions for a single
// [TransactionSigner]. It runs BuildSignAndSubmitTransactionsWithSignFunction with a
// signing function that signs plain [RawTransaction] values as sender and rejects
// every other raw transaction kind with an error.
// The output chan `responses` is closed on completion of the input chan `payloads`.
func (rc *NodeClient) BuildSignAndSubmitTransactions(
	sender TransactionSigner,
	payloads chan TransactionBuildPayload,
	responses chan TransactionSubmissionResponse,
	buildOptions ...any,
) {
	// signAsSender only knows how to sign single-signer transactions.
	signAsSender := func(rawTxn RawTransactionImpl) (*SignedTransaction, error) {
		if plain, ok := rawTxn.(*RawTransaction); ok {
			return plain.SignedTransaction(sender)
		}

		withData, ok := rawTxn.(*RawTransactionWithData)
		if !ok {
			return nil, fmt.Errorf("unsupported rawTransactionImpl type")
		}

		// Multi-agent and fee payer transactions need additional signers, which
		// the caller must supply through a custom signing function.
		switch withData.Variant {
		case MultiAgentRawTransactionWithDataVariant:
			return nil, fmt.Errorf("multi agent not supported, please provide a signer function")
		case MultiAgentWithFeePayerRawTransactionWithDataVariant:
			return nil, fmt.Errorf("fee payer not supported, please provide a signer function")
		}
		return nil, fmt.Errorf("unsupported rawTransactionWithData type")
	}

	rc.BuildSignAndSubmitTransactionsWithSignFunction(sender.AccountAddress(), payloads, responses, signAsSender, buildOptions...)
}
// BuildSignAndSubmitTransactionsWithSignFunction builds, signs and submits transactions,
// signing each one with the caller-supplied sign function. It forwards all arguments to
// the [NodeClient] method of the same name.
//
// The output chan `responses` is closed on completion of the input chan `payloads`.
//
// A custom signing function enables fee payer and other signing approaches while
// staying concurrent.
//
// The following is an illustrative sketch only (not compiled; constructor results and
// the fee payer signing steps are elided):
//
//	func Example() {
//		client := NewNodeClient()
//
//		sender := NewEd25519Account()
//		feePayer := NewEd25519Account()
//
//		payloads := make(chan TransactionBuildPayload)
//		responses := make(chan TransactionSubmissionResponse)
//
//		signingFunc := func(rawTxn RawTransactionImpl) (*SignedTransaction, error) {
//			switch rawTxn.(type) {
//			case *RawTransaction:
//				return nil, fmt.Errorf("only fee payer supported")
//			case *RawTransactionWithData:
//				rawTxnWithData := rawTxn.(*RawTransactionWithData)
//				switch rawTxnWithData.Variant {
//				case MultiAgentRawTransactionWithDataVariant:
//					return nil, fmt.Errorf("multi agent not supported, please provide a fee payer function")
//				case MultiAgentWithFeePayerRawTransactionWithDataVariant:
//					rawTxnWithData.Sign(sender)
//					txn, ok := rawTxnWithData.ToFeePayerTransaction()
//					// ... finish signing and return the signed transaction
//				default:
//					return nil, fmt.Errorf("unsupported rawTransactionWithData type")
//				}
//			default:
//				return nil, fmt.Errorf("unsupported rawTransactionImpl type")
//			}
//		}
//
//		// start up the worker
//		go client.BuildSignAndSubmitTransactionsWithSignFunction(
//			sender,
//			payloads,
//			responses,
//			signingFunc,
//		)
//
//		// Here add payloads, and wait on responses
//	}
func (client *Client) BuildSignAndSubmitTransactionsWithSignFunction(
	sender AccountAddress,
	payloads chan TransactionBuildPayload,
	responses chan TransactionSubmissionResponse,
	sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
	buildOptions ...any,
) {
	run := client.nodeClient.BuildSignAndSubmitTransactionsWithSignFunction
	run(sender, payloads, responses, sign, buildOptions...)
}
// BuildSignAndSubmitTransactionsWithSignFunction runs a build -> sign -> submit pipeline,
// signing each built transaction with the caller-supplied sign function.
//
// It starts BuildTransactions and SubmitTransactions in their own goroutines and signs
// each built transaction in a separate goroutine. Build and signing errors are written
// straight to responses with the Id of the build response they came from. The call
// blocks until the build stage has finished and every signing goroutine is done, so
// run it with `go` to use it as a background worker.
//
// The output chan `responses` is closed on completion of the input chan `payloads`
// (by the submission goroutine, once all signed transactions have been submitted).
//
// A custom signing function enables fee payer and other signing approaches while
// staying concurrent.
//
// The following is an illustrative sketch only (not compiled; constructor results and
// the fee payer signing steps are elided):
//
//	func Example() {
//		client := NewNodeClient()
//
//		sender := NewEd25519Account()
//		feePayer := NewEd25519Account()
//
//		payloads := make(chan TransactionBuildPayload)
//		responses := make(chan TransactionSubmissionResponse)
//
//		signingFunc := func(rawTxn RawTransactionImpl) (*SignedTransaction, error) {
//			switch rawTxn.(type) {
//			case *RawTransaction:
//				return nil, fmt.Errorf("only fee payer supported")
//			case *RawTransactionWithData:
//				rawTxnWithData := rawTxn.(*RawTransactionWithData)
//				switch rawTxnWithData.Variant {
//				case MultiAgentRawTransactionWithDataVariant:
//					return nil, fmt.Errorf("multi agent not supported, please provide a fee payer function")
//				case MultiAgentWithFeePayerRawTransactionWithDataVariant:
//					rawTxnWithData.Sign(sender)
//					txn, ok := rawTxnWithData.ToFeePayerTransaction()
//					// ... finish signing and return the signed transaction
//				default:
//					return nil, fmt.Errorf("unsupported rawTransactionWithData type")
//				}
//			default:
//				return nil, fmt.Errorf("unsupported rawTransactionImpl type")
//			}
//		}
//
//		// start up the worker
//		go client.BuildSignAndSubmitTransactionsWithSignFunction(
//			sender,
//			payloads,
//			responses,
//			signingFunc,
//		)
//
//		// Here add payloads, and wait on responses
//	}
func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
	sender AccountAddress,
	payloads chan TransactionBuildPayload,
	responses chan TransactionSubmissionResponse,
	sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
	buildOptions ...any,
) {
	// TODO: Make internal buffer size configurable with an optional parameter

	// Set up the channel handling building transactions
	buildResponses := make(chan TransactionBuildResponse, 20)
	setSequenceNumber := make(chan uint64)
	go rc.BuildTransactions(sender, payloads, buildResponses, setSequenceNumber, buildOptions...)

	submissionRequests := make(chan TransactionSubmissionRequest, 20)
	// Note: switching this to BatchSubmitTransactions caused no change in performance. The non-batched
	// version is more flexible and gives actual responses. Batching may be more performant with large payloads.
	go rc.SubmitTransactions(submissionRequests, responses)

	var wg sync.WaitGroup
	for buildResponse := range buildResponses {
		if buildResponse.Err != nil {
			responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: buildResponse.Err}
			continue
		}

		// TODO: replace this with a fixed number (configurable) of sign() workers
		wg.Add(1)
		// Pass the build response as an argument so each goroutine works on its own
		// copy. Capturing the range variable is only safe from Go 1.22 onwards.
		go func(built TransactionBuildResponse) {
			defer wg.Done()
			signedTxn, err := sign(built.Response)
			if err != nil {
				responses <- TransactionSubmissionResponse{Id: built.Id, Err: err}
				return
			}
			submissionRequests <- TransactionSubmissionRequest{
				Id:        built.Id,
				SignedTxn: signedTxn,
			}
		}(buildResponse)
	}

	// Every signer must finish before the submission stage is told there is no
	// more input. Closing submissionRequests is what leads to responses being closed.
	wg.Wait()
	close(submissionRequests)
}