Skip to content

Commit

Permalink
(feat) Refactored the TX broadcasting logic. Added a new function to …
Browse files Browse the repository at this point in the history
…allow using the Sync broadcast without passing though the SDK mechanism to ensure the TX is included in one of the next blocks (CHAIN-277)
  • Loading branch information
aarmoa committed Jan 15, 2025
1 parent 48e6836 commit c16c302
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 164 deletions.
206 changes: 86 additions & 120 deletions client/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type ChainClient interface {
SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*txtypes.SimulateResponse, error)
AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error)

// Build signed tx with given accNum and accSeq, useful for offline siging
// If simulate is set to false, initialGas will be used
Expand Down Expand Up @@ -681,35 +682,6 @@ func (c *chainClient) GetAccount(ctx context.Context, address string) (*authtype
return res, err
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, msgs...)

if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed synchronously broadcast messages:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) GetFeeDiscountInfo(ctx context.Context, account string) (*exchangetypes.QueryFeeDiscountAccountInfoResponse, error) {
req := &exchangetypes.QueryFeeDiscountAccountInfoRequest{
Account: account,
Expand Down Expand Up @@ -746,36 +718,6 @@ func (c *chainClient) SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*t
return simRes, nil
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) BuildSignedTx(clientCtx client.Context, accNum, accSeq, initialGas uint64, msgs ...sdk.Msg) ([]byte, error) {
txf := NewTxFactory(clientCtx).WithSequence(accSeq).WithAccountNumber(accNum).WithGas(initialGas)
return c.buildSignedTx(clientCtx, txf, msgs...)
Expand Down Expand Up @@ -890,57 +832,23 @@ func (c *chainClient) AsyncBroadcastSignedTx(txBytes []byte) (*txtypes.Broadcast
func (c *chainClient) broadcastTx(
clientCtx client.Context,
txf tx.Factory,
await bool,
broadcastMode txtypes.BroadcastMode,
msgs ...sdk.Msg,
) (*txtypes.BroadcastTxResponse, error) {
) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
txBytes, err := c.buildSignedTx(clientCtx, txf, msgs...)
if err != nil {
err = errors.Wrap(err, "failed to build signed Tx")
return nil, err
return nil, nil, err
}

req := txtypes.BroadcastTxRequest{
TxBytes: txBytes,
Mode: txtypes.BroadcastMode_BROADCAST_MODE_SYNC,
Mode: broadcastMode,
}

res, err := common.ExecuteCall(context.Background(), c.network.ChainCookieAssistant, c.txClient.BroadcastTx, &req)
if err != nil || res.TxResponse.Code != 0 || !await {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := clientCtx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, txBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}
return &req, res, err

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// QueueBroadcastMsg enqueues a list of messages. Messages will added to the queue
Expand Down Expand Up @@ -970,28 +878,7 @@ func (c *chainClient) runBatchBroadcast() {
msgBatch := make([]sdk.Msg, 0, msgCommitBatchSizeLimit)

submitBatch := func(toSubmit []sdk.Msg) {
c.syncMux.Lock()
defer c.syncMux.Unlock()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("broadcastTx with nonce", sequence)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(toSubmit)).WithError(err).Errorln("failed to broadcast messages batch:", string(resJSON))
return
}
}
res, err := c.SyncBroadcastMsg(toSubmit...)

if res.TxResponse.Code != 0 {
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
Expand Down Expand Up @@ -2651,3 +2538,82 @@ func (c *chainClient) FetchVouchersForAddress(ctx context.Context, address strin
func (c *chainClient) GetNetwork() common.Network {
return c.network
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
req, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_SYNC, msgs...)

if err != nil || res.TxResponse.Code != 0 {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := c.ctx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, req.TxBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
_, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_ASYNC, msgs...)
return res, err
}

// BroadcastMsg submits a group of messages in one transaction to the chain
// The function uses the broadcast mode specified with the broadcastMode parameter
func (c *chainClient) BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
req, res, err := c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
req, res, err = c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, nil, err
}
}

return req, res, nil
}
Loading

0 comments on commit c16c302

Please sign in to comment.