diff --git a/cmd/flags/driver.go b/cmd/flags/driver.go index ef52c69fc..8693f0366 100644 --- a/cmd/flags/driver.go +++ b/cmd/flags/driver.go @@ -35,6 +35,12 @@ var ( Value: 0, Category: driverCategory, } + // blob server endpoint + BlobServerEndpoint = &cli.StringFlag{ + Name: "blob.server", + Usage: "Blob sidecar storage server", + Category: driverCategory, + } ) // DriverFlags All driver flags. @@ -47,4 +53,5 @@ var DriverFlags = MergeFlags(CommonFlags, []cli.Flag{ P2PSyncTimeout, CheckPointSyncURL, MaxExponent, + BlobServerEndpoint, }) diff --git a/driver/chain_syncer/calldata/syncer.go b/driver/chain_syncer/calldata/syncer.go index 12e8c9532..4c2e94973 100644 --- a/driver/chain_syncer/calldata/syncer.go +++ b/driver/chain_syncer/calldata/syncer.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "math/big" + "net/url" "time" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -14,18 +16,17 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" - - "github.com/ethereum/go-ethereum" "github.com/taikoxyz/taiko-client/bindings" "github.com/taikoxyz/taiko-client/bindings/encoding" - anchorTxConstructor "github.com/taikoxyz/taiko-client/driver/anchor_tx_constructor" "github.com/taikoxyz/taiko-client/driver/chain_syncer/beaconsync" "github.com/taikoxyz/taiko-client/driver/state" - txlistfetcher "github.com/taikoxyz/taiko-client/driver/txlist_fetcher" "github.com/taikoxyz/taiko-client/internal/metrics" "github.com/taikoxyz/taiko-client/internal/utils" - eventIterator "github.com/taikoxyz/taiko-client/pkg/chain_iterator/event_iterator" "github.com/taikoxyz/taiko-client/pkg/rpc" + + anchorTxConstructor "github.com/taikoxyz/taiko-client/driver/anchor_tx_constructor" + txlistfetcher "github.com/taikoxyz/taiko-client/driver/txlist_fetcher" + eventIterator "github.com/taikoxyz/taiko-client/pkg/chain_iterator/event_iterator" txListValidator "github.com/taikoxyz/taiko-client/pkg/txlist_validator" ) @@ -42,6 +43,7 @@ type Syncer struct { lastInsertedBlockID *big.Int reorgDetectedFlag bool maxRetrieveExponent uint64 + blobDatasource *rpc.BlobDataSource } // NewSyncer creates a new syncer instance. @@ -51,6 +53,7 @@ func NewSyncer( state *state.State, progressTracker *beaconsync.SyncProgressTracker, maxRetrieveExponent uint64, + blobServerEndpoint *url.URL, ) (*Syncer, error) { configs, err := client.TaikoL1.GetConfig(&bind.CallOpts{Context: ctx}) if err != nil { @@ -74,6 +77,11 @@ func NewSyncer( client.L2.ChainID, ), maxRetrieveExponent: maxRetrieveExponent, + blobDatasource: rpc.NewBlobDataSource( + ctx, + client, + blobServerEndpoint, + ), }, nil } @@ -239,7 +247,7 @@ func (s *Syncer) onBlockProposed( // Decode transactions list. var txListDecoder txlistfetcher.TxListFetcher if event.Meta.BlobUsed { - txListDecoder = txlistfetcher.NewBlobTxListFetcher(s.rpc.L1Beacon) + txListDecoder = txlistfetcher.NewBlobTxListFetcher(s.rpc.L1Beacon, s.blobDatasource) } else { txListDecoder = new(txlistfetcher.CalldataFetcher) } diff --git a/driver/chain_syncer/calldata/syncer_test.go b/driver/chain_syncer/calldata/syncer_test.go index 5a2cc346b..c06cce432 100644 --- a/driver/chain_syncer/calldata/syncer_test.go +++ b/driver/chain_syncer/calldata/syncer_test.go @@ -41,6 +41,7 @@ func (s *CalldataSyncerTestSuite) SetupTest() { state2, beaconsync.NewSyncProgressTracker(s.RPCClient.L2, 1*time.Hour), 0, + nil, ) s.Nil(err) s.s = syncer @@ -56,6 +57,7 @@ func (s *CalldataSyncerTestSuite) TestCancelNewSyncer() { s.s.state, s.s.progressTracker, 0, + nil, ) s.Nil(syncer) s.NotNil(err) diff --git a/driver/chain_syncer/chain_syncer.go b/driver/chain_syncer/chain_syncer.go index 13c6ddcc7..ea87f6ec8 100644 --- a/driver/chain_syncer/chain_syncer.go +++ b/driver/chain_syncer/chain_syncer.go @@ -3,6 +3,7 @@ package chainsyncer import ( "context" "fmt" + "net/url" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -45,6 +46,8 @@ func New( p2pSyncVerifiedBlocks bool, p2pSyncTimeout time.Duration, maxRetrieveExponent uint64, + blobServerEndpoint *url.URL, + ) (*L2ChainSyncer, error) { tracker := beaconsync.NewSyncProgressTracker(rpc.L2, p2pSyncTimeout) go tracker.Track(ctx) @@ -54,7 +57,7 @@ func New( return nil, err } beaconSyncer := beaconsync.NewSyncer(ctx, rpc, state, syncMode, tracker) - calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker, maxRetrieveExponent) + calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker, maxRetrieveExponent, blobServerEndpoint) if err != nil { return nil, err } diff --git a/driver/chain_syncer/chain_syncer_test.go b/driver/chain_syncer/chain_syncer_test.go index ede2bc863..975e2dd41 100644 --- a/driver/chain_syncer/chain_syncer_test.go +++ b/driver/chain_syncer/chain_syncer_test.go @@ -41,6 +41,7 @@ func (s *ChainSyncerTestSuite) SetupTest() { false, 1*time.Hour, 0, + nil, ) s.Nil(err) s.s = syncer diff --git a/driver/config.go b/driver/config.go index a1d906365..852578f3a 100644 --- a/driver/config.go +++ b/driver/config.go @@ -3,6 +3,7 @@ package driver import ( "errors" "fmt" + "net/url" "time" "github.com/ethereum/go-ethereum/common" @@ -21,6 +22,7 @@ type Config struct { RPCTimeout time.Duration RetryInterval time.Duration MaxExponent uint64 + BlobServerEndpoint *url.URL } // NewConfigFromCliContext creates a new config instance from @@ -44,6 +46,15 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { return nil, errors.New("empty L1 beacon endpoint") } + var blobServerEndpoint *url.URL + if c.IsSet(flags.BlobServerEndpoint.Name) { + if blobServerEndpoint, err = url.Parse( + c.String(flags.BlobServerEndpoint.Name), + ); err != nil { + return nil, err + } + } + var timeout = c.Duration(flags.RPCTimeout.Name) return &Config{ ClientConfig: &rpc.ClientConfig{ @@ -62,5 +73,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { P2PSyncTimeout: c.Duration(flags.P2PSyncTimeout.Name), RPCTimeout: timeout, MaxExponent: c.Uint64(flags.MaxExponent.Name), + BlobServerEndpoint: blobServerEndpoint, }, nil } diff --git a/driver/driver.go b/driver/driver.go index 753707c35..6c450657d 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -80,6 +80,7 @@ func (d *Driver) InitFromConfig(ctx context.Context, cfg *Config) (err error) { cfg.P2PSyncVerifiedBlocks, cfg.P2PSyncTimeout, cfg.MaxExponent, + cfg.BlobServerEndpoint, ); err != nil { return err } diff --git a/driver/txlist_fetcher/blob.go b/driver/txlist_fetcher/blob.go index 9cb0897e9..dcedaadb4 100644 --- a/driver/txlist_fetcher/blob.go +++ b/driver/txlist_fetcher/blob.go @@ -11,17 +11,19 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/taikoxyz/taiko-client/bindings" + "github.com/taikoxyz/taiko-client/pkg" "github.com/taikoxyz/taiko-client/pkg/rpc" ) // BlobFetcher is responsible for fetching the txList blob from the L1 block sidecar. type BlobFetcher struct { l1Beacon *rpc.BeaconClient + ds *rpc.BlobDataSource } // NewBlobTxListFetcher creates a new BlobFetcher instance based on the given rpc client. -func NewBlobTxListFetcher(l1Beacon *rpc.BeaconClient) *BlobFetcher { - return &BlobFetcher{l1Beacon} +func NewBlobTxListFetcher(l1Beacon *rpc.BeaconClient, ds *rpc.BlobDataSource) *BlobFetcher { + return &BlobFetcher{l1Beacon, ds} } // Fetch implements the TxListFetcher interface. @@ -31,11 +33,11 @@ func (d *BlobFetcher) Fetch( meta *bindings.TaikoDataBlockMetadata, ) ([]byte, error) { if !meta.BlobUsed { - return nil, errBlobUnused + return nil, pkg.ErrBlobUsed } // Fetch the L1 block sidecars. - sidecars, err := d.l1Beacon.GetBlobs(ctx, meta.Timestamp) + sidecars, err := d.ds.GetBlobs(ctx, meta) if err != nil { return nil, err } @@ -61,5 +63,5 @@ func (d *BlobFetcher) Fetch( } } - return nil, errSidecarNotFound + return nil, pkg.ErrSidecarNotFound } diff --git a/driver/txlist_fetcher/calldata.go b/driver/txlist_fetcher/calldata.go index 4abb686b0..ae011012d 100644 --- a/driver/txlist_fetcher/calldata.go +++ b/driver/txlist_fetcher/calldata.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/taikoxyz/taiko-client/bindings" "github.com/taikoxyz/taiko-client/bindings/encoding" + "github.com/taikoxyz/taiko-client/pkg" ) // CalldataFetcher is responsible for fetching the txList bytes from the transaction's calldata. @@ -18,7 +19,7 @@ func (d *CalldataFetcher) Fetch( meta *bindings.TaikoDataBlockMetadata, ) ([]byte, error) { if meta.BlobUsed { - return nil, errBlobUsed + return nil, pkg.ErrBlobUsed } return encoding.UnpackTxListBytes(tx.Data()) diff --git a/driver/txlist_fetcher/interface.go b/driver/txlist_fetcher/interface.go index 6f43e69d8..b4b984cd7 100644 --- a/driver/txlist_fetcher/interface.go +++ b/driver/txlist_fetcher/interface.go @@ -2,18 +2,11 @@ package txlistdecoder import ( "context" - "errors" "github.com/ethereum/go-ethereum/core/types" "github.com/taikoxyz/taiko-client/bindings" ) -var ( - errBlobUsed = errors.New("blob is used") - errBlobUnused = errors.New("blob is not used") - errSidecarNotFound = errors.New("sidecar not found") -) - // TxListFetcher is responsible for fetching the L2 txList bytes from L1 type TxListFetcher interface { Fetch(ctx context.Context, tx *types.Transaction, meta *bindings.TaikoDataBlockMetadata) ([]byte, error) diff --git a/internal/testutils/helper.go b/internal/testutils/helper.go index 6f6418498..9137be6f0 100644 --- a/internal/testutils/helper.go +++ b/internal/testutils/helper.go @@ -287,7 +287,7 @@ func RandomBytes(size int) (b []byte) { func RandomPort() int { port, err := freeport.GetFreePort() if err != nil { - log.Crit("Failed to get local free random port", "err", err) + log.Crit("Failed to get local free random port", "error", err) } return port } @@ -298,7 +298,7 @@ func LocalRandomProverEndpoint() *url.URL { proverEndpoint, err := url.Parse(fmt.Sprintf("http://localhost:%v", port)) if err != nil { - log.Crit("Failed to parse local prover endpoint", "err", err) + log.Crit("Failed to parse local prover endpoint", "error", err) } return proverEndpoint diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 0c49fbe77..0f3a3805c 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -22,14 +22,14 @@ import ( func LoadEnv() { currentPath, err := os.Getwd() if err != nil { - log.Debug("Failed to get current path", "err", err) + log.Debug("Failed to get current path", "error", err) } path := strings.Split(currentPath, "/taiko-client") if len(path) == 0 { log.Debug("Not a taiko-client repo") } if godotenv.Load(fmt.Sprintf("%s/taiko-client/integration_test/.env", path[0])) != nil { - log.Debug("Failed to load test env", "current path", currentPath, "err", err) + log.Debug("Failed to load test env", "current path", currentPath, "error", err) } } diff --git a/pkg/error.go b/pkg/error.go new file mode 100644 index 000000000..676e6d608 --- /dev/null +++ b/pkg/error.go @@ -0,0 +1,12 @@ +package pkg + +import ( + "errors" +) + +var ( + ErrBlobUsed = errors.New("blob is used") + ErrBlobUnused = errors.New("blob is not used") + ErrSidecarNotFound = errors.New("sidecar not found") + ErrBeaconNotFound = errors.New("beacon client not found") +) diff --git a/pkg/rpc/blob_datasource.go b/pkg/rpc/blob_datasource.go new file mode 100644 index 000000000..ebf320377 --- /dev/null +++ b/pkg/rpc/blob_datasource.go @@ -0,0 +1,111 @@ +package rpc + +import ( + "context" + "fmt" + "net/url" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/go-resty/resty/v2" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/blob" + "github.com/taikoxyz/taiko-client/bindings" + "github.com/taikoxyz/taiko-client/pkg" +) + +type BlobDataSource struct { + ctx context.Context + client *Client + blobServerEndpoint *url.URL +} + +type BlobData struct { + BlobHash string `json:"blob_hash"` + KzgCommitment string `json:"kzg_commitment"` + Blob string `json:"blob"` +} + +type BlobDataSeq struct { + Data []*BlobData `json:"data"` +} + +func NewBlobDataSource( + ctx context.Context, + client *Client, + blobServerEndpoint *url.URL, +) *BlobDataSource { + return &BlobDataSource{ + ctx: ctx, + client: client, + blobServerEndpoint: blobServerEndpoint, + } +} + +// GetBlobs get blob sidecar by meta +func (ds *BlobDataSource) GetBlobs( + ctx context.Context, + meta *bindings.TaikoDataBlockMetadata, +) ([]*blob.Sidecar, error) { + if !meta.BlobUsed { + return nil, pkg.ErrBlobUnused + } + + var ( + sidecars []*blob.Sidecar + err error + ) + if ds.client.L1Beacon == nil { + sidecars, err = nil, pkg.ErrBeaconNotFound + } else { + sidecars, err = ds.client.L1Beacon.GetBlobs(ctx, meta.Timestamp) + } + if err != nil { + log.Info("Failed to get blobs from beacon, try to use blob server.", "error", err.Error()) + if ds.blobServerEndpoint == nil { + log.Info("No blob server endpoint set") + return nil, err + } + blobs, err := ds.getBlobFromServer(ctx, meta.BlobHash) + if err != nil { + return nil, err + } + sidecars = make([]*blob.Sidecar, len(blobs.Data)) + for index, value := range blobs.Data { + sidecars[index] = &blob.Sidecar{ + KzgCommitment: value.KzgCommitment, + Blob: value.Blob, + } + } + } + err = nil + return sidecars, err +} + +// getBlobFromServer get blob data from server path `/getBlob`. +func (ds *BlobDataSource) getBlobFromServer(ctx context.Context, blobHash common.Hash) (*BlobDataSeq, error) { + var ( + route = "/getBlob" + param = map[string]string{"blobHash": blobHash.String()} + ) + requestURL, err := url.JoinPath(ds.blobServerEndpoint.String(), route) + if err != nil { + return nil, err + } + resp, err := resty.New().R(). + SetResult(BlobDataSeq{}). + SetQueryParams(param). + SetContext(ctx). + SetHeader("Content-Type", "application/json"). + SetHeader("Accept", "application/json"). + Get(requestURL) + if err != nil { + return nil, err + } + if !resp.IsSuccess() { + return nil, fmt.Errorf( + "unable to contect blob server endpoint, status code: %v", + resp.StatusCode(), + ) + } + return resp.Result().(*BlobDataSeq), nil +} diff --git a/proposer/proposer_test.go b/proposer/proposer_test.go index 62d72c31e..ff1efe39a 100644 --- a/proposer/proposer_test.go +++ b/proposer/proposer_test.go @@ -46,6 +46,7 @@ func (s *ProposerTestSuite) SetupTest() { state2, beaconsync.NewSyncProgressTracker(s.RPCClient.L2, 1*time.Hour), 0, + nil, ) s.Nil(err) s.s = syncer @@ -113,7 +114,11 @@ func parseTxs(client *rpc.Client, event *bindings.TaikoL1ClientBlockProposed) (t // Decode transactions list. var txListDecoder txlistfetcher.TxListFetcher if event.Meta.BlobUsed { - txListDecoder = txlistfetcher.NewBlobTxListFetcher(client.L1Beacon) + txListDecoder = txlistfetcher.NewBlobTxListFetcher(client.L1Beacon, rpc.NewBlobDataSource( + context.Background(), + client, + nil, + )) } else { txListDecoder = new(txlistfetcher.CalldataFetcher) } @@ -152,7 +157,7 @@ func (s *ProposerTestSuite) getLatestProposedTxs( case event := <-sink: txs, err := parseTxs(s.RPCClient, event) if err != nil { - log.Error("failed to parse txs", "err", err) + log.Error("failed to parse txs", "error", err) } txLst = append(txLst, txs) case <-tick: diff --git a/prover/anchor_tx_validator/anchor_tx_validator.go b/prover/anchor_tx_validator/anchor_tx_validator.go index a58df5114..e5307a912 100644 --- a/prover/anchor_tx_validator/anchor_tx_validator.go +++ b/prover/anchor_tx_validator/anchor_tx_validator.go @@ -48,7 +48,7 @@ func (v *AnchorTxValidator) ValidateAnchorTx(tx *types.Transaction) error { method, err := encoding.TaikoL2ABI.MethodById(tx.Data()) if err != nil || method.Name != "anchor" { - return fmt.Errorf("invalid TaikoL2.anchor transaction selector, err: %w", err) + return fmt.Errorf("invalid TaikoL2.anchor transaction selector, error: %w", err) } return nil @@ -61,7 +61,7 @@ func (v *AnchorTxValidator) GetAndValidateAnchorTxReceipt( ) (*types.Receipt, error) { receipt, err := v.rpc.L2.TransactionReceipt(ctx, tx.Hash()) if err != nil { - return nil, fmt.Errorf("failed to get TaikoL2.anchor transaction receipt, err: %w", err) + return nil, fmt.Errorf("failed to get TaikoL2.anchor transaction receipt, error: %w", err) } if receipt.Status != types.ReceiptStatusSuccessful { diff --git a/prover/event_handler/transition_proved_test.go b/prover/event_handler/transition_proved_test.go index d36b456ce..edc5c15ba 100644 --- a/prover/event_handler/transition_proved_test.go +++ b/prover/event_handler/transition_proved_test.go @@ -63,6 +63,7 @@ func (s *EventHandlerTestSuite) SetupTest() { testState, tracker, 0, + nil, ) s.Nil(err) diff --git a/prover/proof_producer/sgx_producer.go b/prover/proof_producer/sgx_producer.go index e5b4df503..f9efabe07 100644 --- a/prover/proof_producer/sgx_producer.go +++ b/prover/proof_producer/sgx_producer.go @@ -123,7 +123,7 @@ func (s *SGXProofProducer) callProverDaemon(ctx context.Context, opts *ProofRequ } output, err := s.requestProof(opts) if err != nil { - log.Error("Failed to request proof", "height", opts.BlockID, "err", err, "endpoint", s.RaikoHostEndpoint) + log.Error("Failed to request proof", "height", opts.BlockID, "error", err, "endpoint", s.RaikoHostEndpoint) return err } diff --git a/prover/proof_submitter/proof_submitter.go b/prover/proof_submitter/proof_submitter.go index deaad60a4..e905af3a6 100644 --- a/prover/proof_submitter/proof_submitter.go +++ b/prover/proof_submitter/proof_submitter.go @@ -67,7 +67,7 @@ func NewProofSubmitter( func (s *ProofSubmitter) RequestProof(ctx context.Context, event *bindings.TaikoL1ClientBlockProposed) error { l1Origin, err := s.rpc.WaitL1Origin(ctx, event.BlockId) if err != nil { - return fmt.Errorf("failed to fetch l1Origin, blockID: %d, err: %w", event.BlockId, err) + return fmt.Errorf("failed to fetch l1Origin, blockID: %d, error: %w", event.BlockId, err) } // Get the header of the block to prove from L2 execution engine. diff --git a/prover/proof_submitter/proof_submitter_test.go b/prover/proof_submitter/proof_submitter_test.go index 43793aaad..1a24c76c3 100644 --- a/prover/proof_submitter/proof_submitter_test.go +++ b/prover/proof_submitter/proof_submitter_test.go @@ -104,6 +104,7 @@ func (s *ProofSubmitterTestSuite) SetupTest() { testState, tracker, 0, + nil, ) s.Nil(err)