Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stompesi committed Feb 7, 2025
1 parent 74ded3d commit 4a9715f
Showing 1 changed file with 99 additions and 101 deletions.
200 changes: 99 additions & 101 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,104 @@ func (f *finalizer) requestFinalizeBlockAndGetRawTransactions(ctx context.Contex
}
}

// finalizeBatches runs the endless loop for processing transactions finalizing batches.
func (f *finalizer) finalizeBatchesWithSbb(ctx context.Context) error {
log.Debug("finalizer init loop with SBB")

for {
if f.wipL2Block.timestamp+uint64(f.cfg.L2BlockMaxDeltaTimestamp.Seconds()) <= uint64(time.Now().Unix()) {
lastL2Block, err := f.stateIntf.GetLastL2Block(ctx, nil)
if err != nil {
log.Fatalf("failed to get last L2 block number, error: %v", err)
}

targetBlockNumber := lastL2Block.Number().Uint64() + 1

txs, exists := f.blockTransactions[targetBlockNumber]

for !exists {
f.workerReadyTxsCond.L.Lock()
f.workerReadyTxsCond.WaitOrTimeout(f.cfg.NewTxsWaitInterval.Duration)
f.workerReadyTxsCond.L.Unlock()

txs, exists = f.blockTransactions[targetBlockNumber]
}

log.Debug("stompesi - submitRawTransactions - targetBlockNumber: ", targetBlockNumber)

if len(txs) > 0 {
fmt.Println("stompesi - submitRawTransactions - targetBlockNumber: ", targetBlockNumber)
fmt.Println("stompesi - submitRawTransactions - tx_count: ", txs.Len())

for _, tx := range txs {
processBatchResponse, err := f.stateIntf.PreProcessTransaction(ctx, tx, nil)

if err != nil {
log.Errorf("failed to pre-process tx %s, error: %v", tx.Hash().String(), err)
continue
}

poolTx := pool.NewTransaction(*tx, "", false)
poolTx.ZKCounters = processBatchResponse.UsedZkCounters
poolTx.ReservedZKCounters = processBatchResponse.ReservedZkCounters

txTracker, _ := f.workerIntf.NewTxTracker(poolTx.Transaction, poolTx.ZKCounters, poolTx.ReservedZKCounters, poolTx.IP)

firstTxProcess := true

for {
var err error
_, err = f.processTransaction(ctx, txTracker, firstTxProcess)
if err != nil {
if err == ErrEffectiveGasPriceReprocess {
firstTxProcess = false
log.Infof("reprocessing tx %s because of effective gas price calculation", txTracker.HashStr)
continue
} else if err == ErrBatchResourceOverFlow {
log.Infof("skipping tx %s due to a batch resource overflow", tx.Hash())
break
} else {
log.Errorf("failed to process tx %s, error: %v", err)
break
}
}
break
}
}
}

f.finalizeWIPL2Block(ctx)
delete(f.blockTransactions, targetBlockNumber)
}

idleTime := time.Now()

// wait for new ready txs in worker
f.workerReadyTxsCond.L.Lock()
f.workerReadyTxsCond.WaitOrTimeout(f.cfg.NewTxsWaitInterval.Duration)
f.workerReadyTxsCond.L.Unlock()

// Increase idle time of the WIP L2Block
f.wipL2Block.metrics.idleTime += time.Since(idleTime)

if f.haltFinalizer.Load() {
// There is a fatal error and we need to halt the finalizer and stop processing new txs
for {
time.Sleep(5 * time.Second) //nolint:gomnd
}
}

if finalize, closeReason := f.checkIfFinalizeBatch(); finalize {
f.finalizeWIPBatch(ctx, closeReason)
}

if err := ctx.Err(); err != nil {
log.Errorf("stopping finalizer because of context, error: %v", err)
return err
}
}
}

