chore(relayer): document relayer processor in comments (#16333)
Co-authored-by: David <david@taiko.xyz>
Co-authored-by: Roger <50648015+RogerLamTd@users.noreply.github.com>
3 people committed Mar 6, 2024
1 parent 8987156 commit 71553ac
Showing 10 changed files with 77 additions and 10 deletions.
9 changes: 8 additions & 1 deletion packages/relayer/processor/can_process_message.go
@@ -8,6 +8,13 @@ import (
"github.com/taikoxyz/taiko-mono/packages/relayer"
)

// canProcessMessage determines whether a message is processable by the relayer.
// there are several conditions under which it would not be processable, including:
// - the event status is New and the GasLimit is 0, which means only the user who
// sent the message can process it.
// - the event status is not New, which means it either was already processed and succeeded,
// or it was processed, failed, and is now in a Retriable or Failed state, where the user
// should finish it manually.
func canProcessMessage(
ctx context.Context,
eventStatus relayer.EventStatus,
@@ -28,7 +28,7 @@ func canProcessMessage(
return true
}

slog.Info("cant process message due to", "eventStatus", eventStatus.String())
slog.Info("cant process message", "eventStatus", eventStatus.String())

return false
}
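
The rules in the comment above reduce to a small predicate. A minimal, self-contained sketch of that shape follows; the status enum and helper below are illustrative stand-ins, not the relayer's actual types.

package main

import "fmt"

// EventStatus and its values are assumptions standing in for relayer.EventStatus;
// only the shape of the check matters here.
type EventStatus int

const (
	EventStatusNew EventStatus = iota
	EventStatusRetriable
	EventStatusFailed
	EventStatusDone
)

// canProcess is a simplified sketch of the rules documented above: a New event
// with a zero gas limit is reserved for the original sender, and anything that
// is no longer New is already past the point where the relayer can help.
func canProcess(status EventStatus, gasLimit uint64) bool {
	if status == EventStatusNew && gasLimit == 0 {
		return false // only the user who sent the message can process it
	}
	return status == EventStatusNew
}

func main() {
	fmt.Println(canProcess(EventStatusNew, 200000))  // true
	fmt.Println(canProcess(EventStatusNew, 0))       // false
	fmt.Println(canProcess(EventStatusRetriable, 1)) // false: user should finish manually
}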
8 changes: 8 additions & 0 deletions packages/relayer/processor/config.go
@@ -16,11 +16,19 @@ import (
"gorm.io/gorm/logger"
)

// hopConfig is a config struct that must be provided for each individual
// hop when the processor is not configured to only process srcChain => destChain.
// for instance, when going from L2A to L2B, we have a hop over the shared "L1":
// the hopConfig in this case should contain the L1 signalServiceAddress, taikoAddress,
// and rpcURL. If we have multiple hops, such as an L3 deployed on L2A bridging to L2B,
// the hops would be L2A and L1, and multiple configs should be passed in.
type hopConfig struct {
signalServiceAddress common.Address
taikoAddress common.Address
rpcURL string
}

// Config is a struct used to initialize a processor.
type Config struct {
// address configs
SrcSignalServiceAddress common.Address
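
To make the multi-hop example concrete, here is a hedged sketch of how two hop configs could be assembled for the L3-on-L2A to L2B route described above. The type is redeclared locally so the snippet stands alone, and every address and URL is a placeholder.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// hopConfig mirrors the struct in the diff above; redeclared here only so the
// sketch is self-contained.
type hopConfig struct {
	signalServiceAddress common.Address
	taikoAddress         common.Address
	rpcURL               string
}

func main() {
	// L3 (deployed on L2A) => L2B: the message hops over L2A and then L1,
	// so two hop configs are passed in. All values below are placeholders.
	hops := []hopConfig{
		{
			signalServiceAddress: common.HexToAddress("0x0000000000000000000000000000000000000001"),
			taikoAddress:         common.HexToAddress("0x0000000000000000000000000000000000000002"),
			rpcURL:               "https://l2a.example.com",
		},
		{
			signalServiceAddress: common.HexToAddress("0x0000000000000000000000000000000000000003"),
			taikoAddress:         common.HexToAddress("0x0000000000000000000000000000000000000004"),
			rpcURL:               "https://l1.example.com",
		},
	}

	fmt.Println("configured hops:", len(hops))
}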
3 changes: 3 additions & 0 deletions packages/relayer/processor/estimate_gas.go
@@ -14,6 +14,9 @@ var (
gasPaddingAmt uint64 = 80000
)

// estimateGas estimates the gas for a ProcessMessage call. it adds gasPaddingAmt
// as a safety margin, because the exact amount of gas is hard to predict due to
// on-chain proof verification.
func (p *Processor) estimateGas(
ctx context.Context, message bridge.IBridgeMessage, proof []byte) (uint64, error) {
auth, err := bind.NewKeyedTransactorWithChainID(p.ecdsaKey, new(big.Int).SetUint64(message.DestChainId))
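
The padding described above is just a fixed buffer on top of whatever the node estimates. Below is a tiny sketch using the gasPaddingAmt value shown in the diff; the raw estimate is made up.

package main

import "fmt"

// gasPaddingAmt mirrors the constant shown in the diff above.
const gasPaddingAmt uint64 = 80000

// padGas adds a fixed safety margin to a raw gas estimate, since the exact
// cost of on-chain proof verification is hard to predict.
func padGas(rawEstimate uint64) uint64 {
	return rawEstimate + gasPaddingAmt
}

func main() {
	fmt.Println(padGas(250_000)) // 330000
}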
2 changes: 2 additions & 0 deletions packages/relayer/processor/get_latest_nonce.go
@@ -7,6 +7,8 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
)

// getLatestNonce fetches the latest nonce on chain and, if it is higher than
// the one stored locally in the processor, uses it; the result is set on the auth struct.
func (p *Processor) getLatestNonce(ctx context.Context, auth *bind.TransactOpts) error {
pendingNonce, err := p.destEthClient.PendingNonceAt(ctx, p.relayerAddr)
if err != nil {
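
A sketch of the take-the-higher-nonce behaviour described above, with a function literal standing in for the destination client's PendingNonceAt call; the real method lives on the Processor and uses its cached nonce.

package main

import (
	"context"
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/accounts/abi/bind"
)

// nonceSource is a stand-in for the destination eth client's PendingNonceAt.
type nonceSource func(ctx context.Context) (uint64, error)

// syncNonce keeps a locally cached nonce, bumps it when the chain reports a
// higher pending nonce, and writes the result onto the auth opts.
func syncNonce(ctx context.Context, local *uint64, pending nonceSource, auth *bind.TransactOpts) error {
	onChain, err := pending(ctx)
	if err != nil {
		return err
	}

	if onChain > *local {
		*local = onChain
	}

	auth.Nonce = new(big.Int).SetUint64(*local)

	return nil
}

func main() {
	local := uint64(7)
	auth := &bind.TransactOpts{}

	_ = syncNonce(context.Background(), &local, func(context.Context) (uint64, error) { return 9, nil }, auth)

	fmt.Println(auth.Nonce) // 9
}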
3 changes: 3 additions & 0 deletions packages/relayer/processor/is_profitable.go
@@ -9,6 +9,9 @@ import (
"github.com/taikoxyz/taiko-mono/packages/relayer/bindings/bridge"
)

// isProfitable determines whether a message is profitable or not. it checks
// the processing fee: if one does not exist at all, the message is definitely not
// profitable. otherwise, the fee is compared to the estimated cost.
func (p *Processor) isProfitable(
ctx context.Context, message bridge.IBridgeMessage, cost *big.Int) (bool, error) {
processingFee := message.Fee
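
The check described above can be sketched as a plain fee-versus-cost comparison. Whether the real implementation uses a strict inequality or adds extra margin is not visible in this diff, so treat the rule below as an assumption.

package main

import (
	"fmt"
	"math/big"
)

// profitable is a sketch of the rule described above: no fee at all means not
// profitable; otherwise the fee must exceed the estimated cost.
func profitable(fee, cost *big.Int) bool {
	if fee == nil || fee.Sign() == 0 {
		return false
	}

	return fee.Cmp(cost) > 0
}

func main() {
	fmt.Println(profitable(big.NewInt(0), big.NewInt(100)))   // false
	fmt.Println(profitable(big.NewInt(150), big.NewInt(100))) // true
}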
43 changes: 34 additions & 9 deletions packages/relayer/processor/process_message.go
@@ -29,6 +29,8 @@ var (
errUnprocessable = errors.New("message is unprocessable")
)

// eventStatusFromMsgHash will check the event's msgHash/signal and
// get its current on-chain status.
func (p *Processor) eventStatusFromMsgHash(
ctx context.Context,
gasLimit *big.Int,
@@ -58,11 +60,9 @@ func (p *Processor) eventStatusFromMsgHash(
return eventStatus, nil
}

// processMessage prepares and calls `processMessage` on the bridge.
// the proof must be generated from the gethclient's eth_getProof via the Prover,
// then rlp-encoded and combined as a singular byte slice,
// then abi encoded into a SignalProof struct as the contract
// expects
// processMessage prepares and calls `processMessage` on the bridge, given a
// message from the queue (from the indexer). It will
// generate a proof, or multiple proofs if hops are needed.
func (p *Processor) processMessage(
ctx context.Context,
msg queue.Message,
@@ -89,6 +89,8 @@ func (p *Processor) processMessage(
return errors.Wrap(err, "p.waitForConfirmations")
}

// we need to check the invocation delays and proof receipt to see whether
// this is currently processable, or whether we need to wait.
invocationDelays, err := p.destBridge.GetInvocationDelays(nil)
if err != nil {
return errors.Wrap(err, "p.destBridge.invocationDelays")
@@ -134,10 +136,12 @@ func (p *Processor) processMessage(
return err
}

// we need to check the receipt logs to see if we received MessageReceived
// or MessageExecuted, because we have a two-step bridge.
for _, log := range receipt.Logs {
topic := log.Topics[0]
// if we have a MessageReceived event, this was not processed,
// and we have to wait for the invocation delay.
// if we have a MessageReceived event, this was not processed, only
// the first step was. now we have to wait for the invocation delay.
if topic == bridgeAbi.Events["MessageReceived"].ID {
slog.Info("message processing resulted in MessageReceived event",
"msgHash", common.BytesToHash(msgBody.Event.MsgHash[:]).Hex(),
@@ -159,6 +163,9 @@ func (p *Processor) processMessage(
return errors.Wrap(err, "p.sendProcessMessageAndWaitForReceipt")
}
} else if topic == bridgeAbi.Events["MessageExecuted"].ID {
// if we got MessageExecuted, the message is finished processing. this occurs
// either in one-step bridge processing (no invocation delay), or if this is the second process
// message call after the first step was completed.
slog.Info("message processing resulted in MessageExecuted event. processing finished")
}
}
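
The two-step behaviour comes down to which event topic shows up in the receipt. A self-contained sketch of that dispatch follows; the event signatures hashed below are illustrative, whereas the real code takes the IDs from the generated bridge ABI.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
)

// Illustrative topic IDs; the relayer reads the real ones from bridgeAbi.Events.
var (
	messageReceivedID = crypto.Keccak256Hash([]byte("MessageReceived(example)"))
	messageExecutedID = crypto.Keccak256Hash([]byte("MessageExecuted(example)"))
)

// classify reports whether a processMessage receipt finished the flow
// (MessageExecuted) or only completed the first step (MessageReceived).
func classify(receipt *types.Receipt) string {
	for _, l := range receipt.Logs {
		if len(l.Topics) == 0 {
			continue
		}

		switch l.Topics[0] {
		case messageReceivedID:
			return "received: wait out the invocation delay, then process again"
		case messageExecutedID:
			return "executed: processing finished"
		}
	}

	return "no bridge event found in receipt"
}

func main() {
	r := &types.Receipt{Logs: []*types.Log{{Topics: []common.Hash{messageExecutedID}}}}
	fmt.Println(classify(r))
}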
@@ -181,7 +188,7 @@ func (p *Processor) processMessage(
}

// internal will only be set if it's an actual queue message, not a targeted
// transaction hash.
// transaction hash set via config flag.
if msg.Internal != nil {
// update message status
if err := p.eventRepo.UpdateStatus(ctx, msgBody.ID, relayer.EventStatus(messageStatus)); err != nil {
@@ -192,6 +199,9 @@ func (p *Processor) processMessage(
return nil
}

// sendProcessMessageAndWaitForReceipt uses a backoff retry mechanism
// to send the onchain processMessage call on the bridge, then waits
// for the transaction receipt and saves the updated status to the database.
func (p *Processor) sendProcessMessageAndWaitForReceipt(
ctx context.Context,
encodedSignalProof []byte,
@@ -241,6 +251,8 @@ func (p *Processor) sendProcessMessageAndWaitForReceipt(
return receipt, nil
}
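
The retry-then-wait shape described in the comment can be sketched with a plain exponential backoff loop. The relayer may well use a dedicated retry helper; this sketch does not try to reproduce that API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls fn until it succeeds or the attempts are exhausted,
// doubling the sleep between attempts and respecting context cancellation.
func retryWithBackoff(ctx context.Context, attempts int, initial time.Duration, fn func() error) error {
	delay := initial

	var err error

	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}

		delay *= 2
	}

	return err
}

func main() {
	calls := 0

	err := retryWithBackoff(context.Background(), 5, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}

		return nil // e.g. the processMessage tx was accepted; the receipt would be awaited next
	})

	fmt.Println(calls, err) // 3 <nil>
}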

// waitForInvocationDelay will return when the invocation delay has been met,
// if one exists, or return immediately if not.
func (p *Processor) waitForInvocationDelay(
ctx context.Context,
invocationDelays struct {
@@ -351,6 +363,9 @@ func (p *Processor) generateEncodedSignalProof(ctx context.Context,
return nil, errors.Wrap(err, "p.srcSignalService.GetSignalSlot")
}

// if we have no hops, this is strictly a srcChain => destChain message.
// we can grab the latestBlockID, create a singular "hop" of srcChain => destChain,
// and generate a proof.
if len(p.hops) == 0 {
latestBlockID, err := p.eventRepo.LatestChainDataSyncedEvent(
ctx,
@@ -371,6 +386,8 @@ func (p *Processor) generateEncodedSignalProof(ctx context.Context,
BlockNumber: latestBlockID,
})
} else {
// otherwise, we just create the first hop in the array here; the rest of
// the hops will be appended after.
hops = append(hops, proof.HopParams{
ChainID: p.destChainId,
SignalServiceAddress: p.srcSignalServiceAddress,
@@ -466,6 +483,8 @@ func (p *Processor) generateEncodedSignalProof(ctx context.Context,
return encodedSignalProof, nil
}
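
The branching above boils down to building a slice of hop parameters: a single srcChain => destChain entry when there are no intermediaries, and the intermediary hops appended after the first one otherwise. A simplified, self-contained sketch; the type below is a stand-in for proof.HopParams with a trimmed field set.

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
)

// hopParams is a simplified stand-in for the proof.HopParams used above.
type hopParams struct {
	chainID              *big.Int
	signalServiceAddress common.Address
	blockNumber          uint64
}

// buildHops mirrors the shape described in the comments: the first entry always
// targets the destination chain, and any intermediary hops are appended after it.
func buildHops(destChainID *big.Int, srcSignalService common.Address, latestBlock uint64, intermediaries []hopParams) []hopParams {
	hops := []hopParams{{
		chainID:              destChainID,
		signalServiceAddress: srcSignalService,
		blockNumber:          latestBlock,
	}}

	return append(hops, intermediaries...)
}

func main() {
	hops := buildHops(big.NewInt(167000), common.HexToAddress("0x01"), 42, nil)
	fmt.Println(len(hops)) // 1: a direct srcChain => destChain proof
}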

// sendProcessMessageCall calls `bridge.processMessage` with the latest nonce
// after estimating gas and checking profitability.
func (p *Processor) sendProcessMessageCall(
ctx context.Context,
event *bridge.BridgeMessageSent,
@@ -546,7 +565,9 @@ func (p *Processor) sendProcessMessageCall(
return tx, nil
}

// node is unable to estimate gas correctly for contract deployments, we need to check if the token
// needsContractDeployment is needed because the
// node is unable to estimate gas correctly for contract deployments,
// so we need to check if the token
// is deployed, and always hardcode the gas limit in this case. we need to check this before calling
// estimateGas, as the node will sometimes return a gas estimate for a contract deployment, however,
// it is incorrect and the tx will revert.
@@ -665,10 +686,13 @@ func (p *Processor) hardcodeGasLimit(
return nil
}
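
The deployment check described above typically reduces to asking the destination node whether any code exists at the bridged token's address. The sketch below assumes that mechanism; the hardcoded limit and the way the token address is obtained are illustrative, not the relayer's actual values.

package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
)

// hardcodedDeployGas is an illustrative constant, not the relayer's real value.
const hardcodedDeployGas uint64 = 3_000_000

// chooseGasLimit returns a hardcoded limit when no code exists at the bridged
// token's destination address (i.e. the first transfer will deploy the token),
// since eth_estimateGas is unreliable for that case.
func chooseGasLimit(ctx context.Context, client *ethclient.Client, token common.Address, estimated uint64) (uint64, error) {
	code, err := client.CodeAt(ctx, token, nil)
	if err != nil {
		return 0, err
	}

	if len(code) == 0 {
		return hardcodedDeployGas, nil
	}

	return estimated, nil
}

func main() {
	fmt.Println("chooseGasLimit: hardcode the limit when the token is not yet deployed")
}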

// setLatestNonce sets the latest nonce used for the relayer key
func (p *Processor) setLatestNonce(nonce uint64) {
p.destNonce = nonce
}

// saveMessageStatusChangedEvent writes the MessageStatusChanged event to the
// database after a message is processed
func (p *Processor) saveMessageStatusChangedEvent(
ctx context.Context,
receipt *types.Receipt,
@@ -714,6 +738,7 @@ func (p *Processor) saveMessageStatusChangedEvent(
return nil
}

// getCost estimates the cost of a processMessage call
func (p *Processor) getCost(ctx context.Context, auth *bind.TransactOpts) (*big.Int, error) {
if auth.GasTipCap != nil {
blk, err := p.destEthClient.BlockByNumber(ctx, nil)
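
In EIP-1559 terms the cost referred to above is roughly gasLimit * (baseFee + tip). A small sketch of that arithmetic, under the assumption that a tip cap is set; the real getCost reads the base fee from the latest destination block.

package main

import (
	"fmt"
	"math/big"
)

// estimateCost is a rough EIP-1559 style cost: gas * (baseFee + tipCap).
func estimateCost(gasLimit uint64, baseFee, tipCap *big.Int) *big.Int {
	perGas := new(big.Int).Add(baseFee, tipCap)
	return new(big.Int).Mul(perGas, new(big.Int).SetUint64(gasLimit))
}

func main() {
	cost := estimateCost(330_000, big.NewInt(20_000_000_000), big.NewInt(2_000_000_000))
	fmt.Println(cost, "wei") // 7260000000000000 wei
}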
2 changes: 2 additions & 0 deletions packages/relayer/processor/process_single.go
@@ -13,6 +13,8 @@ import (
"github.com/taikoxyz/taiko-mono/packages/relayer/pkg/queue"
)

// processSingle is used to process a single message, when we are
// targeting a specific message via a config flag.
func (p *Processor) processSingle(ctx context.Context) error {
slog.Info("processing single", "txHash", common.Hash(*p.targetTxHash).Hex())

12 changes: 12 additions & 0 deletions packages/relayer/processor/processor.go
@@ -38,6 +38,8 @@ type DB interface {
GormDB() *gorm.DB
}

// ethClient is a slimmed-down interface of a go-ethereum ethclient.Client
// that we can use for mocking and testing
type ethClient interface {
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
@@ -50,6 +52,9 @@ type ethClient interface {
ChainID(ctx context.Context) (*big.Int, error)
}
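
Keeping the client behind a small interface is what makes the mocking mentioned above possible. Below is a sketch with a narrowed-down interface and a test double; both are illustrative and not part of the relayer.

package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// nonceClient is a narrowed-down version of the ethClient interface above,
// kept to one method so the mock stays short.
type nonceClient interface {
	PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
}

// mockClient satisfies nonceClient without talking to a real node.
type mockClient struct{ nonce uint64 }

func (m *mockClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
	return m.nonce, nil
}

// nextNonce only needs the narrow interface, so it can be exercised with
// mockClient in tests and with a real *ethclient.Client in production.
func nextNonce(ctx context.Context, c nonceClient, addr common.Address) (uint64, error) {
	return c.PendingNonceAt(ctx, addr)
}

func main() {
	n, _ := nextNonce(context.Background(), &mockClient{nonce: 12}, common.Address{})
	fmt.Println(n) // 12
}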

// hop is a struct which needs to be created based on the config parameters
// for a hop. each hop is an intermediary chain the message crosses - if we are just
// processing srcChain to destChain, we should have no hops.
type hop struct {
chainID *big.Int
signalServiceAddress common.Address
@@ -60,6 +65,8 @@ type hop struct {
blockNum uint64
}

// Processor is the main struct which handles message processing and queue
// instantiation
type Processor struct {
cancel context.CancelFunc

@@ -115,6 +122,7 @@ type Processor struct {
cfg *Config
}

// InitFromCli creates a new processor from a cli context
func (p *Processor) InitFromCli(ctx context.Context, c *cli.Context) error {
cfg, err := NewConfigFromCliContext(c)
if err != nil {
@@ -155,6 +163,8 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error {

hops := []hop{}

// iterate over all the hop configs and create a hop struct
// which can be used to generate hop proofs
for _, hopConfig := range cfg.hopConfigs {
var hopEthClient *ethclient.Client

@@ -382,6 +392,8 @@ func (p *Processor) queueName() string {
return fmt.Sprintf("%v-%v-%v-queue", p.srcChainId.String(), p.destChainId.String(), relayer.EventNameMessageSent)
}

// eventLoop is the main event loop of a Processor which should read
// messages from a queue and then process them.
func (p *Processor) eventLoop(ctx context.Context) {
defer func() {
p.wg.Done()
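
The event loop described above is the usual consume-until-cancelled pattern. A standalone sketch with a plain channel standing in for the queue subscription; the real loop also handles acking, nacking, and re-queueing, which is omitted here.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// eventLoop drains messages from the channel and processes each one, exiting
// when the context is cancelled or the channel is closed.
func eventLoop(ctx context.Context, wg *sync.WaitGroup, msgs <-chan string, process func(string)) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-msgs:
			if !ok {
				return
			}

			process(m)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	msgs := make(chan string, 2)
	msgs <- "MessageSent#1"
	msgs <- "MessageSent#2"

	var wg sync.WaitGroup

	wg.Add(1)

	go eventLoop(ctx, &wg, msgs, func(m string) { fmt.Println("processing", m) })

	time.Sleep(50 * time.Millisecond)
	cancel()
	wg.Wait()
}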
2 changes: 2 additions & 0 deletions packages/relayer/processor/wait_for_confirmations.go
@@ -9,6 +9,8 @@ import (
"github.com/taikoxyz/taiko-mono/packages/relayer"
)

// waitForConfirmations waits for the given transaction to reach N confs
// before returning
func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error {
ctx, cancelFunc := context.WithTimeout(ctx, time.Duration(p.confTimeoutInSeconds)*time.Second)

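
Waiting for N confirmations usually means polling the chain head until it is far enough past the block that included the transaction, bounded by the timeout set above. A self-contained sketch of that loop; the polling interval and the stand-in for the client's BlockNumber call are assumptions.

package main

import (
	"context"
	"fmt"
	"time"
)

// blockNumberFn is a stand-in for an eth client's BlockNumber call.
type blockNumberFn func(ctx context.Context) (uint64, error)

// waitConfirmations polls the chain head until it is at least confs blocks
// past the block that included the transaction, or the context expires.
func waitConfirmations(ctx context.Context, latest blockNumberFn, txBlock, confs uint64) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			head, err := latest(ctx)
			if err != nil {
				return err
			}

			if head >= txBlock+confs {
				return nil
			}
		}
	}
}

func main() {
	head := uint64(100)

	err := waitConfirmations(context.Background(), func(context.Context) (uint64, error) {
		head++ // pretend a new block arrives on every poll
		return head, nil
	}, 100, 5)

	fmt.Println(err) // <nil> once 5 blocks have passed
}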
3 changes: 3 additions & 0 deletions packages/relayer/processor/wait_header_synced.go
@@ -9,6 +9,9 @@ import (
"github.com/taikoxyz/taiko-mono/packages/relayer"
)

// waitHeaderSynced waits for a "ChainDataSynced" event from the indexer to appear
// in the database with a block number greater than or equal to the given blockNum.
// this is used to make sure a valid proof can be generated and verified on chain.
func (p *Processor) waitHeaderSynced(
ctx context.Context,
ethClient ethClient,
