Skip to content

Commit

Permalink
deps!: Bumps go-header (#2844)
Browse files Browse the repository at this point in the history
Bumps go-header - includes breaks to Getter interface + metrics instantiation for header pkg

Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
  • Loading branch information
renaynay and Wondertan committed Oct 13, 2023
1 parent dbf7dbf commit 422dea2
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 93 deletions.
45 changes: 23 additions & 22 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,29 @@ func (ce *Exchange) GetByHeight(ctx context.Context, height uint64) (*header.Ext
return ce.getExtendedHeaderByHeight(ctx, &intHeight)
}

func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ([]*header.ExtendedHeader, error) {
func (ce *Exchange) GetRangeByHeight(
ctx context.Context,
from *header.ExtendedHeader,
to uint64,
) ([]*header.ExtendedHeader, error) {
amount := to - (from.Height() + 1)
headers, err := ce.getRangeByHeight(ctx, from.Height()+1, amount)
if err != nil {
return nil, err
}

for _, h := range headers {
err := from.Verify(h)
if err != nil {
return nil, fmt.Errorf("verifying next header against last verified height: %d: %w",
from.Height(), err)
}
from = h
}
return headers, nil
}

func (ce *Exchange) getRangeByHeight(ctx context.Context, from, amount uint64) ([]*header.ExtendedHeader, error) {
if amount == 0 {
return nil, nil
}
Expand Down Expand Up @@ -73,27 +95,6 @@ func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) (
return headers, nil
}

func (ce *Exchange) GetVerifiedRange(
ctx context.Context,
from *header.ExtendedHeader,
amount uint64,
) ([]*header.ExtendedHeader, error) {
headers, err := ce.GetRangeByHeight(ctx, from.Height()+1, amount)
if err != nil {
return nil, err
}

for _, h := range headers {
err := from.Verify(h)
if err != nil {
return nil, fmt.Errorf("verifying next header against last verified height: %d: %w",
from.Height(), err)
}
from = h
}
return headers, nil
}

func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.ExtendedHeader, error) {
log.Debugw("requesting header", "hash", hash.String())
block, err := ce.fetcher.GetBlockByHash(ctx, hash)
Expand Down
23 changes: 21 additions & 2 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
)

func TestCoreExchange_RequestHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

fetcher, _ := createCoreFetcher(t, DefaultTestConfig())

// generate 10 blocks
Expand All @@ -25,10 +28,26 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
store := createStore(t)

ce := NewExchange(fetcher, store, header.MakeExtendedHeader)
headers, err := ce.GetRangeByHeight(context.Background(), 1, 10)

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)

to := uint64(10)
expectedFirstHeightInRange := genHeader.Height() + 1
expectedLastHeightInRange := to - 1
expectedLenHeaders := to - expectedFirstHeightInRange

// request headers from height 1 to 10 [2:10)
headers, err := ce.GetRangeByHeight(context.Background(), genHeader, to)
require.NoError(t, err)

assert.Equal(t, 10, len(headers))
assert.Len(t, headers, int(expectedLenHeaders))
assert.Equal(t, expectedFirstHeightInRange, headers[0].Height())
assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height())
}

func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) {
Expand Down
14 changes: 10 additions & 4 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ func TestListener(t *testing.T) {

// create mocknet with two pubsub endpoints
ps0, ps1 := createMocknetWithTwoPubsubEndpoints(ctx, t)
subscriber := p2p.NewSubscriber[*header.ExtendedHeader](ps1, header.MsgID, networkID)
err := subscriber.SetVerifier(func(context.Context, *header.ExtendedHeader) error {
subscriber, err := p2p.NewSubscriber[*header.ExtendedHeader](
ps1,
header.MsgID,
p2p.WithSubscriberNetworkID(networkID),
)
require.NoError(t, err)
err = subscriber.SetVerifier(func(context.Context, *header.ExtendedHeader) error {
return nil
})
require.NoError(t, err)
Expand Down Expand Up @@ -162,8 +167,9 @@ func createListener(
edsSub *shrexsub.PubSub,
store *eds.Store,
) *Listener {
p2pSub := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, networkID)
err := p2pSub.Start(ctx)
p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID))
require.NoError(t, err)
err = p2pSub.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, p2pSub.Stop(ctx))
Expand Down
6 changes: 1 addition & 5 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,7 @@ func (m getterStub) GetByHeight(_ context.Context, height uint64) (*header.Exten
DAH: &share.Root{RowRoots: make([][]byte, 0)}}, nil
}

