Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DA Check For Data Columns #13938

Merged
merged 8 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ go_library(
"receive_attestation.go",
"receive_blob.go",
"receive_block.go",
"receive_sidecar.go",
"receive_data_column.go",
"service.go",
"tracked_proposer.go",
"weak_subjectivity_checks.go",
Expand All @@ -49,6 +49,7 @@ go_library(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
Expand Down Expand Up @@ -158,6 +159,7 @@ go_test(
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/blockchain/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
)

var errMaxBlobsExceeded = errors.New("Expected commitments in block exceeds MAX_BLOBS_PER_BLOCK")
var errMaxDataColumnsExceeded = errors.New("Expected data columns for node exceeds NUMBER_OF_COLUMNS")

// An invalid block is the block that fails state transition based on the core protocol rules.
// The beacon node shall not be accepting nor building blocks that branch off from an invalid block.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func WithBLSToExecPool(p blstoexec.PoolManager) Option {
}

// WithP2PBroadcaster to broadcast messages after appropriate processing.
func WithP2PBroadcaster(p p2p.Broadcaster) Option {
func WithP2PBroadcaster(p p2p.Acceser) Option {
return func(s *Service) error {
s.cfg.P2p = p
s.cfg.P2P = p
return nil
}
}
Expand Down
104 changes: 104 additions & 0 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"

Expand Down Expand Up @@ -514,12 +515,35 @@ func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte
return missing, nil
}

func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[uint64]bool) (map[uint64]bool, error) {
if len(expected) == 0 {
return nil, nil
}
if len(expected) > int(params.BeaconConfig().NumberOfColumns) {
return nil, errMaxDataColumnsExceeded
}
indices, err := bs.ColumnIndices(root)
if err != nil {
return nil, err
}
missing := make(map[uint64]bool, len(expected))
for col := range expected {
if !indices[col] {
missing[col] = true
}
}
return missing, nil
}

// isDataAvailable blocks until all BlobSidecars committed to in the block are available,
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
// The function will first check the database to see if all sidecars have been persisted. If any
// sidecars are missing, it will then read from the blobNotifier channel for the given root until the channel is
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
if features.Get().EnablePeerDAS {
return s.isDataAvailableDataColumns(ctx, root, signed)
}
if signed.Version() < version.Deneb {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe create a s.isDataAbiableBlobs so blobs and data columns are treated equally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not straightforward due to how we only need a subset of the columns vs all of the blobs. But i can see if its possible

return nil
}
Expand Down Expand Up @@ -591,6 +615,86 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
}
}

func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
if signed.Version() < version.Deneb {
return nil
}

block := signed.Block()
if block == nil {
return errors.New("invalid nil beacon block")
}
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) {
return nil
}
body := block.Body()
if body == nil {
return errors.New("invalid nil beacon block body")
}
kzgCommitments, err := body.BlobKzgCommitments()
if err != nil {
return errors.Wrap(err, "could not get KZG commitments")
}
// If block has not commitments there is nothing to wait for.
if len(kzgCommitments) == 0 {
return nil
}

colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), params.BeaconConfig().CustodyRequirement)
if err != nil {
return err
}
// expected is the number of custodied data columnns a node is expected to have.
expected := len(colMap)
if expected == 0 {
return nil
}
// get a map of data column indices that are not currently available.
missing, err := missingDataColumns(s.blobStorage, root, colMap)
if err != nil {
return err
}
// If there are no missing indices, all data column sidecars are available.
if len(missing) == 0 {
return nil
}

// The gossip handler for data columns writes the index of each verified data column referencing the given
// root to the channel returned by blobNotifiers.forRoot.
nc := s.blobNotifiers.forRoot(root)

// Log for DA checks that cross over into the next slot; helpful for debugging.
nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime)
// Avoid logging if DA check is called after next slot start.
if nextSlot.After(time.Now()) {
nst := time.AfterFunc(time.Until(nextSlot), func() {
if len(missing) == 0 {
return
}
log.WithFields(daCheckLogFields(root, signed.Block().Slot(), expected, len(missing))).
Error("Still waiting for DA check at slot end.")
})
defer nst.Stop()
}
for {
select {
case idx := <-nc:
// Delete each index seen in the notification channel.
delete(missing, idx)
// Read from the channel until there are no more missing sidecars.
if len(missing) > 0 {
continue
}
// Once all sidecars have been observed, clean up the notification channel.
s.blobNotifiers.delete(root)
return nil
case <-ctx.Done():
nalepae marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrapf(ctx.Err(), "context deadline waiting for blob sidecars slot: %d, BlockRoot: %#x", block.Slot(), root)
}
}
}

func daCheckLogFields(root [32]byte, slot primitives.Slot, expected, missing int) logrus.Fields {
return logrus.Fields{
"slot": slot,
Expand Down
22 changes: 22 additions & 0 deletions beacon-chain/blockchain/receive_data_column.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package blockchain

import (
"context"

ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)

func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error {
if err := s.blobStorage.SaveDataColumn(ds); err != nil {
return err
}
hRoot, err := ds.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return err
}

// TODO use a custom event or new method of for data columns. For speed
// we are simply reusing blob paths here.
s.sendNewBlobEvent(hRoot, uint64(ds.SignedBlockHeader.Header.Slot))
return nil
}
12 changes: 0 additions & 12 deletions beacon-chain/blockchain/receive_sidecar.go

This file was deleted.

2 changes: 1 addition & 1 deletion beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type config struct {
ExitPool voluntaryexits.PoolManager
SlashingPool slashings.PoolManager
BLSToExecPool blstoexec.PoolManager
P2p p2p.Broadcaster
P2P p2p.Acceser
MaxRoutines int
StateNotifier statefeed.Notifier
ForkChoiceStore f.ForkChoicer
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
WithAttestationPool(attestations.NewPool()),
WithSlashingPool(slashings.NewPool()),
WithExitPool(voluntaryexits.NewPool()),
WithP2PBroadcaster(&mockBroadcaster{}),
WithP2PBroadcaster(&mockAccesser{}),
WithStateNotifier(&mockBeaconNode{}),
WithForkChoiceStore(fc),
WithAttestationService(attService),
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2pTesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
Expand All @@ -44,6 +45,11 @@ type mockBroadcaster struct {
broadcastCalled bool
}

type mockAccesser struct {
mockBroadcaster
p2pTesting.MockPeerManager
}

func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
mb.broadcastCalled = true
return nil
Expand Down
Loading
Loading