From 71553ac94b7a99a1ec090bda1e156ffaea2b4ce4 Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Tue, 5 Mar 2024 16:24:52 -0800 Subject: [PATCH] chore(relayer): document relayer processor in comments (#16333) Co-authored-by: David Co-authored-by: Roger <50648015+RogerLamTd@users.noreply.github.com> --- .../relayer/processor/can_process_message.go | 9 +++- packages/relayer/processor/config.go | 8 ++++ packages/relayer/processor/estimate_gas.go | 3 ++ .../relayer/processor/get_latest_nonce.go | 2 + packages/relayer/processor/is_profitable.go | 3 ++ packages/relayer/processor/process_message.go | 43 +++++++++++++++---- packages/relayer/processor/process_single.go | 2 + packages/relayer/processor/processor.go | 12 ++++++ .../processor/wait_for_confirmations.go | 2 + .../relayer/processor/wait_header_synced.go | 3 ++ 10 files changed, 77 insertions(+), 10 deletions(-) diff --git a/packages/relayer/processor/can_process_message.go b/packages/relayer/processor/can_process_message.go index 39298e8e36..2288beb3fa 100644 --- a/packages/relayer/processor/can_process_message.go +++ b/packages/relayer/processor/can_process_message.go @@ -8,6 +8,13 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer" ) +// canProcessMessage determines whether a message is processable by the relayer. +// there are several conditions which it would not be processable, which include: +// - the event status is New, and the GasLimit is 0, which means only the user who +// sent the message can process it. +// - the event status is not New, which means it is either already processed and succeeded, +// or its processed, failed, and is in Retriable or Failed state, where the user +// should finish manually. func canProcessMessage( ctx context.Context, eventStatus relayer.EventStatus, @@ -28,7 +35,7 @@ func canProcessMessage( return true } - slog.Info("cant process message due to", "eventStatus", eventStatus.String()) + slog.Info("cant process message", "eventStatus", eventStatus.String()) return false } diff --git a/packages/relayer/processor/config.go b/packages/relayer/processor/config.go index a6bf3e2b3e..e21aadd66a 100644 --- a/packages/relayer/processor/config.go +++ b/packages/relayer/processor/config.go @@ -16,11 +16,19 @@ import ( "gorm.io/gorm/logger" ) +// hopConfig is a config struct that must be provided for an individual +// hop, when the processor is not configured to only process srcChain => destChain. +// for instance, when going from L2A to L2B, we have a hop of the shared "L1". +// the hopConfig in this case should be the L1 signalServiceAddress, taikoAddress, +// and rpcURL. If we have multiple hops, such as an L3 deployed on L2A to L2B, +// the hops would be L2A and L1, and multiple configs should be passed in. type hopConfig struct { signalServiceAddress common.Address taikoAddress common.Address rpcURL string } + +// Config is a struct used to initialize a processor. type Config struct { // address configs SrcSignalServiceAddress common.Address diff --git a/packages/relayer/processor/estimate_gas.go b/packages/relayer/processor/estimate_gas.go index 5743b44049..9239219f0c 100644 --- a/packages/relayer/processor/estimate_gas.go +++ b/packages/relayer/processor/estimate_gas.go @@ -14,6 +14,9 @@ var ( gasPaddingAmt uint64 = 80000 ) +// estimateGas estimates the gas for a ProcessMessage call. it will add a gasPaddingAmt +// in case, because the amount of exact gas is hard to predict due to proof verification +// on chain. func (p *Processor) estimateGas( ctx context.Context, message bridge.IBridgeMessage, proof []byte) (uint64, error) { auth, err := bind.NewKeyedTransactorWithChainID(p.ecdsaKey, new(big.Int).SetUint64(message.DestChainId)) diff --git a/packages/relayer/processor/get_latest_nonce.go b/packages/relayer/processor/get_latest_nonce.go index 6bf10c5715..161d4d382e 100644 --- a/packages/relayer/processor/get_latest_nonce.go +++ b/packages/relayer/processor/get_latest_nonce.go @@ -7,6 +7,8 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" ) +// getLatestNonce will return the latest nonce on chain if its higher than +// the one locally stored in the processor, then set it on the auth struct. func (p *Processor) getLatestNonce(ctx context.Context, auth *bind.TransactOpts) error { pendingNonce, err := p.destEthClient.PendingNonceAt(ctx, p.relayerAddr) if err != nil { diff --git a/packages/relayer/processor/is_profitable.go b/packages/relayer/processor/is_profitable.go index e12ec3a0c5..2b5eeda6e4 100644 --- a/packages/relayer/processor/is_profitable.go +++ b/packages/relayer/processor/is_profitable.go @@ -9,6 +9,9 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer/bindings/bridge" ) +// isProfitable determines whether a message is profitable or not. It should +// check the processing fee, if one does not exist at all, it is definitely not +// profitable. Otherwise, we compare it to the estimated cost. func (p *Processor) isProfitable( ctx context.Context, message bridge.IBridgeMessage, cost *big.Int) (bool, error) { processingFee := message.Fee diff --git a/packages/relayer/processor/process_message.go b/packages/relayer/processor/process_message.go index 70851e2c99..24b00a861b 100644 --- a/packages/relayer/processor/process_message.go +++ b/packages/relayer/processor/process_message.go @@ -29,6 +29,8 @@ var ( errUnprocessable = errors.New("message is unprocessable") ) +// eventStatusFromMsgHash will check the event's msgHash/signal, and +// get it's on-chain current status. func (p *Processor) eventStatusFromMsgHash( ctx context.Context, gasLimit *big.Int, @@ -58,11 +60,9 @@ func (p *Processor) eventStatusFromMsgHash( return eventStatus, nil } -// processMessage prepares and calls `processMessage` on the bridge. -// the proof must be generated from the gethclient's eth_getProof via the Prover, -// then rlp-encoded and combined as a singular byte slice, -// then abi encoded into a SignalProof struct as the contract -// expects +// processMessage prepares and calls `processMessage` on the bridge, given a +// message from the queue (from the indexer). It will +// generate a proof, or multiple proofs if hops are needed. func (p *Processor) processMessage( ctx context.Context, msg queue.Message, @@ -89,6 +89,8 @@ func (p *Processor) processMessage( return errors.Wrap(err, "p.waitForConfirmations") } + // we need to check the invocation delays and proof receipt to see if + // this is currently processable, or we need to wait. invocationDelays, err := p.destBridge.GetInvocationDelays(nil) if err != nil { return errors.Wrap(err, "p.destBridge.invocationDelays") @@ -134,10 +136,12 @@ func (p *Processor) processMessage( return err } + // we need to check the receipt logs to see if we received MessageReceived + // or MessageExecuted, because we have a two-step bridge. for _, log := range receipt.Logs { topic := log.Topics[0] - // if we have a MessageReceived event, this was not processed, - // and we have to wait for the invocation delay. + // if we have a MessageReceived event, this was not processed, only + // the first step was. now we have to wait for the invocation delay. if topic == bridgeAbi.Events["MessageReceived"].ID { slog.Info("message processing resulted in MessageReceived event", "msgHash", common.BytesToHash(msgBody.Event.MsgHash[:]).Hex(), @@ -159,6 +163,9 @@ func (p *Processor) processMessage( return errors.Wrap(err, "p.sendProcessMessageAndWaitForReceipt") } } else if topic == bridgeAbi.Events["MessageExecuted"].ID { + // if we got MessageExecuted, the message is finished processing. this occurs + // either in one-step bridge processing (no invocation delay), or if this is the second process + // message call after the first step was completed. slog.Info("message processing resulted in MessageExecuted event. processing finished") } } @@ -181,7 +188,7 @@ func (p *Processor) processMessage( } // internal will only be set if it's an actual queue message, not a targeted - // transaction hash. + // transaction hash set via config flag. if msg.Internal != nil { // update message status if err := p.eventRepo.UpdateStatus(ctx, msgBody.ID, relayer.EventStatus(messageStatus)); err != nil { @@ -192,6 +199,9 @@ func (p *Processor) processMessage( return nil } +// sendProcessMessageAndWaitForReceipt uses a backoff retry message mechanism +// to send the onchain processMessage call on the bridge, then wait +// for the transaction receipt, and save the updated status to the database. func (p *Processor) sendProcessMessageAndWaitForReceipt( ctx context.Context, encodedSignalProof []byte, @@ -241,6 +251,8 @@ func (p *Processor) sendProcessMessageAndWaitForReceipt( return receipt, nil } +// waitForInvocationDelay will return when the invocation delay has been met, +// if one exists, or return immediately if not. func (p *Processor) waitForInvocationDelay( ctx context.Context, invocationDelays struct { @@ -351,6 +363,9 @@ func (p *Processor) generateEncodedSignalProof(ctx context.Context, return nil, errors.Wrap(err, "p.srcSignalService.GetSignalSlot") } + // if we have no hops, this is strictly a srcChain => destChain message. + // we can grab the latestBlockID, create a singular "hop" of srcChain => destChain, + // and generate a proof. if len(p.hops) == 0 { latestBlockID, err := p.eventRepo.LatestChainDataSyncedEvent( ctx, @@ -371,6 +386,8 @@ func (p *Processor) generateEncodedSignalProof(ctx context.Context, BlockNumber: latestBlockID, }) } else { + // otherwise, we should just create the first hop in the array, we will append + // the rest of the hops after. hops = append(hops, proof.HopParams{ ChainID: p.destChainId, SignalServiceAddress: p.srcSignalServiceAddress, @@ -466,6 +483,8 @@ func (p *Processor) generateEncodedSignalProof(ctx context.Context, return encodedSignalProof, nil } +// sendProcessMessageCall calls `bridge.processMessage` with latest nonce +// after estimating gas, and checking profitability. func (p *Processor) sendProcessMessageCall( ctx context.Context, event *bridge.BridgeMessageSent, @@ -546,7 +565,9 @@ func (p *Processor) sendProcessMessageCall( return tx, nil } -// node is unable to estimate gas correctly for contract deployments, we need to check if the token +// needsContractDeployment is needed because +// node is unable to estimate gas correctly for contract deployments, +// so we need to check if the token // is deployed, and always hardcode in this case. we need to check this before calling // estimategas, as the node will soemtimes return a gas estimate for a contract deployment, however, // it is incorrect and the tx will revert. @@ -665,10 +686,13 @@ func (p *Processor) hardcodeGasLimit( return nil } +// setLatestNonce sets the latest nonce used for the relayer key func (p *Processor) setLatestNonce(nonce uint64) { p.destNonce = nonce } +// saveMessageStatusChangedEvent writes the MessageStatusChanged event to the +// database after a message is processed func (p *Processor) saveMessageStatusChangedEvent( ctx context.Context, receipt *types.Receipt, @@ -714,6 +738,7 @@ func (p *Processor) saveMessageStatusChangedEvent( return nil } +// getCost determines the fee of a processMessage call func (p *Processor) getCost(ctx context.Context, auth *bind.TransactOpts) (*big.Int, error) { if auth.GasTipCap != nil { blk, err := p.destEthClient.BlockByNumber(ctx, nil) diff --git a/packages/relayer/processor/process_single.go b/packages/relayer/processor/process_single.go index db9f7996ad..ea48304d45 100644 --- a/packages/relayer/processor/process_single.go +++ b/packages/relayer/processor/process_single.go @@ -13,6 +13,8 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer/pkg/queue" ) +// processSingle is used to process a single message, when we are +// targeting a specific message via config flag func (p *Processor) processSingle(ctx context.Context) error { slog.Info("processing single", "txHash", common.Hash(*p.targetTxHash).Hex()) diff --git a/packages/relayer/processor/processor.go b/packages/relayer/processor/processor.go index 804a99534c..c8710f67a1 100644 --- a/packages/relayer/processor/processor.go +++ b/packages/relayer/processor/processor.go @@ -38,6 +38,8 @@ type DB interface { GormDB() *gorm.DB } +// ethClient is a slimmed down interface of a go-ethereum ethclient.Client +// we can use for mocking and testing type ethClient interface { PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) @@ -50,6 +52,9 @@ type ethClient interface { ChainID(ctx context.Context) (*big.Int, error) } +// hop is a struct which needs to be created based on the config parameters +// for a hop. Each hop is an intermediary hop - if we are just processing +// srcChain to destChain, we should have no hops. type hop struct { chainID *big.Int signalServiceAddress common.Address @@ -60,6 +65,8 @@ type hop struct { blockNum uint64 } +// Processor is the main struct which handles message processing and queue +// instantiation type Processor struct { cancel context.CancelFunc @@ -115,6 +122,7 @@ type Processor struct { cfg *Config } +// InitFromCli creates a new processor from a cli context func (p *Processor) InitFromCli(ctx context.Context, c *cli.Context) error { cfg, err := NewConfigFromCliContext(c) if err != nil { @@ -155,6 +163,8 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error { hops := []hop{} + // iteraate over all the hop configs and create a hop struct + // which can be used to generate hop proofs for _, hopConfig := range cfg.hopConfigs { var hopEthClient *ethclient.Client @@ -382,6 +392,8 @@ func (p *Processor) queueName() string { return fmt.Sprintf("%v-%v-%v-queue", p.srcChainId.String(), p.destChainId.String(), relayer.EventNameMessageSent) } +// eventLoop is the main event loop of a Processor which should read +// messages from a queue and then process them. func (p *Processor) eventLoop(ctx context.Context) { defer func() { p.wg.Done() diff --git a/packages/relayer/processor/wait_for_confirmations.go b/packages/relayer/processor/wait_for_confirmations.go index beca79400c..1c86e8ea17 100644 --- a/packages/relayer/processor/wait_for_confirmations.go +++ b/packages/relayer/processor/wait_for_confirmations.go @@ -9,6 +9,8 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer" ) +// waitForConfirmations waits for the given transaction to reach N confs +// before returning func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error { ctx, cancelFunc := context.WithTimeout(ctx, time.Duration(p.confTimeoutInSeconds)*time.Second) diff --git a/packages/relayer/processor/wait_header_synced.go b/packages/relayer/processor/wait_header_synced.go index 14b51e6679..252e7a49f9 100644 --- a/packages/relayer/processor/wait_header_synced.go +++ b/packages/relayer/processor/wait_header_synced.go @@ -9,6 +9,9 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer" ) +// waitHeaderSynced waits for a event to appear in the database from the indexer +// for the type "ChainDataSynced" to be greater or less than the given blockNum. +// this is used to make sure a valid proof can be generated and verified on chain. func (p *Processor) waitHeaderSynced( ctx context.Context, ethClient ethClient,