// updateProverIdAndFlushId updates the prover id and flush id
func (f *finalizer) updateProverIdAndFlushId(ctx context.Context) {
for {
Expand Down Expand Up @@ -706,8 +804,8 @@ func (f *finalizer) getRawTransactions(ctx context.Context, finalizedBlockNumber
if err = f.increaseLeaderSequencerIndex(uint64(sequencerCount), leaderSequencerIndex); err != nil {
return nil, err
}
continue
}

var transactions []*types.Transaction
for _, hexString := range res.RawTransactions {
transaction, _ := hexToTransaction(hexString)
Expand Down Expand Up @@ -735,106 +833,6 @@ func Retry(ctx context.Context, fn func() error, retryInterval time.Duration) {
}
}

// finalizeBatches runs the endless loop for processing transactions finalizing batches.
func (f *finalizer) finalizeBatchesWithSbb(ctx context.Context) error {
log.Debug("finalizer init loop with SBB")

for {
if f.wipL2Block.timestamp+uint64(f.cfg.L2BlockMaxDeltaTimestamp.Seconds()) <= uint64(time.Now().Unix()) {

lastL2Block, err := f.stateIntf.GetLastL2Block(ctx, nil)
if err != nil {
log.Fatalf("failed to get last L2 block number, error: %v", err)
}

targetBlockNumber := lastL2Block.Number().Uint64() + 1

txs, exists := f.blockTransactions[targetBlockNumber]

for !exists {
f.workerReadyTxsCond.L.Lock()
f.workerReadyTxsCond.WaitOrTimeout(f.cfg.NewTxsWaitInterval.Duration)
f.workerReadyTxsCond.L.Unlock()

txs, exists = f.blockTransactions[targetBlockNumber]
}

log.Debug("stompesi - submitRawTransactions - targetBlockNumber: ", targetBlockNumber)

if len(txs) > 0 {
fmt.Println("stompesi - submitRawTransactions - targetBlockNumber: ", targetBlockNumber)
fmt.Println("stompesi - submitRawTransactions - tx_count: ", txs.Len())

for _, tx := range txs {
processBatchResponse, err := f.stateIntf.PreProcessTransaction(ctx, tx, nil)

if err != nil {
log.Errorf("failed to pre-process tx %s, error: %v", tx.Hash().String(), err)
continue
}

poolTx := pool.NewTransaction(*tx, "", false)
poolTx.ZKCounters = processBatchResponse.UsedZkCounters
poolTx.ReservedZKCounters = processBatchResponse.ReservedZkCounters

txTracker, _ := f.workerIntf.NewTxTracker(poolTx.Transaction, poolTx.ZKCounters, poolTx.ReservedZKCounters, poolTx.IP)

firstTxProcess := true

for {
var err error
_, err = f.processTransaction(ctx, txTracker, firstTxProcess)
if err != nil {
if err == ErrEffectiveGasPriceReprocess {
firstTxProcess = false
log.Infof("reprocessing tx %s because of effective gas price calculation", txTracker.HashStr)
continue
} else if err == ErrBatchResourceOverFlow {
log.Infof("Batch resource overflow", txTracker.HashStr)
f.finalizeWIPBatchSbbVersion(ctx, state.ResourceMarginExhaustedClosingReason)
continue
} else {
log.Errorf("failed to process tx %s, error: %v", err)
break
}
}
break
}
}
}

f.finalizeWIPL2Block(ctx)
delete(f.blockTransactions, targetBlockNumber)
}

idleTime := time.Now()

// wait for new ready txs in worker
f.workerReadyTxsCond.L.Lock()
f.workerReadyTxsCond.WaitOrTimeout(f.cfg.NewTxsWaitInterval.Duration)
f.workerReadyTxsCond.L.Unlock()

// Increase idle time of the WIP L2Block
f.wipL2Block.metrics.idleTime += time.Since(idleTime)

if f.haltFinalizer.Load() {
// There is a fatal error and we need to halt the finalizer and stop processing new txs
for {
time.Sleep(5 * time.Second) //nolint:gomnd
}
}

if finalize, closeReason := f.checkIfFinalizeBatch(); finalize {
f.finalizeWIPBatchSbbVersion(ctx, closeReason)
}

if err := ctx.Err(); err != nil {
log.Errorf("stopping finalizer because of context, error: %v", err)
return err
}
}
}

// finalizeBatches runs the endless loop for processing transactions finalizing batches.
func (f *finalizer) finalizeBatches(ctx context.Context) {
log.Debug("finalizer init loop")
Expand Down

0 comments on commit 4a9715f

Please sign in to comment.