diff --git a/driver/driver_test.go b/driver/driver_test.go index 8b5900e72..37fc61b58 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -129,8 +129,10 @@ func (s *DriverTestSuite) TestProcessL1Blocks() { } func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() { - var testnetL1SnapshotID = s.SetL1Snapshot() - + var ( + testnetL1SnapshotID = s.SetL1Snapshot() + sender = s.p.GetSender() + ) l1Head1, err := s.d.rpc.L1.HeaderByNumber(context.Background(), nil) s.Nil(err) l2Head1, err := s.d.rpc.L2.HeaderByNumber(context.Background(), nil) @@ -164,6 +166,8 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() { s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64()) s.Equal(l1Head3.Hash(), l1Head1.Hash()) + // Because of evm_revert operation, the nonce of the proposer need to be adjusted. + sender.AdjustNonce(nil) // Propose ten blocks on another fork for i := 0; i < 10; i++ { s.ProposeInvalidTxListBytes(s.p) @@ -188,8 +192,10 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() { } func (s *DriverTestSuite) TestCheckL1ReorgToLowerFork() { - var testnetL1SnapshotID = s.SetL1Snapshot() - + var ( + testnetL1SnapshotID = s.SetL1Snapshot() + sender = s.p.GetSender() + ) l1Head1, err := s.d.rpc.L1.HeaderByNumber(context.Background(), nil) s.Nil(err) l2Head1, err := s.d.rpc.L2.HeaderByNumber(context.Background(), nil) @@ -223,6 +229,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToLowerFork() { s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64()) s.Equal(l1Head3.Hash(), l1Head1.Hash()) + sender.AdjustNonce(nil) // Propose one blocks on another fork s.ProposeInvalidTxListBytes(s.p) @@ -244,8 +251,10 @@ func (s *DriverTestSuite) TestCheckL1ReorgToLowerFork() { } func (s *DriverTestSuite) TestCheckL1ReorgToSameHeightFork() { - var testnetL1SnapshotID = s.SetL1Snapshot() - + var ( + testnetL1SnapshotID = s.SetL1Snapshot() + sender = s.p.GetSender() + ) l1Head1, err := s.d.rpc.L1.HeaderByNumber(context.Background(), nil) s.Nil(err) l2Head1, err := s.d.rpc.L2.HeaderByNumber(context.Background(), nil) @@ -279,6 +288,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToSameHeightFork() { s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64()) s.Equal(l1Head3.Hash(), l1Head1.Hash()) + sender.AdjustNonce(nil) // Propose two blocks on another fork s.ProposeInvalidTxListBytes(s.p) time.Sleep(3 * time.Second) diff --git a/go.mod b/go.mod index 7b5e0825c..60285d4a9 100644 --- a/go.mod +++ b/go.mod @@ -12,12 +12,15 @@ require ( github.com/joho/godotenv v1.5.1 github.com/labstack/echo/v4 v4.11.1 github.com/modern-go/reflect2 v1.0.2 + github.com/orcaman/concurrent-map/v2 v2.0.1 + github.com/pborman/uuid v1.2.1 github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 github.com/prysmaticlabs/prysm/v4 v4.2.0 github.com/stretchr/testify v1.8.4 github.com/swaggo/swag v1.16.2 github.com/urfave/cli/v2 v2.25.7 golang.org/x/sync v0.5.0 + modernc.org/mathutil v1.6.0 ) require ( @@ -79,7 +82,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.0.1 // indirect @@ -164,6 +167,7 @@ require ( github.com/quic-go/quic-go v0.39.3 // indirect github.com/quic-go/webtransport-go v0.6.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rs/cors v1.7.0 // indirect diff --git a/go.sum b/go.sum index c0a4eecb5..bd2399a37 100644 --- a/go.sum +++ b/go.sum @@ -442,8 +442,8 @@ github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3 github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -797,6 +797,8 @@ github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTm github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= @@ -804,6 +806,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTK github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= +github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= @@ -887,6 +891,8 @@ github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtB github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -1648,6 +1654,8 @@ k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/internal/sender/common.go b/internal/sender/common.go new file mode 100644 index 000000000..87ab346b1 --- /dev/null +++ b/internal/sender/common.go @@ -0,0 +1,202 @@ +package sender + +import ( + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/holiman/uint256" + "github.com/pborman/uuid" + "modernc.org/mathutil" + + "github.com/taikoxyz/taiko-client/pkg/rpc" +) + +// adjustGas adjusts the gas fee cap and gas tip cap of the given transaction with the configured +// growth rate. +func (s *Sender) adjustGas(txData types.TxData) { + rate := s.GasGrowthRate + 100 + switch baseTx := txData.(type) { + case *types.DynamicFeeTx: + gasFeeCap := baseTx.GasFeeCap.Int64() + gasFeeCap = gasFeeCap / 100 * int64(rate) + gasFeeCap = mathutil.MinInt64(gasFeeCap, int64(s.MaxGasFee)) + baseTx.GasFeeCap = big.NewInt(gasFeeCap) + + gasTipCap := baseTx.GasTipCap.Int64() + gasTipCap = gasTipCap / 100 * int64(rate) + gasTipCap = mathutil.MinInt64(gasFeeCap, mathutil.MinInt64(gasTipCap, int64(s.MaxGasFee))) + baseTx.GasTipCap = big.NewInt(gasTipCap) + case *types.BlobTx: + gasFeeCap := baseTx.GasFeeCap.Uint64() + gasFeeCap = gasFeeCap / 100 * rate + gasFeeCap = mathutil.MinUint64(gasFeeCap, s.MaxGasFee) + baseTx.GasFeeCap = uint256.NewInt(gasFeeCap) + + gasTipCap := baseTx.GasTipCap.Uint64() + gasTipCap = gasTipCap / 100 * rate + gasTipCap = mathutil.MinUint64(gasFeeCap, mathutil.MinUint64(gasTipCap, s.MaxGasFee)) + baseTx.GasTipCap = uint256.NewInt(gasTipCap) + + blobFeeCap := baseTx.BlobFeeCap.Uint64() + blobFeeCap = blobFeeCap / 100 * rate + blobFeeCap = mathutil.MinUint64(blobFeeCap, s.MaxBlobFee) + baseTx.BlobFeeCap = uint256.NewInt(blobFeeCap) + } +} + +// AdjustNonce adjusts the nonce of the given transaction with the current nonce of the sender. +func (s *Sender) AdjustNonce(txData types.TxData) { + nonce, err := s.client.NonceAt(s.ctx, s.Opts.From, nil) + if err != nil { + log.Warn("Failed to get the nonce", "from", s.Opts.From, "err", err) + return + } + s.Opts.Nonce = new(big.Int).SetUint64(nonce) + + switch tx := txData.(type) { + case *types.DynamicFeeTx: + tx.Nonce = nonce + case *types.BlobTx: + tx.Nonce = nonce + default: + log.Warn("Unsupported transaction type", "from", s.Opts.From) + } +} + +// updateGasTipGasFee updates the gas tip cap and gas fee cap of the sender with the given chain head info. +func (s *Sender) updateGasTipGasFee(head *types.Header) error { + // Get the gas tip cap + gasTipCap, err := s.client.SuggestGasTipCap(s.ctx) + if err != nil { + return err + } + + // Get the gas fee cap + gasFeeCap := new(big.Int).Add(gasTipCap, new(big.Int).Mul(head.BaseFee, big.NewInt(2))) + // Check if the gas fee cap is less than the gas tip cap + if gasFeeCap.Cmp(gasTipCap) < 0 { + return fmt.Errorf("maxFeePerGas (%v) < maxPriorityFeePerGas (%v)", gasFeeCap, gasTipCap) + } + maxGasFee := new(big.Int).SetUint64(s.MaxGasFee) + if gasFeeCap.Cmp(maxGasFee) > 0 { + gasFeeCap = new(big.Int).Set(maxGasFee) + gasTipCap = new(big.Int).Set(maxGasFee) + } + + s.Opts.GasTipCap = gasTipCap + s.Opts.GasFeeCap = gasFeeCap + + return nil +} + +// buildTxData assembles the transaction data from the given transaction. +func (s *Sender) buildTxData(tx *types.Transaction) (types.TxData, error) { + switch tx.Type() { + case types.DynamicFeeTxType: + return &types.DynamicFeeTx{ + ChainID: s.client.ChainID, + To: tx.To(), + Nonce: tx.Nonce(), + GasFeeCap: s.Opts.GasFeeCap, + GasTipCap: s.Opts.GasTipCap, + Gas: tx.Gas(), + Value: tx.Value(), + Data: tx.Data(), + AccessList: tx.AccessList(), + }, nil + case types.BlobTxType: + var to common.Address + if tx.To() != nil { + to = *tx.To() + } + return &types.BlobTx{ + ChainID: uint256.MustFromBig(s.client.ChainID), + To: to, + Nonce: tx.Nonce(), + GasFeeCap: uint256.MustFromBig(s.Opts.GasFeeCap), + GasTipCap: uint256.MustFromBig(s.Opts.GasTipCap), + Gas: tx.Gas(), + Value: uint256.MustFromBig(tx.Value()), + Data: tx.Data(), + AccessList: tx.AccessList(), + BlobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), + BlobHashes: tx.BlobHashes(), + Sidecar: tx.BlobTxSidecar(), + }, nil + default: + return nil, fmt.Errorf("unsupported transaction type: %v", tx.Type()) + } +} + +// handleReorgTransactions handles the transactions which are backed to the mempool due to reorg. +func (s *Sender) handleReorgTransactions() { // nolint: unused + content, err := rpc.Content(s.ctx, s.client) + if err != nil { + log.Warn("failed to get the unconfirmed transactions", "address", s.Opts.From.String(), "err", err) + return + } + if len(content) == 0 { + return + } + + txs := map[common.Hash]*types.Transaction{} + for _, txMapStatus := range content { + for key, txMapNonce := range txMapStatus { + addr := common.HexToAddress(key) + if addr != s.Opts.From { + continue + } + for _, tx := range txMapNonce { + txs[tx.Hash()] = tx + } + } + } + // Remove the already handled transactions. + for _, confirm := range s.unconfirmedTxs.Items() { + delete(txs, confirm.CurrentTx.Hash()) + } + for _, tx := range txs { + baseTx, err := s.buildTxData(tx) + if err != nil { + log.Warn("failed to make the transaction data when handle reorg txs", "tx_hash", tx.Hash().String(), "err", err) + return + } + txID := uuid.New() + confirm := &TxToConfirm{ + ID: txID, + CurrentTx: tx, + originalTx: baseTx, + } + s.unconfirmedTxs.Set(txID, confirm) + s.txToConfirmCh.Set(txID, make(chan *TxToConfirm, 1)) + log.Info("handle reorg tx", "tx_hash", tx.Hash().String(), "tx_id", txID) + } +} + +// setDefault sets the default value if the given value is 0. +func setDefault[T uint64 | time.Duration](src, dest T) T { + if src == 0 { + return dest + } + return src +} + +// setConfigWithDefaultValues sets the config with default values if the given config is nil. +func setConfigWithDefaultValues(config *Config) *Config { + if config == nil { + return DefaultConfig + } + return &Config{ + ConfirmationDepth: setDefault(config.ConfirmationDepth, DefaultConfig.ConfirmationDepth), + MaxRetrys: setDefault(config.MaxRetrys, DefaultConfig.MaxRetrys), + MaxWaitingTime: setDefault(config.MaxWaitingTime, DefaultConfig.MaxWaitingTime), + GasLimit: setDefault(config.GasLimit, DefaultConfig.GasLimit), + GasGrowthRate: setDefault(config.GasGrowthRate, DefaultConfig.GasGrowthRate), + MaxGasFee: setDefault(config.MaxGasFee, DefaultConfig.MaxGasFee), + MaxBlobFee: setDefault(config.MaxBlobFee, DefaultConfig.MaxBlobFee), + } +} diff --git a/internal/sender/sender.go b/internal/sender/sender.go new file mode 100644 index 000000000..3007b571d --- /dev/null +++ b/internal/sender/sender.go @@ -0,0 +1,376 @@ +package sender + +import ( + "context" + "crypto/ecdsa" + "fmt" + "math/big" + "os" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/pborman/uuid" + + "github.com/taikoxyz/taiko-client/pkg/rpc" +) + +var ( + sendersMap = map[uint64]map[common.Address]*Sender{} + unconfirmedTxsCap = 100 + nonceIncorrectRetrys = 3 + unconfirmedTxsCheckInternal = 2 * time.Second + chainHeadFetchInterval = 3 * time.Second // nolint:unused + errTimeoutInMempool = fmt.Errorf("transaction in mempool for too long") + DefaultConfig = &Config{ + ConfirmationDepth: 0, + MaxRetrys: 0, + MaxWaitingTime: 5 * time.Minute, + GasLimit: params.TxGas, + GasGrowthRate: 50, + MaxGasFee: 20_000_000_000, + MaxBlobFee: 1_000_000_000, + } +) + +// Config represents the configuration of the transaction sender. +type Config struct { + // The minimum block confirmations to wait to confirm a transaction. + ConfirmationDepth uint64 + // The maximum retry times when sending transactions. + MaxRetrys uint64 + // The maximum waiting time for the inclusion of transactions. + MaxWaitingTime time.Duration + + // The gas limit for transactions. + GasLimit uint64 + // The gas rate to increase the gas price, 20 means 20% gas growth rate. + GasGrowthRate uint64 + // The maximum gas fee can be used when sending transactions. + MaxGasFee uint64 + MaxBlobFee uint64 +} + +// TxToConfirm represents a transaction which is waiting for its confirmation. +type TxToConfirm struct { + confirmations uint64 + originalTx types.TxData + + ID string + Retrys uint64 + CurrentTx *types.Transaction + Receipt *types.Receipt + + Err error +} + +// Sender represents a global transaction sender. +type Sender struct { + ctx context.Context + *Config + + head *types.Header + client *rpc.EthClient + + Opts *bind.TransactOpts + + unconfirmedTxs cmap.ConcurrentMap[string, *TxToConfirm] + txToConfirmCh cmap.ConcurrentMap[string, chan *TxToConfirm] + + mu sync.Mutex + wg sync.WaitGroup + stopCh chan struct{} +} + +// NewSender creates a new instance of Sender. +func NewSender(ctx context.Context, cfg *Config, client *rpc.EthClient, priv *ecdsa.PrivateKey) (*Sender, error) { + cfg = setConfigWithDefaultValues(cfg) + + // Create a new transactor + opts, err := bind.NewKeyedTransactorWithChainID(priv, client.ChainID) + if err != nil { + return nil, err + } + // Do not automatically send transactions + opts.NoSend = true + + // Add the sender to the root sender. + if root := sendersMap[client.ChainID.Uint64()]; root == nil { + sendersMap[client.ChainID.Uint64()] = map[common.Address]*Sender{} + } else { + if root[opts.From] != nil { + return nil, fmt.Errorf("sender already exists") + } + } + + // Get the chain ID + head, err := client.HeaderByNumber(ctx, nil) + if err != nil { + return nil, err + } + + sender := &Sender{ + ctx: ctx, + Config: cfg, + head: head, + client: client, + Opts: opts, + unconfirmedTxs: cmap.New[*TxToConfirm](), + txToConfirmCh: cmap.New[chan *TxToConfirm](), + stopCh: make(chan struct{}), + } + // Initialize the nonce + sender.AdjustNonce(nil) + + // Initialize the gas fee related fields + if err = sender.updateGasTipGasFee(head); err != nil { + return nil, err + } + if os.Getenv("RUN_TESTS") == "" { + sendersMap[client.ChainID.Uint64()][opts.From] = sender + } + + sender.wg.Add(1) + go sender.loop() + + return sender, nil +} + +func (s *Sender) Close() { + close(s.stopCh) + s.wg.Wait() +} + +// TxToConfirmChannel returns a channel to wait the given transaction's confirmation. +func (s *Sender) TxToConfirmChannel(txID string) <-chan *TxToConfirm { + ch, ok := s.txToConfirmCh.Get(txID) + if !ok { + log.Warn("Transaction not found", "id", txID) + } + return ch +} + +// TxToConfirmChannels returns channels to wait the given transactions confirmation. +func (s *Sender) TxToConfirmChannels() map[string]<-chan *TxToConfirm { + channels := map[string]<-chan *TxToConfirm{} + for txID, confirmCh := range s.txToConfirmCh.Items() { + channels[txID] = confirmCh + } + return channels +} + +// GetUnconfirmedTx returns the unconfirmed transaction by the transaction ID. +func (s *Sender) GetUnconfirmedTx(txID string) *types.Transaction { + txToConfirm, ok := s.unconfirmedTxs.Get(txID) + if !ok { + return nil + } + return txToConfirm.CurrentTx +} + +// SendRawTransaction sends a transaction to the given Ethereum node. +func (s *Sender) SendRawTransaction(nonce uint64, target *common.Address, value *big.Int, data []byte) (string, error) { + return s.SendTransaction(types.NewTx(&types.DynamicFeeTx{ + ChainID: s.client.ChainID, + To: target, + Nonce: nonce, + GasFeeCap: s.Opts.GasFeeCap, + GasTipCap: s.Opts.GasTipCap, + Gas: s.GasLimit, + Value: value, + Data: data, + })) +} + +// SendTransaction sends a transaction to the given Ethereum node. +func (s *Sender) SendTransaction(tx *types.Transaction) (string, error) { + if s.unconfirmedTxs.Count() >= unconfirmedTxsCap { + return "", fmt.Errorf("too many pending transactions") + } + + txData, err := s.buildTxData(tx) + if err != nil { + return "", err + } + + txID := uuid.New() + txToConfirm := &TxToConfirm{ + ID: txID, + originalTx: txData, + CurrentTx: tx, + } + + if err := s.send(txToConfirm); err != nil && !strings.Contains(err.Error(), "replacement transaction") { + log.Error("Failed to send transaction", "id", txID, "hash", tx.Hash(), "err", err) + return "", err + } + + // Add the transaction to the unconfirmed transactions + s.unconfirmedTxs.Set(txID, txToConfirm) + s.txToConfirmCh.Set(txID, make(chan *TxToConfirm, 1)) + + return txID, nil +} + +// send is the internal method to send the given transaction. +func (s *Sender) send(tx *TxToConfirm) error { + s.mu.Lock() + defer s.mu.Unlock() + + originalTx := tx.originalTx + + for i := 0; i < nonceIncorrectRetrys; i++ { + // Retry when nonce is incorrect + rawTx, err := s.Opts.Signer(s.Opts.From, types.NewTx(originalTx)) + if err != nil { + return err + } + tx.CurrentTx = rawTx + err = s.client.SendTransaction(s.ctx, rawTx) + tx.Err = err + // Check if the error is nonce too low + if err != nil { + if strings.Contains(err.Error(), "nonce too low") { + s.AdjustNonce(originalTx) + log.Warn("Nonce is incorrect, retry sending the transaction with new nonce", "hash", rawTx.Hash(), "err", err) + continue + } + if err.Error() == "replacement transaction underpriced" { + s.adjustGas(originalTx) + log.Warn("Replacement transaction underpriced", "hash", rawTx.Hash(), "err", err) + continue + } + log.Error("Failed to send transaction", "hash", rawTx.Hash(), "err", err) + return err + } + s.Opts.Nonce = new(big.Int).Add(s.Opts.Nonce, common.Big1) + break + } + return nil +} + +// loop is the main event loop of the transaction sender. +func (s *Sender) loop() { + defer s.wg.Done() + + // Subscribe new head. + headCh := make(chan *types.Header, 3) + sub, err := s.client.SubscribeNewHead(s.ctx, headCh) + if err != nil { + log.Error("failed to subscribe new head", "err", err) + return + } + defer sub.Unsubscribe() + + unconfirmedTxsCheckTicker := time.NewTicker(unconfirmedTxsCheckInternal) + defer unconfirmedTxsCheckTicker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-s.stopCh: + return + case <-unconfirmedTxsCheckTicker.C: + s.resendUnconfirmedTxs() + case newHead := <-headCh: + // If chain appear reorg then handle mempool transactions. + // TODO(Huan): handle reorg transactions + //if s.header.Hash() != header.ParentHash { + //s.handleReorgTransactions() + //} + s.head = newHead + // Update the gas tip and gas fee + if err = s.updateGasTipGasFee(newHead); err != nil { + log.Warn("Failed to update gas tip and gas fee", "err", err) + } + // Check the unconfirmed transactions + s.checkPendingTransactionsConfirmation() + } + } +} + +// resendUnconfirmedTxs resends all unconfirmed transactions. +func (s *Sender) resendUnconfirmedTxs() { + for id, unconfirmedTx := range s.unconfirmedTxs.Items() { + if unconfirmedTx.Err == nil { + continue + } + unconfirmedTx.Retrys++ + if s.MaxRetrys != 0 && unconfirmedTx.Retrys >= s.MaxRetrys { + s.releaseUnconfirmedTx(id) + continue + } + if err := s.send(unconfirmedTx); err != nil { + log.Warn( + "Failed to resend the transaction", + "id", id, + "retrys", unconfirmedTx.Retrys, + "err", err, + ) + } + } +} + +// checkPendingTransactionsConfirmation checks the confirmation of the pending transactions. +func (s *Sender) checkPendingTransactionsConfirmation() { + for id, pendingTx := range s.unconfirmedTxs.Items() { + if pendingTx.Err != nil { + continue + } + if pendingTx.Receipt == nil { + // Ignore the transaction if it is pending. + tx, isPending, err := s.client.TransactionByHash(s.ctx, pendingTx.CurrentTx.Hash()) + if err != nil { + log.Warn("Failed to fetch transaction", "hash", pendingTx.CurrentTx.Hash(), "err", err) + continue + } + if isPending { + // If the transaction is in mempool for too long, replace it. + if time.Since(tx.Time()) > s.MaxWaitingTime { + pendingTx.Err = errTimeoutInMempool + } + continue + } + // Get the transaction receipt. + receipt, err := s.client.TransactionReceipt(s.ctx, pendingTx.CurrentTx.Hash()) + if err != nil { + if err.Error() == "not found" { + pendingTx.Err = err + s.releaseUnconfirmedTx(id) + } + log.Warn("Failed to get the transaction receipt", "hash", pendingTx.CurrentTx.Hash(), "err", err) + continue + } + pendingTx.Receipt = receipt + if receipt.Status != types.ReceiptStatusSuccessful { + pendingTx.Err = fmt.Errorf("transaction reverted, hash: %s", receipt.TxHash) + s.releaseUnconfirmedTx(id) + continue + } + } + pendingTx.confirmations = s.head.Number.Uint64() - pendingTx.Receipt.BlockNumber.Uint64() + if pendingTx.confirmations >= s.ConfirmationDepth { + s.releaseUnconfirmedTx(id) + } + } +} + +// releaseUnconfirmedTx releases the unconfirmed transaction by the transaction ID. +func (s *Sender) releaseUnconfirmedTx(txID string) { + txConfirm, _ := s.unconfirmedTxs.Get(txID) + confirmCh, _ := s.txToConfirmCh.Get(txID) + select { + case confirmCh <- txConfirm: + default: + } + // Remove the transaction from the unconfirmed transactions + s.unconfirmedTxs.Remove(txID) + s.txToConfirmCh.Remove(txID) +} diff --git a/internal/sender/sender_test.go b/internal/sender/sender_test.go new file mode 100644 index 000000000..80e8722ca --- /dev/null +++ b/internal/sender/sender_test.go @@ -0,0 +1,146 @@ +package sender_test + +import ( + "context" + "math/big" + "os" + "runtime" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/suite" + "golang.org/x/sync/errgroup" + + "github.com/taikoxyz/taiko-client/internal/sender" + "github.com/taikoxyz/taiko-client/internal/testutils" +) + +type SenderTestSuite struct { + testutils.ClientTestSuite + sender *sender.Sender +} + +func (s *SenderTestSuite) TestNormalSender() { + var eg errgroup.Group + eg.SetLimit(runtime.NumCPU()) + for i := 0; i < 5; i++ { + i := i + eg.Go(func() error { + addr := common.BigToAddress(big.NewInt(int64(i))) + _, err := s.sender.SendRawTransaction(s.sender.Opts.Nonce.Uint64(), &addr, big.NewInt(1), nil) + return err + }) + } + s.Nil(eg.Wait()) + + for _, confirmCh := range s.sender.TxToConfirmChannels() { + confirm := <-confirmCh + s.Nil(confirm.Err) + } +} + +// Test touch max gas price and replacement. +func (s *SenderTestSuite) TestReplacement() { + send := s.sender + client := s.RPCClient.L1 + + // Let max gas price be 2 times of the gas fee cap. + send.MaxGasFee = send.Opts.GasFeeCap.Uint64() * 2 + + nonce, err := client.NonceAt(context.Background(), send.Opts.From, nil) + s.Nil(err) + + pendingNonce, err := client.PendingNonceAt(context.Background(), send.Opts.From) + s.Nil(err) + // Run test only if mempool has no pending transactions. + if pendingNonce > nonce { + return + } + + nonce++ + baseTx := &types.DynamicFeeTx{ + ChainID: client.ChainID, + To: &common.Address{}, + GasFeeCap: big.NewInt(int64(send.MaxGasFee - 1)), + GasTipCap: big.NewInt(int64(send.MaxGasFee - 1)), + Nonce: nonce, + Gas: 21000, + Value: big.NewInt(1), + Data: nil, + } + rawTx, err := send.Opts.Signer(send.Opts.From, types.NewTx(baseTx)) + s.Nil(err) + err = client.SendTransaction(context.Background(), rawTx) + s.Nil(err) + + // Replace the transaction with a higher nonce. + _, err = send.SendRawTransaction(nonce, &common.Address{}, big.NewInt(1), nil) + s.Nil(err) + + time.Sleep(time.Second * 6) + // Send a transaction with a next nonce and let all the transactions be confirmed. + _, err = send.SendRawTransaction(nonce-1, &common.Address{}, big.NewInt(1), nil) + s.Nil(err) + + for _, confirmCh := range send.TxToConfirmChannels() { + confirm := <-confirmCh + // Check the replaced transaction's gasFeeTap touch the max gas price. + if confirm.CurrentTx.Nonce() == nonce { + s.Equal(send.MaxGasFee, confirm.CurrentTx.GasFeeCap().Uint64()) + } + s.Nil(confirm.Err) + } + + _, err = client.TransactionReceipt(context.Background(), rawTx.Hash()) + s.Equal("not found", err.Error()) +} + +// Test nonce too low. +func (s *SenderTestSuite) TestNonceTooLow() { + client := s.RPCClient.L1 + send := s.sender + + nonce, err := client.NonceAt(context.Background(), send.Opts.From, nil) + s.Nil(err) + pendingNonce, err := client.PendingNonceAt(context.Background(), send.Opts.From) + s.Nil(err) + // Run test only if mempool has no pending transactions. + if pendingNonce > nonce { + return + } + + txID, err := send.SendRawTransaction(nonce-3, &common.Address{}, big.NewInt(1), nil) + s.Nil(err) + confirm := <-send.TxToConfirmChannel(txID) + s.Nil(confirm.Err) + s.Equal(nonce, confirm.CurrentTx.Nonce()) +} + +func (s *SenderTestSuite) SetupTest() { + s.ClientTestSuite.SetupTest() + + ctx := context.Background() + priv, err := crypto.ToECDSA(common.FromHex(os.Getenv("L1_PROPOSER_PRIVATE_KEY"))) + s.Nil(err) + + s.sender, err = sender.NewSender(ctx, &sender.Config{ + MaxGasFee: 20000000000, + GasGrowthRate: 50, + MaxRetrys: 0, + GasLimit: 2000000, + MaxWaitingTime: time.Second * 10, + }, s.RPCClient.L1, priv) + s.Nil(err) +} + +func (s *SenderTestSuite) TearDownTest() { + s.sender.Close() + s.ClientTestSuite.TearDownTest() +} + +func TestSenderTestSuite(t *testing.T) { + suite.Run(t, new(SenderTestSuite)) +} diff --git a/internal/testutils/helper.go b/internal/testutils/helper.go index b58f53009..e17d06b9a 100644 --- a/internal/testutils/helper.go +++ b/internal/testutils/helper.go @@ -27,7 +27,7 @@ import ( func (s *ClientTestSuite) ProposeInvalidTxListBytes(proposer Proposer) { invalidTxListBytes := RandomBytes(256) - s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1, nil)) + s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1)) } func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( @@ -53,7 +53,7 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( encoded, err := rlp.EncodeToBytes(emptyTxs) s.Nil(err) - s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0, nil)) + s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0)) s.ProposeInvalidTxListBytes(proposer) diff --git a/internal/testutils/interfaces.go b/internal/testutils/interfaces.go index f53c234a1..487d83bf9 100644 --- a/internal/testutils/interfaces.go +++ b/internal/testutils/interfaces.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/taikoxyz/taiko-client/cmd/utils" + "github.com/taikoxyz/taiko-client/internal/sender" ) type CalldataSyncer interface { @@ -20,6 +21,6 @@ type Proposer interface { ctx context.Context, txListBytes []byte, txNum uint, - nonce *uint64, ) error + GetSender() *sender.Sender } diff --git a/internal/testutils/suite.go b/internal/testutils/suite.go index 13c0774d1..b5164f08c 100644 --- a/internal/testutils/suite.go +++ b/internal/testutils/suite.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/taikoxyz/taiko-client/bindings" + "github.com/taikoxyz/taiko-client/internal/utils" "github.com/taikoxyz/taiko-client/pkg/jwt" "github.com/taikoxyz/taiko-client/pkg/rpc" "github.com/taikoxyz/taiko-client/prover/server" @@ -32,6 +33,7 @@ type ClientTestSuite struct { } func (s *ClientTestSuite) SetupTest() { + utils.LoadEnv() // Default logger glogger := log.NewGlogHandler(log.NewTerminalHandlerWithLevel(os.Stdout, log.LevelInfo, true)) log.SetDefault(log.NewLogger(glogger)) @@ -151,3 +153,9 @@ func (s *ClientTestSuite) RevertL1Snapshot(snapshotID string) { s.Nil(s.RPCClient.L1.CallContext(context.Background(), &revertRes, "evm_revert", snapshotID)) s.True(revertRes) } + +func (s *ClientTestSuite) MineL1Block() { + var blockID string + s.Nil(s.RPCClient.L1.CallContext(context.Background(), &blockID, "evm_mine")) + s.NotEmpty(blockID) +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 12497d7e7..1260ecce3 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -17,15 +17,15 @@ func LoadEnv() { // load test environment variables. currentPath, err := os.Getwd() if err != nil { - log.Warn("get current path failed", "err", err) + log.Debug("get current path failed", "err", err) } path := strings.Split(currentPath, "/taiko-client") if len(path) == 0 { - log.Warn("not a taiko-client repo") + log.Debug("not a taiko-client repo") } err = godotenv.Load(fmt.Sprintf("%s/taiko-client/integration_test/.env", path[0])) if err != nil { - log.Warn("failed to load test env", "current path", currentPath, "err", err) + log.Debug("failed to load test env", "current path", currentPath, "err", err) } } diff --git a/pkg/rpc/utils.go b/pkg/rpc/utils.go index 7c0032faf..e4de73293 100644 --- a/pkg/rpc/utils.go +++ b/pkg/rpc/utils.go @@ -250,18 +250,28 @@ func GetBlockProofStatus( }, nil } -type AccountPoolContent map[string]map[string]*types.Transaction +type AccountPoolContent map[string]map[string]map[string]*types.Transaction +type AccountPoolContentFrom map[string]map[string]*types.Transaction + +// Content GetPendingTxs fetches the pending transactions from tx pool. +func Content(ctx context.Context, client *EthClient) (AccountPoolContent, error) { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + var result AccountPoolContent + return result, client.CallContext(ctxWithTimeout, &result, "txpool_content") +} // ContentFrom fetches a given account's transactions list from a node's transactions pool. func ContentFrom( ctx context.Context, rawRPC *EthClient, address common.Address, -) (AccountPoolContent, error) { +) (AccountPoolContentFrom, error) { ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) defer cancel() - var result AccountPoolContent + var result AccountPoolContentFrom return result, rawRPC.CallContext( ctxWithTimeout, &result, @@ -284,7 +294,7 @@ func IncreaseGasTipCap( log.Info("Try replacing a transaction with same nonce", "sender", address, "nonce", opts.Nonce) - originalTx, err := GetPendingTxByNonce(ctxWithTimeout, cli, address, opts.Nonce.Uint64()) + originalTx, err := GetPendingTxByNonce(ctxWithTimeout, cli.L1, address, opts.Nonce.Uint64()) if err != nil || originalTx == nil { log.Warn( "Original transaction not found", @@ -322,14 +332,14 @@ func IncreaseGasTipCap( // GetPendingTxByNonce tries to retrieve a pending transaction with a given nonce in a node's mempool. func GetPendingTxByNonce( ctx context.Context, - cli *Client, + cli *EthClient, address common.Address, nonce uint64, ) (*types.Transaction, error) { ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) defer cancel() - content, err := ContentFrom(ctxWithTimeout, cli.L1, address) + content, err := ContentFrom(ctxWithTimeout, cli, address) if err != nil { return nil, err } diff --git a/proposer/proposer.go b/proposer/proposer.go index 7da5ad465..0bd52cb4d 100644 --- a/proposer/proposer.go +++ b/proposer/proposer.go @@ -3,20 +3,16 @@ package proposer import ( "bytes" "context" - "crypto/ecdsa" "errors" "fmt" "math/big" "math/rand" - "strings" "sync" "time" "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" @@ -27,6 +23,7 @@ import ( "github.com/taikoxyz/taiko-client/bindings" "github.com/taikoxyz/taiko-client/bindings/encoding" "github.com/taikoxyz/taiko-client/internal/metrics" + "github.com/taikoxyz/taiko-client/internal/sender" "github.com/taikoxyz/taiko-client/pkg/rpc" selector "github.com/taikoxyz/taiko-client/proposer/prover_selector" ) @@ -63,6 +60,8 @@ type Proposer struct { CustomProposeOpHook func() error AfterCommitHook func() error + sender *sender.Sender + ctx context.Context wg sync.WaitGroup } @@ -105,6 +104,15 @@ func (p *Proposer) InitFromConfig(ctx context.Context, cfg *Config) (err error) return err } + if p.sender, err = sender.NewSender(ctx, &sender.Config{ + MaxGasFee: 20000000000, + GasGrowthRate: 20, + GasLimit: cfg.ProposeBlockTxGasLimit, + MaxWaitingTime: time.Second * 30, + }, p.rpc.L1, cfg.L1ProposerPrivKey); err != nil { + return err + } + if p.proverSelector, err = selector.NewETHFeeEOASelector( &protocolConfigs, p.rpc, @@ -176,6 +184,7 @@ func (p *Proposer) eventLoop() { // Close closes the proposer instance. func (p *Proposer) Close(ctx context.Context) { + p.sender.Close() p.wg.Wait() } @@ -238,22 +247,6 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { if len(txLists) == 0 { return errNoNewTxs } - - head, err := p.rpc.L1.BlockNumber(ctx) - if err != nil { - return err - } - nonce, err := p.rpc.L1.NonceAt( - ctx, - crypto.PubkeyToAddress(p.L1ProposerPrivKey.PublicKey), - new(big.Int).SetUint64(head), - ) - if err != nil { - return err - } - - log.Info("Proposer account information", "chainHead", head, "nonce", nonce) - g := new(errgroup.Group) for i, txs := range txLists { func(i int, txs types.Transactions) { @@ -267,8 +260,7 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { return fmt.Errorf("failed to encode transactions: %w", err) } - txNonce := nonce + uint64(i) - if err := p.ProposeTxList(ctx, txListBytes, uint(txs.Len()), &txNonce); err != nil { + if err := p.ProposeTxList(ctx, txListBytes, uint(txs.Len())); err != nil { return fmt.Errorf("failed to propose transactions: %w", err) } @@ -290,11 +282,9 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { return nil } -func (p *Proposer) sendProposeBlockTxWithBlobHash( +func (p *Proposer) makeProposeBlockTxWithBlobHash( ctx context.Context, txListBytes []byte, - nonce *uint64, - isReplacement bool, ) (*types.Transaction, error) { // Make sidecar in order to get blob hash. sideCar, err := rpc.MakeSidecar(txListBytes) @@ -311,28 +301,6 @@ func (p *Proposer) sendProposeBlockTxWithBlobHash( return nil, err } - // Propose the transactions list - opts, err := getTxOpts(ctx, p.rpc.L1, p.L1ProposerPrivKey, p.rpc.L1.ChainID, maxFee) - if err != nil { - return nil, err - } - if nonce != nil { - opts.Nonce = new(big.Int).SetUint64(*nonce) - } - opts.GasLimit = p.ProposeBlockTxGasLimit - if isReplacement { - if opts, err = rpc.IncreaseGasTipCap( - ctx, - p.rpc, - opts, - p.proposerAddress, - new(big.Int).SetUint64(p.ProposeBlockTxReplacementMultiplier), - p.ProposeBlockTxGasTipCap, - ); err != nil { - return nil, err - } - } - var parentMetaHash = [32]byte{} if p.IncludeParentMetaHash { state, err := p.rpc.TaikoL1.State(&bind.CallOpts{Context: ctx}) @@ -348,8 +316,6 @@ func (p *Proposer) sendProposeBlockTxWithBlobHash( parentMetaHash = parent.Blk.MetaHash } - hookCalls := make([]encoding.HookCall, 0) - // Initially just use the AssignmentHook default. hookInputData, err := encoding.EncodeAssignmentHookInput(&encoding.AssignmentHookInput{ Assignment: assignment, @@ -359,10 +325,6 @@ func (p *Proposer) sendProposeBlockTxWithBlobHash( return nil, err } - hookCalls = append(hookCalls, encoding.HookCall{ - Hook: p.AssignmentHookAddress, - Data: hookInputData, - }) encodedParams, err := encoding.EncodeBlockParams(&encoding.BlockParams{ AssignedProver: assignedProver, ExtraData: rpc.StringToBytes32(p.ExtraData), @@ -371,13 +333,17 @@ func (p *Proposer) sendProposeBlockTxWithBlobHash( BlobHash: [32]byte{}, CacheBlobForReuse: false, ParentMetaHash: parentMetaHash, - HookCalls: hookCalls, + HookCalls: []encoding.HookCall{{ + Hook: p.AssignmentHookAddress, + Data: hookInputData, + }}, }) if err != nil { return nil, err } - opts.NoSend = true + opts := p.sender.Opts + opts.Value = maxFee rawTx, err := p.rpc.TaikoL1.ProposeBlock( opts, encodedParams, @@ -387,8 +353,6 @@ func (p *Proposer) sendProposeBlockTxWithBlobHash( return nil, encoding.TryParsingCustomError(err) } - // Create blob tx and send it. - opts.NoSend = false proposeTx, err := p.rpc.L1.TransactBlobTx(opts, &p.TaikoL1Address, rawTx.Data(), sideCar) if err != nil { return nil, err @@ -399,12 +363,10 @@ func (p *Proposer) sendProposeBlockTxWithBlobHash( return proposeTx, nil } -// sendProposeBlockTx tries to send a TaikoL1.proposeBlock transaction. -func (p *Proposer) sendProposeBlockTx( +// makeProposeBlockTx tries to send a TaikoL1.proposeBlock transaction. +func (p *Proposer) makeProposeBlockTx( ctx context.Context, txListBytes []byte, - nonce *uint64, - isReplacement bool, ) (*types.Transaction, error) { assignment, assignedProver, maxFee, err := p.proverSelector.AssignProver( ctx, @@ -415,27 +377,8 @@ func (p *Proposer) sendProposeBlockTx( return nil, err } - // Propose the transactions list - opts, err := getTxOpts(ctx, p.rpc.L1, p.L1ProposerPrivKey, p.rpc.L1.ChainID, maxFee) - if err != nil { - return nil, err - } - if nonce != nil { - opts.Nonce = new(big.Int).SetUint64(*nonce) - } - opts.GasLimit = p.ProposeBlockTxGasLimit - if isReplacement { - if opts, err = rpc.IncreaseGasTipCap( - ctx, - p.rpc, - opts, - p.proposerAddress, - new(big.Int).SetUint64(p.ProposeBlockTxReplacementMultiplier), - p.ProposeBlockTxGasTipCap, - ); err != nil { - return nil, err - } - } + opts := p.sender.Opts + opts.Value = maxFee var parentMetaHash = [32]byte{} if p.IncludeParentMetaHash { @@ -500,48 +443,39 @@ func (p *Proposer) ProposeTxList( ctx context.Context, txListBytes []byte, txNum uint, - nonce *uint64, ) error { - var ( - isReplacement bool - tx *types.Transaction - err error - ) - if err = backoff.Retry( + var txID string + if err := backoff.Retry( func() error { if ctx.Err() != nil { return nil } + var ( + tx *types.Transaction + err error + ) // Send tx list by blob tx. if p.BlobAllowed { - tx, err = p.sendProposeBlockTxWithBlobHash( + tx, err = p.makeProposeBlockTxWithBlobHash( ctx, txListBytes, - nonce, - isReplacement, ) } else { - tx, err = p.sendProposeBlockTx( + tx, err = p.makeProposeBlockTx( ctx, txListBytes, - nonce, - isReplacement, ) } + if err != nil { + log.Warn("Failed to make taikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) + return err + } + txID, err = p.sender.SendTransaction(tx) if err != nil { log.Warn("Failed to send taikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) - if strings.Contains(err.Error(), core.ErrNonceTooLow.Error()) { - return nil - } - if strings.Contains(err.Error(), txpool.ErrReplaceUnderpriced.Error()) { - isReplacement = true - } else { - isReplacement = false - } return err } - - return nil + return err }, backoff.WithMaxRetries( backoff.NewConstantBackOff(retryInterval), @@ -553,16 +487,11 @@ func (p *Proposer) ProposeTxList( if ctx.Err() != nil { return ctx.Err() } - if err != nil { - return err - } - ctxWithTimeout, cancel := context.WithTimeout(ctx, p.WaitReceiptTimeout) - defer cancel() - if tx != nil { - if _, err = rpc.WaitReceipt(ctxWithTimeout, p.rpc.L1, tx); err != nil { - return err - } + // Waiting for the transaction to be confirmed. + confirm := <-p.sender.TxToConfirmChannel(txID) + if confirm.Err != nil { + return confirm.Err } log.Info("📝 Propose transactions succeeded", "txs", txNum) @@ -579,7 +508,7 @@ func (p *Proposer) ProposeEmptyBlockOp(ctx context.Context) error { if err != nil { return err } - return p.ProposeTxList(ctx, emptyTxListBytes, 0, nil) + return p.ProposeTxList(ctx, emptyTxListBytes, 0) } // updateProposingTicker updates the internal proposing timer. @@ -605,6 +534,11 @@ func (p *Proposer) Name() string { return "proposer" } +// GetSender returns the sender instance. +func (p *Proposer) GetSender() *sender.Sender { + return p.sender +} + // initTierFees initializes the proving fees for every proof tier configured in the protocol for the proposer. func (p *Proposer) initTierFees() error { for _, tier := range p.tiers { @@ -637,31 +571,3 @@ func (p *Proposer) initTierFees() error { return nil } - -// getTxOpts creates a bind.TransactOpts instance using the given private key. -func getTxOpts( - ctx context.Context, - cli *rpc.EthClient, - privKey *ecdsa.PrivateKey, - chainID *big.Int, - fee *big.Int, -) (*bind.TransactOpts, error) { - opts, err := bind.NewKeyedTransactorWithChainID(privKey, chainID) - if err != nil { - return nil, fmt.Errorf("failed to generate prepareBlock transaction options: %w", err) - } - - gasTipCap, err := cli.SuggestGasTipCap(ctx) - if err != nil { - if rpc.IsMaxPriorityFeePerGasNotFoundError(err) { - gasTipCap = rpc.FallbackGasTipCap - } else { - return nil, err - } - } - - opts.GasTipCap = gasTipCap - opts.Value = fee - - return opts, nil -} diff --git a/proposer/proposer_test.go b/proposer/proposer_test.go index ea7dab46f..e5a1e23b1 100644 --- a/proposer/proposer_test.go +++ b/proposer/proposer_test.go @@ -155,47 +155,41 @@ func (s *ProposerTestSuite) TestCustomProposeOpHook() { } func (s *ProposerTestSuite) TestSendProposeBlockTx() { + sender := s.p.GetSender() + s.SetL1Automine(false) + defer s.SetL1Automine(true) + + sender.AdjustNonce(nil) + fee := big.NewInt(10000) - opts, err := getTxOpts( - context.Background(), - s.p.rpc.L1, - s.p.L1ProposerPrivKey, - s.RPCClient.L1.ChainID, - fee, - ) - s.Nil(err) + opts := sender.Opts + opts.Value = fee s.Greater(opts.GasTipCap.Uint64(), uint64(0)) nonce, err := s.RPCClient.L1.PendingNonceAt(context.Background(), s.p.proposerAddress) s.Nil(err) - tx := types.NewTransaction( - nonce, - common.BytesToAddress([]byte{}), - common.Big1, - 100000, - opts.GasTipCap, - []byte{}, - ) - - s.SetL1Automine(false) - defer s.SetL1Automine(true) - - signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(s.RPCClient.L1.ChainID), s.p.L1ProposerPrivKey) + txID, err := sender.SendRawTransaction(nonce, &common.Address{}, common.Big1, nil) s.Nil(err) - s.Nil(s.RPCClient.L1.SendTransaction(context.Background(), signedTx)) + tx := sender.GetUnconfirmedTx(txID) - var emptyTxs []types.Transaction - encoded, err := rlp.EncodeToBytes(emptyTxs) + encoded, err := rlp.EncodeToBytes([]types.Transaction{}) + s.Nil(err) + var newTx *types.Transaction + if s.p.BlobAllowed { + newTx, err = s.p.makeProposeBlockTxWithBlobHash(context.Background(), encoded) + } else { + newTx, err = s.p.makeProposeBlockTx( + context.Background(), + encoded, + ) + } s.Nil(err) - newTx, err := s.p.sendProposeBlockTx( - context.Background(), - encoded, - &nonce, - true, - ) + txID, err = sender.SendRawTransaction(nonce, newTx.To(), newTx.Value(), newTx.Data()) s.Nil(err) + newTx = sender.GetUnconfirmedTx(txID) + s.Greater(newTx.GasTipCap().Uint64(), tx.GasTipCap().Uint64()) }