Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/catalyst: in zero-period dev mode and txs/withdrawals pending, commit as many non-empty blocks as possible #30104

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 66 additions & 29 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,36 +37,56 @@ import (
)

const devEpochLength = 32
const maxWithdrawalCount = 10

// withdrawalQueue implements a FIFO queue which holds withdrawals that are
// pending inclusion.
type withdrawalQueue struct {
pending chan *types.Withdrawal
queue types.Withdrawals
pending chan struct{} // channel to notify when there are pending withdrawals in the queue
mu sync.Mutex // mutex to gate access to the queue
}

// add queues a withdrawal for future inclusion.
func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error {
w.mu.Lock()
defer w.mu.Unlock()

if len(w.queue) == maxWithdrawalCount {
return errors.New("withdrawal queue full")
}
w.queue = append(w.queue, withdrawal)
// send a notification to the pending channel without blocking
select {
case w.pending <- withdrawal:
break
case w.pending <- struct{}{}:
default:
return errors.New("withdrawal queue full")
}
return nil
}

// gatherPending returns a number of queued withdrawals up to a maximum count.
func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal {
withdrawals := []*types.Withdrawal{}
for {
func (w *withdrawalQueue) gatherPending(gatherCount int) []*types.Withdrawal {
w.mu.Lock()
defer w.mu.Unlock()

if gatherCount > len(w.queue) {
gatherCount = len(w.queue)
}
return w.queue[:gatherCount]
}

// popFront removes a number of withdrawals from the front of the queue
// up to a maximum count.
func (w *withdrawalQueue) popFront(count int) {
w.mu.Lock()
defer w.mu.Unlock()

w.queue = w.queue[count:]
// if the queue is still non-empty, notify so they will be picked up
if len(w.queue) > 0 {
select {
case withdrawal := <-w.pending:
withdrawals = append(withdrawals, withdrawal)
if len(withdrawals) == maxCount {
return withdrawals
}
case w.pending <- struct{}{}:
default:
return withdrawals
}
}
}
Expand Down Expand Up @@ -112,7 +132,7 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err
engineAPI: engineAPI,
lastBlockTime: block.Time,
curForkchoiceState: current,
withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)},
withdrawals: withdrawalQueue{pending: make(chan struct{})},
}, nil
}

Expand Down Expand Up @@ -142,7 +162,7 @@ func (c *SimulatedBeacon) Stop() error {

// sealBlock initiates payload building for a new block and creates a new block
// with the completed payload.
func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp uint64) error {
func (c *SimulatedBeacon) sealBlock(allowEmpty bool, timestamp uint64) (committed bool, err error) {
if timestamp <= c.lastBlockTime {
timestamp = c.lastBlockTime + 1
}
Expand All @@ -156,6 +176,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
c.setCurrentState(header.Hash(), *finalizedHash)
}

withdrawals := c.withdrawals.gatherPending(maxWithdrawalCount)
var random [32]byte
rand.Read(random[:])
fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{
Expand All @@ -166,23 +187,26 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
BeaconRoot: &common.Hash{},
}, engine.PayloadV3, true)
if err != nil {
return err
return false, err
}
if fcResponse == engine.STATUS_SYNCING {
return errors.New("chain rewind prevented invocation of payload creation")
return false, errors.New("chain rewind prevented invocation of payload creation")
}
envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true)
if err != nil {
return err
return false, err
}
payload := envelope.ExecutionPayload
if !allowEmpty && len(payload.Transactions) == 0 && len(payload.Withdrawals) == 0 {
return false, nil
}

var finalizedHash common.Hash
if payload.Number%devEpochLength == 0 {
finalizedHash = payload.BlockHash
} else {
if fh := c.finalizedBlockHash(payload.Number); fh == nil {
return errors.New("chain rewind interrupted calculation of finalized block hash")
return false, errors.New("chain rewind interrupted calculation of finalized block hash")
} else {
finalizedHash = *fh
}
Expand All @@ -195,24 +219,25 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
for _, commit := range envelope.BlobsBundle.Commitments {
var c kzg4844.Commitment
if len(commit) != len(c) {
return errors.New("invalid commitment length")
return false, errors.New("invalid commitment length")
}
copy(c[:], commit)
blobHashes = append(blobHashes, kzg4844.CalcBlobHashV1(hasher, &c))
}
}
// Mark the payload as canon
if _, err = c.engineAPI.NewPayloadV3(*payload, blobHashes, &common.Hash{}); err != nil {
return err
return false, err
}
c.setCurrentState(payload.BlockHash, finalizedHash)

// Mark the block containing the payload as canonical
if _, err = c.engineAPI.ForkchoiceUpdatedV2(c.curForkchoiceState, nil); err != nil {
return err
return false, err
}
c.lastBlockTime = payload.Timestamp
return nil
c.withdrawals.popFront(len(withdrawals))
return true, nil
}

// loop runs the block production loop for non-zero period configuration
Expand All @@ -223,8 +248,7 @@ func (c *SimulatedBeacon) loop() {
case <-c.shutdownCh:
return
case <-timer.C:
withdrawals := c.withdrawals.gatherPending(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
if _, err := c.sealBlock(true, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
} else {
timer.Reset(time.Second * time.Duration(c.period))
Expand Down Expand Up @@ -260,13 +284,26 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) {

// Commit seals a block on demand.
func (c *SimulatedBeacon) Commit() common.Hash {
withdrawals := c.withdrawals.gatherPending(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
if _, err := c.sealBlock(true, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
}
return c.eth.BlockChain().CurrentBlock().Hash()
}

// commitUntilEmpty seals blocks until there are now transactions or withdrawals
// left to include
func (c *SimulatedBeacon) commitUntilEmpty() {
for {
committed, err := c.sealBlock(false, uint64(time.Now().Unix()))
if err != nil {
log.Error("failed to seal block", "err", err)
}
if !committed {
return
}
}
}

// Rollback un-sends previously added transactions.
func (c *SimulatedBeacon) Rollback() {
// Flush all transactions from the transaction pools
Expand Down Expand Up @@ -301,8 +338,8 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if parent == nil {
return errors.New("parent not found")
}
withdrawals := c.withdrawals.gatherPending(10)
return c.sealBlock(withdrawals, parent.Time+uint64(adjustment))
_, err := c.sealBlock(true, parent.Time+uint64(adjustment))
return err
}

func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) {
Expand Down
11 changes: 3 additions & 8 deletions eth/catalyst/simulated_beacon_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ package catalyst

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

type api struct {
Expand All @@ -41,13 +39,10 @@ func (a *api) loop() {
select {
case <-a.sim.shutdownCh:
return
case w := <-a.sim.withdrawals.pending:
withdrawals := append(a.sim.withdrawals.gatherPending(9), w)
if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
}
case <-a.sim.withdrawals.pending:
a.sim.commitUntilEmpty()
case <-newTxs:
a.sim.Commit()
a.sim.commitUntilEmpty()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/catalyst/simulated_beacon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
defer subscription.Unsubscribe()

// generate some withdrawals
for i := 0; i < 20; i++ {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case should have been like this originally. The withdrawal queue has a max size of 10.

Previously, this passed because there was no contention between adding withdrawals and committing blocks. Now that access to withdrawals is gated behind a mutex, the contention causes all withdrawals in the loop to be added without simultaneously sealing blocks.

for i := 0; i < 10; i++ {
withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)})
if err := mock.withdrawals.add(&withdrawals[i]); err != nil {
t.Fatal("addWithdrawal failed", err)
Expand Down