From aaec1bbdd54df6d60ce39428febbb2747838c31a Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Fri, 14 Jul 2023 02:36:54 -0700 Subject: [PATCH] fix(prover): add end height for block filtering if `startHeight` is not nil, and don't block when notifying (#317) Co-authored-by: David --- .../proof_submitter/valid_proof_submitter.go | 2 +- prover/prover.go | 135 ++++++++++++++---- 2 files changed, 110 insertions(+), 27 deletions(-) diff --git a/prover/proof_submitter/valid_proof_submitter.go b/prover/proof_submitter/valid_proof_submitter.go index e297fb6e1..c3a4fc2d4 100644 --- a/prover/proof_submitter/valid_proof_submitter.go +++ b/prover/proof_submitter/valid_proof_submitter.go @@ -114,7 +114,7 @@ func (s *ValidProofSubmitter) RequestProof(ctx context.Context, event *bindings. signalRoot, err := s.rpc.GetStorageRoot(ctx, s.rpc.L2GethClient, s.l2SignalService, block.Number()) if err != nil { - return fmt.Errorf("error getting storageroot: %w", err) + return fmt.Errorf("failed to get storage root: %w", err) } // Request proof. diff --git a/prover/prover.go b/prover/prover.go index 3405e1091..780f4b9f8 100644 --- a/prover/prover.go +++ b/prover/prover.go @@ -65,7 +65,7 @@ type Prover struct { blockVerifiedSub event.Subscription proverSlashedCh chan *bindings.TaikoL1ProverPoolSlashed proverSlashedSub event.Subscription - proveNotify chan *big.Int + proveNotify chan struct{} // Proof related proofGenerationCh chan *proofProducer.ProofWithHeader @@ -142,7 +142,7 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) { p.blockProvenCh = make(chan *bindings.TaikoL1ClientBlockProven, chBufferSize) p.proofGenerationCh = make(chan *proofProducer.ProofWithHeader, chBufferSize) p.proverSlashedCh = make(chan *bindings.TaikoL1ProverPoolSlashed, chBufferSize) - p.proveNotify = make(chan *big.Int, 1) + p.proveNotify = make(chan struct{}, 1) if err := p.initL1Current(cfg.StartingBlockID); err != nil { return fmt.Errorf("initialize L1 current cursor error: %w", err) } @@ -235,7 +235,7 @@ func (p *Prover) eventLoop() { // if we are already proving. reqProving := func() { select { - case p.proveNotify <- nil: + case p.proveNotify <- struct{}{}: default: } } @@ -255,8 +255,7 @@ func (p *Prover) eventLoop() { ) defer verificationCheckTicker.Stop() - checkProofWindowExpiredTicker := time.NewTicker(p.checkProofWindowExpiredInterval) - defer checkProofWindowExpiredTicker.Stop() + checkProofWindowExpiredTicker := time.After(p.checkProofWindowExpiredInterval) // Call reqProving() right away to catch up with the latest state. reqProving() @@ -272,14 +271,17 @@ func (p *Prover) eventLoop() { ); err != nil { log.Error("Check chain verification error", "error", err) } - case <-checkProofWindowExpiredTicker.C: - if err := p.checkProofWindowsExpired(p.ctx); err != nil { - log.Error("error checking proof window expired", "error", err) - } + case <-checkProofWindowExpiredTicker: + func() { + defer func() { checkProofWindowExpiredTicker = time.After(p.checkProofWindowExpiredInterval) }() + if err := p.checkProofWindowsExpired(p.ctx); err != nil { + log.Error("error checking proof window expired", "error", err) + } + }() case proofWithHeader := <-p.proofGenerationCh: p.submitProofOp(p.ctx, proofWithHeader) - case startHeight := <-p.proveNotify: - if err := p.proveOp(startHeight); err != nil { + case <-p.proveNotify: + if err := p.proveOp(); err != nil { log.Error("Prove new blocks error", "error", err) } case <-p.blockProposedCh: @@ -310,12 +312,9 @@ func (p *Prover) Close() { // proveOp performs a proving operation, find current unproven blocks, then // request generating proofs for them. -func (p *Prover) proveOp(startHeight *big.Int) error { +func (p *Prover) proveOp() error { firstTry := true - if startHeight == nil { - startHeight = new(big.Int).SetUint64(p.l1Current) - } for firstTry || p.reorgDetectedFlag { p.reorgDetectedFlag = false firstTry = false @@ -323,7 +322,7 @@ func (p *Prover) proveOp(startHeight *big.Int) error { iter, err := eventIterator.NewBlockProposedIterator(p.ctx, &eventIterator.BlockProposedIteratorConfig{ Client: p.rpc.L1, TaikoL1: p.rpc.TaikoL1, - StartHeight: startHeight, + StartHeight: new(big.Int).SetUint64(p.l1Current), OnBlockProposedEvent: p.onBlockProposed, }) if err != nil { @@ -520,6 +519,8 @@ func (p *Prover) onBlockProposed( block.AssignedProver.Hex(), "proofWindowExpiresAt", proofWindowExpiresAt, + "timeToExipre", + proofWindowExpiresAt-uint64(time.Now().Unix()), ) // if we cant prove it now, but config is set to wait and try to prove @@ -533,6 +534,7 @@ func (p *Prover) onBlockProposed( "proofWindowExpiresAt", proofWindowExpiresAt, ) + p.currentBlocksWaitingForProofWindowMutex.Lock() p.currentBlocksWaitingForProofWindow[event.Meta.Id] = event.Raw.BlockNumber p.currentBlocksWaitingForProofWindowMutex.Unlock() @@ -674,8 +676,7 @@ func (p *Prover) onBlockProven(ctx context.Context, event *bindings.TaikoL1Clien } else { // generate oracle proof if oracle prover, proof is invalid if p.cfg.OracleProver { - // call proveNotify and pass in the L1 start height - p.proveNotify <- new(big.Int).SetUint64(event.Raw.BlockNumber) + return p.requestProofForBlockId(event.BlockId, new(big.Int).SetUint64(event.Raw.BlockNumber)) } } @@ -820,6 +821,9 @@ func (p *Prover) cancelProof(ctx context.Context, blockID uint64) { // which are blocks that have been proposed, but we were not selected as the prover. if the proof window // has expired, we can start generating a proof for them. func (p *Prover) checkProofWindowsExpired(ctx context.Context) error { + p.currentBlocksWaitingForProofWindowMutex.Lock() + defer p.currentBlocksWaitingForProofWindowMutex.Unlock() + for blockId, l1Height := range p.currentBlocksWaitingForProofWindow { if err := p.checkProofWindowExpired(ctx, l1Height, blockId); err != nil { return err @@ -832,15 +836,17 @@ func (p *Prover) checkProofWindowsExpired(ctx context.Context) error { // checkProofWindowExpired checks a single instance of a block to see if its proof winodw has expired // and the proof is now able to be submitted by anyone, not just the blocks assigned prover. func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId uint64) error { - p.currentBlocksWaitingForProofWindowMutex.Lock() - defer p.currentBlocksWaitingForProofWindowMutex.Unlock() - block, err := p.rpc.TaikoL1.GetBlock(nil, new(big.Int).SetUint64(blockId)) if err != nil { return encoding.TryParsingCustomError(err) } - if time.Now().Unix() > int64(block.ProposedAt)+int64(block.ProofWindow) { + isExpired := time.Now().Unix() > int64(block.ProposedAt)+int64(block.ProofWindow) + + if isExpired { + log.Debug( + "Block proof window is expired", "blockID", blockId, "l1Height", l1Height) + // we should remove this block from being watched regardless of whether the block // has a valid proof delete(p.currentBlocksWaitingForProofWindow, blockId) @@ -864,14 +870,17 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId } if forkChoice.Prover == zeroAddress { - log.Info("proof window for proof not assigned to us expired, requesting proof", + log.Info( + "Proof window for proof not assigned to us expired, requesting proof", "blockID", blockId, "l1Height", l1Height, ) // we can generate the proof, no proof came in by proof window expiring - p.proveNotify <- big.NewInt(int64(l1Height)) + if err := p.requestProofForBlockId(new(big.Int).SetUint64(blockId), new(big.Int).SetUint64(l1Height)); err != nil { + return err + } } else { // we need to check the block hash vs the proof's blockHash to see // if the proof is valid or not @@ -883,7 +892,8 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId // if the hashes dont match, we can generate proof even though // a proof came in before proofwindow expired. if block.Hash() != forkChoice.BlockHash { - log.Info("invalid proof detected while watching for proof window expiration, requesting proof", + log.Info( + "Invalid proof detected while watching for proof window expiration, requesting proof", "blockID", blockId, "l1Height", @@ -895,7 +905,9 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId ) // we can generate the proof, the proof is incorrect since blockHash does not match // the correct one but parentHash/gasUsed are correct. - p.proveNotify <- new(big.Int).SetUint64(l1Height) + if err := p.requestProofForBlockId(new(big.Int).SetUint64(blockId), new(big.Int).SetUint64(l1Height)); err != nil { + return err + } } } } @@ -903,3 +915,74 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId // otherwise, keep it in the map and check again next iteration return nil } + +// proveOp performs a proving operation, find current unproven blocks, then +// request generating proofs for them. +func (p *Prover) requestProofForBlockId(blockId *big.Int, l1Height *big.Int) error { + onBlockProposed := func( + ctx context.Context, + event *bindings.TaikoL1ClientBlockProposed, + end eventIterator.EndBlockProposedEventIterFunc, + ) error { + // only filter for exact blockID we want + if event.BlockId.Cmp(blockId) != 0 { + return nil + } + + // Check whether the block has been verified. + isVerified, err := p.isBlockVerified(event.BlockId) + if err != nil { + return fmt.Errorf("failed to check if the current L2 block is verified: %w", err) + } + + if isVerified { + log.Info("📋 Block has been verified", "blockID", event.BlockId) + return nil + } + + ctx, cancelCtx := context.WithCancel(ctx) + p.currentBlocksBeingProvenMutex.Lock() + p.currentBlocksBeingProven[event.BlockId.Uint64()] = cancelFunc(func() { + defer cancelCtx() + if err := p.validProofSubmitter.CancelProof(ctx, event.BlockId); err != nil { + log.Error("Failed to cancel proof", "error", err, "blockID", event.BlockId) + } + }) + p.currentBlocksBeingProvenMutex.Unlock() + + if err := p.validProofSubmitter.RequestProof(ctx, event); err != nil { + return err + } + + return nil + } + + p.proposeConcurrencyGuard <- struct{}{} + + go func() { + defer func() { <-p.proposeConcurrencyGuard }() + + if err := backoff.Retry( + func() error { + iter, err := eventIterator.NewBlockProposedIterator(p.ctx, &eventIterator.BlockProposedIteratorConfig{ + Client: p.rpc.L1, + TaikoL1: p.rpc.TaikoL1, + StartHeight: l1Height, + EndHeight: new(big.Int).Add(l1Height, common.Big1), + OnBlockProposedEvent: onBlockProposed, + FilterQuery: []*big.Int{blockId}, + }) + if err != nil { + return err + } + + return iter.Iter() + }, + backoff.WithMaxRetries(backoff.NewConstantBackOff(p.cfg.BackOffRetryInterval), p.cfg.BackOffMaxRetrys), + ); err != nil { + log.Error("Request proof with a given block ID", "blockID", blockId, "error", err) + } + }() + + return nil +}