func (m getterStub) GetRangeByHeight(context.Context, uint64, uint64) ([]*header.ExtendedHeader, error) {
return nil, nil
}

func (m getterStub) GetVerifiedRange(
func (m getterStub) GetRangeByHeight(
context.Context,
*header.ExtendedHeader,
uint64,
Expand Down
4 changes: 2 additions & 2 deletions docs/adr/adr-009-public-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error)
// WaitForHeight blocks until the header at the given height has been processed
// by the node's header store or until context deadline is exceeded.
WaitForHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error)
// GetVerifiedRangeByHeight returns the given range (from:to) of ExtendedHeaders
// GetRangeByHeight returns the given range (from:to) of ExtendedHeaders
// from the node's header store and verifies that the returned headers are
// adjacent to each other.
GetVerifiedRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error)
GetRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error)
// Subscribe creates long-living Subscription for newly validated
// ExtendedHeaders. Multiple Subscriptions can be created.
Subscribe(context.Context) (<-chan *header.ExtendedHeader, error)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/celestiaorg/celestia-app v1.0.0
github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5
github.com/celestiaorg/go-fraud v0.2.0
github.com/celestiaorg/go-header v0.3.3
github.com/celestiaorg/go-header v0.4.0
github.com/celestiaorg/go-libp2p-messenger v0.2.0
github.com/celestiaorg/nmt v0.20.0
github.com/celestiaorg/rsmt2d v0.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5 h1:MJgXv
github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5/go.mod h1:r6xB3nvGotmlTACpAr3SunxtoXeesbqb57elgMJqflY=
github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I=
github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc=
github.com/celestiaorg/go-header v0.3.3 h1:Y04hdJIJfD5hapyqK0ZQMgMTH5PQGV9YpcIf56LGc4E=
github.com/celestiaorg/go-header v0.3.3/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c=
github.com/celestiaorg/go-header v0.4.0 h1:Ine/xpvFx8o9p6fXW+h2RSPp68rn7VUxTkW1okJxcEY=
github.com/celestiaorg/go-header v0.4.0/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c=
github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=
github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
Expand Down
3 changes: 3 additions & 0 deletions nodebuilder/header/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

// MetricsEnabled will be set during runtime if metrics are enabled on the node.
var MetricsEnabled = false

// Config contains configuration parameters for header retrieval and management.
type Config struct {
// TrustedHash is the Block/Header hash that Nodes use as starting point for header synchronization.
Expand Down
27 changes: 21 additions & 6 deletions nodebuilder/header/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ func newP2PExchange[H libhead.Header[H]](
ids[index] = peer.ID
host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
}
exchange, err := p2p.NewExchange[H](host, ids, conngater,

opts := []p2p.Option[p2p.ClientParameters]{
p2p.WithParams(cfg.Client),
p2p.WithNetworkID[p2p.ClientParameters](network.String()),
p2p.WithChainID(network.String()),
p2p.WithPeerIDStore[p2p.ClientParameters](pidstore),
)
}
if MetricsEnabled {
opts = append(opts, p2p.WithMetrics[p2p.ClientParameters]())
}

exchange, err := p2p.NewExchange[H](host, ids, conngater, opts...)
if err != nil {
return nil, err
}
Expand All @@ -68,10 +74,12 @@ func newSyncer[H libhead.Header[H]](
sub libhead.Subscriber[H],
cfg Config,
) (*sync.Syncer[H], *modfraud.ServiceBreaker[*sync.Syncer[H], H], error) {
syncer, err := sync.NewSyncer[H](ex, store, sub,
sync.WithParams(cfg.Syncer),
sync.WithBlockTime(modp2p.BlockTime),
)
opts := []sync.Option{sync.WithParams(cfg.Syncer), sync.WithBlockTime(modp2p.BlockTime)}
if MetricsEnabled {
opts = append(opts, sync.WithMetrics())
}

syncer, err := sync.NewSyncer[H](ex, store, sub, opts...)
if err != nil {
return nil, nil, err
}
Expand All @@ -96,6 +104,13 @@ func newInitStore[H libhead.Header[H]](
return nil, err
}

if MetricsEnabled {
err = libhead.WithMetrics[H](s)
if err != nil {
return nil, err
}
}

trustedHash, err := cfg.trustedHash(net)
if err != nil {
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions nodebuilder/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type Module interface {

// GetByHash returns the header of the given hash from the node's header store.
GetByHash(ctx context.Context, hash libhead.Hash) (*header.ExtendedHeader, error)
// GetVerifiedRangeByHeight returns the given range (from:to) of ExtendedHeaders
// GetRangeByHeight returns the given range (from:to) of ExtendedHeaders
// from the node's header store and verifies that the returned headers are
// adjacent to each other.
GetVerifiedRangeByHeight(
GetRangeByHeight(
ctx context.Context,
from *header.ExtendedHeader,
to uint64,
Expand Down Expand Up @@ -54,7 +54,7 @@ type API struct {
ctx context.Context,
hash libhead.Hash,
) (*header.ExtendedHeader, error) `perm:"read"`
GetVerifiedRangeByHeight func(
GetRangeByHeight func(
context.Context,
*header.ExtendedHeader,
uint64,
Expand All @@ -72,12 +72,12 @@ func (api *API) GetByHash(ctx context.Context, hash libhead.Hash) (*header.Exten
return api.Internal.GetByHash(ctx, hash)
}

func (api *API) GetVerifiedRangeByHeight(
func (api *API) GetRangeByHeight(
ctx context.Context,
from *header.ExtendedHeader,
to uint64,
) ([]*header.ExtendedHeader, error) {
return api.Internal.GetVerifiedRangeByHeight(ctx, from, to)
return api.Internal.GetRangeByHeight(ctx, from, to)
}

func (api *API) GetByHeight(ctx context.Context, u uint64) (*header.ExtendedHeader, error) {
Expand Down
15 changes: 7 additions & 8 deletions nodebuilder/header/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions nodebuilder/header/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ func ConstructModule[H libhead.Header[H]](tp node.Type, cfg *Config) fx.Option {
}),
)),
fx.Provide(fx.Annotate(
func(ps *pubsub.PubSub, network modp2p.Network) *p2p.Subscriber[H] {
return p2p.NewSubscriber[H](ps, header.MsgID, network.String())
func(ps *pubsub.PubSub, network modp2p.Network) (*p2p.Subscriber[H], error) {
opts := []p2p.SubscriberOption{p2p.WithSubscriberNetworkID(network.String())}
if MetricsEnabled {
opts = append(opts, p2p.WithSubscriberMetrics())
}
return p2p.NewSubscriber[H](ps, header.MsgID, opts...)
},
fx.OnStart(func(ctx context.Context, sub *p2p.Subscriber[H]) error {
return sub.Start(ctx)
Expand All @@ -62,14 +66,20 @@ func ConstructModule[H libhead.Header[H]](tp node.Type, cfg *Config) fx.Option {
)),
fx.Provide(fx.Annotate(
func(
cfg Config,
host host.Host,
store libhead.Store[H],
network modp2p.Network,
) (*p2p.ExchangeServer[H], error) {
return p2p.NewExchangeServer[H](host, store,
opts := []p2p.Option[p2p.ServerParameters]{
p2p.WithParams(cfg.Server),
p2p.WithNetworkID[p2p.ServerParameters](network.String()),
)
}
if MetricsEnabled {
opts = append(opts, p2p.WithMetrics[p2p.ServerParameters]())
}

return p2p.NewExchangeServer[H](host, store, opts...)
},
fx.OnStart(func(ctx context.Context, server *p2p.ExchangeServer[H]) error {
return server.Start(ctx)
Expand Down
28 changes: 0 additions & 28 deletions nodebuilder/header/opts.go

This file was deleted.

4 changes: 2 additions & 2 deletions nodebuilder/header/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func (s *Service) GetByHash(ctx context.Context, hash libhead.Hash) (*header.Ext
return s.store.Get(ctx, hash)
}

func (s *Service) GetVerifiedRangeByHeight(
func (s *Service) GetRangeByHeight(
ctx context.Context,
from *header.ExtendedHeader,
to uint64,
) ([]*header.ExtendedHeader, error) {
return s.store.GetVerifiedRange(ctx, from, to)
return s.store.GetRangeByHeight(ctx, from, to)
}

func (s *Service) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
Expand Down
Loading

0 comments on commit 422dea2

Please sign in to comment.