From 25c05d53c5f0195688918a9e6f494c874fdb3768 Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 12:20:15 +0000 Subject: [PATCH 01/13] set a .go-version from file, set as output in github actions, set up golangci-lint so we can get to lint checks and fieldalignement --- .github/workflows/go-ci.yml | 37 ++++++++++++++++++- .go-version | 1 + .golangci.yml | 72 +++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 .go-version create mode 100644 .golangci.yml diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index c1727901..7ee97884 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -5,10 +5,45 @@ on: - v* branches: - main + - chore/ramin/field-alignment pull_request: + jobs: + setup: + runs-on: ubuntu-latest + outputs: + go-version: ${{ steps.go-version.outputs.version }} + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Read .go-version file + id: go-version + run: echo "version=$(cat .go-version)" >> $GITHUB_ENV + + lint: + needs: [setup] + name: Lint + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v4 + with: + go-version: ${{ needs.setup.outputs.go-version }} + + - name: golangci-lint + uses: golangci/golangci-lint-action@v3.7.0 + with: + args: --timeout 10m + version: v1.55 + skip-pkg-cache: true + skip-build-cache: true build: + needs: [lint] runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -16,7 +51,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.21 + go-version: ${{ needs.setup.outputs.go-version }} - name: Build run: go build -v ./... diff --git a/.go-version b/.go-version new file mode 100644 index 00000000..d2ab029d --- /dev/null +++ b/.go-version @@ -0,0 +1 @@ +1.21 diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..02c51acb --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,72 @@ +run: + timeout: 5m + +linters: + enable: + - bodyclose + # - depguard as of v1.54.2, the default config throws errors on our repo + - dogsled + - dupl + - errcheck + # - funlen + # - gochecknoglobals + # - gochecknoinits + - goconst + - gocritic + # - gocyclo + # - godox + - gofmt + - goimports + # - golint - deprecated since v1.41. revive will be used instead + - revive + - gosec + - gosimple + - govet + - ineffassign + # - interfacer + - lll + - misspell + # - maligned + - nakedret + - prealloc + # - scopelint - deprecated since v1.39. exportloopref will be used instead + - exportloopref + - staticcheck + - stylecheck + - typecheck + - unconvert + # - unparam + - unused + # - whitespace + # - wsl + # - gocognit + - nolintlint + - asciicheck + +issues: + exclude-rules: + - path: _test\.go + linters: + - gosec + - govet + - linters: + - lll + source: "https://" + max-same-issues: 50 + +linters-settings: + dogsled: + max-blank-identifiers: 3 + golint: + min-confidence: 0 + maligned: + suggest-new: true + misspell: + locale: US + goimports: + local-prefixes: github.com/celestiaorg/celestia-node + dupl: + threshold: 200 + govet: + enable: + - fieldalignment From f85b358c4a73be11cae7018e18c80bd9bae337e7 Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 12:23:31 +0000 Subject: [PATCH 02/13] debug --- .github/workflows/go-ci.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 7ee97884..a62da742 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -12,7 +12,7 @@ jobs: setup: runs-on: ubuntu-latest outputs: - go-version: ${{ steps.go-version.outputs.version }} + go-version: ${{ steps.go-version.outputs.go-version }} steps: - name: Checkout code @@ -20,7 +20,11 @@ jobs: - name: Read .go-version file id: go-version - run: echo "version=$(cat .go-version)" >> $GITHUB_ENV + run: cat .go-version + + - name: Read .go-version file + id: go-version + run: echo "go-version=$(cat .go-version)" >> $GITHUB_ENV lint: needs: [setup] From 8675a6c7264bd8558495e591bca4405386238299 Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 12:24:12 +0000 Subject: [PATCH 03/13] debug 2 --- .github/workflows/go-ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index a62da742..708d073e 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -18,8 +18,7 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - name: Read .go-version file - id: go-version + - name: debug .go-version file run: cat .go-version - name: Read .go-version file From beb1185370b6ab21dc34bb90b2fb492dca212029 Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 12:40:47 +0000 Subject: [PATCH 04/13] set output correctly --- .github/workflows/go-ci.yml | 3 ++- sync/sync_head_test.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 708d073e..5f1698dd 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -23,7 +23,8 @@ jobs: - name: Read .go-version file id: go-version - run: echo "go-version=$(cat .go-version)" >> $GITHUB_ENV + run: | + echo "go-version=$(cat .go-version)" >> $GITHUB_OUTPUT lint: needs: [setup] diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 29b75ff4..8c6ad27d 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -126,12 +126,12 @@ func (t *wrappedGetter) Head(ctx context.Context, options ...header.HeadOption[* } func (t *wrappedGetter) Get(ctx context.Context, hash header.Hash) (*headertest.DummyHeader, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (t *wrappedGetter) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) { - //TODO implement me + // TODO implement me panic("implement me") } @@ -140,6 +140,6 @@ func (t *wrappedGetter) GetRangeByHeight( from *headertest.DummyHeader, to uint64, ) ([]*headertest.DummyHeader, error) { - //TODO implement me + // TODO implement me panic("implement me") } From 5d26d56fd385a8b9cf11e5b057b237def67258ca Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 13:08:58 +0000 Subject: [PATCH 05/13] fix lint errors: unused parameters, locale not US spelling, long lines, unecessary conversions (lots of uint64 conversions around uint64 return types) --- headertest/store.go | 12 ++++++------ headertest/subscriber.go | 2 +- p2p/exchange_test.go | 19 ++++++++++++++++--- p2p/options.go | 8 +++----- p2p/server.go | 4 ++-- p2p/subscription_test.go | 12 ++++++++++-- store/heightsub.go | 6 +++++- store/init.go | 7 ++++++- store/testing.go | 6 +++++- sync/ranges.go | 4 ++-- sync/sync.go | 6 +++--- sync/sync_getter_test.go | 13 ++++++++++--- sync/sync_head_test.go | 21 +++++++++++++++------ sync/sync_test.go | 16 ++++++++-------- verify.go | 23 ++++++++++++++++++++--- 15 files changed, 112 insertions(+), 47 deletions(-) diff --git a/headertest/store.go b/headertest/store.go index 7a663360..cffcefab 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -23,7 +23,7 @@ func NewDummyStore(t *testing.T) *Store[*DummyHeader] { } // NewStore creates a generic mock store supporting different type of Headers based on Generator. -func NewStore[H header.Header[H]](t *testing.T, gen Generator[H], numHeaders int) *Store[H] { +func NewStore[H header.Header[H]](_ *testing.T, gen Generator[H], numHeaders int) *Store[H] { store := &Store[H]{ Headers: make(map[uint64]H), HeadHeight: 0, @@ -43,14 +43,14 @@ func NewStore[H header.Header[H]](t *testing.T, gen Generator[H], numHeaders int func (m *Store[H]) Init(context.Context, H) error { return nil } func (m *Store[H]) Height() uint64 { - return uint64(m.HeadHeight) + return m.HeadHeight } func (m *Store[H]) Head(context.Context, ...header.HeadOption[H]) (H, error) { return m.Headers[m.HeadHeight], nil } -func (m *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { +func (m *Store[H]) Get(_ context.Context, hash header.Hash) (H, error) { for _, header := range m.Headers { if bytes.Equal(header.Hash(), hash) { return header, nil @@ -60,7 +60,7 @@ func (m *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { return zero, header.ErrNotFound } -func (m *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { +func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) { return m.Headers[height], nil } @@ -74,7 +74,7 @@ func (m *Store[H]) GetRangeByHeight(ctx context.Context, fromHead H, to uint64) return m.getRangeByHeight(ctx, from, to) } -func (m *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H, error) { +func (m *Store[H]) getRangeByHeight(_ context.Context, from, to uint64) ([]H, error) { amount := to - from headers := make([]H, amount) @@ -99,7 +99,7 @@ func (m *Store[H]) HasAt(_ context.Context, height uint64) bool { return height != 0 && m.HeadHeight >= height } -func (m *Store[H]) Append(ctx context.Context, headers ...H) error { +func (m *Store[H]) Append(_ context.Context, headers ...H) error { for _, header := range headers { m.Headers[header.Height()] = header // set head diff --git a/headertest/subscriber.go b/headertest/subscriber.go index 64e92b98..b64e3b99 100644 --- a/headertest/subscriber.go +++ b/headertest/subscriber.go @@ -22,7 +22,7 @@ func (mhs *Subscriber[H]) Subscribe() (header.Subscription[H], error) { return mhs, nil } -func (mhs *Subscriber[H]) NextHeader(ctx context.Context) (H, error) { +func (mhs *Subscriber[H]) NextHeader(_ context.Context) (H, error) { defer func() { if len(mhs.Headers) > 1 { // pop the already-returned header diff --git a/p2p/exchange_test.go b/p2p/exchange_test.go index 38298e0e..a7b72f11 100644 --- a/p2p/exchange_test.go +++ b/p2p/exchange_test.go @@ -607,7 +607,12 @@ func quicHosts(t *testing.T, n int) []libhost.Host { return hosts } -func client(ctx context.Context, t *testing.T, host libhost.Host, trusted []peer.ID) *Exchange[*headertest.DummyHeader] { +func client( + ctx context.Context, + t *testing.T, + host libhost.Host, + trusted []peer.ID, +) *Exchange[*headertest.DummyHeader] { client, err := NewExchange[*headertest.DummyHeader](host, trusted, nil) require.NoError(t, err) @@ -621,7 +626,12 @@ func client(ctx context.Context, t *testing.T, host libhost.Host, trusted []peer return client } -func server(ctx context.Context, t *testing.T, host libhost.Host, store header.Store[*headertest.DummyHeader]) *ExchangeServer[*headertest.DummyHeader] { +func server( + ctx context.Context, + t *testing.T, + host libhost.Host, + store header.Store[*headertest.DummyHeader], +) *ExchangeServer[*headertest.DummyHeader] { server, err := NewExchangeServer[*headertest.DummyHeader](host, store) require.NoError(t, err) err = server.Start(ctx) @@ -643,7 +653,10 @@ func (t *timedOutStore) HasAt(_ context.Context, _ uint64) bool { return true } -func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.DummyHeader]) (*headertest.DummyHeader, error) { +func (t *timedOutStore) Head( + context.Context, + ...header.HeadOption[*headertest.DummyHeader], +) (*headertest.DummyHeader, error) { time.Sleep(t.timeout) return nil, header.ErrNoHead } diff --git a/p2p/options.go b/p2p/options.go index b1b4b9cf..1e3f8ad0 100644 --- a/p2p/options.go +++ b/p2p/options.go @@ -57,7 +57,7 @@ func (p *ServerParameters) Validate() error { func WithMetrics[T parameters]() Option[T] { return func(p *T) { - switch t := any(p).(type) { //nolint:gocritic + switch t := any(p).(type) { case *ServerParameters: t.metrics = true case *ClientParameters: @@ -178,8 +178,7 @@ func WithMaxHeadersPerRangeRequest[T ClientParameters](amount uint64) Option[T] // `chainID` parameter. func WithChainID[T ClientParameters](chainID string) Option[T] { return func(p *T) { - switch t := any(p).(type) { //nolint:gocritic - case *ClientParameters: + if t, ok := any(p).(*ClientParameters); ok { t.chainID = chainID } } @@ -189,8 +188,7 @@ func WithChainID[T ClientParameters](chainID string) Option[T] { // inside the peerTracker. func WithPeerIDStore[T ClientParameters](pidstore PeerIDStore) Option[T] { return func(p *T) { - switch t := any(p).(type) { //nolint:gocritic - case *ClientParameters: + if t, ok := any(p).(*ClientParameters); ok { t.pidstore = pidstore } } diff --git a/p2p/server.go b/p2p/server.go index 327b9158..08df2030 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -242,10 +242,10 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) { log.Debugw("server: serving partial range", "prevMaxHeight", to, - "newMaxHeight", uint64(head.Height())+1, + "newMaxHeight", head.Height()+1, ) // change `to` height to return a partial range - to = uint64(head.Height()) + 1 + to = head.Height() + 1 } headersByRange, err := serv.store.GetRange(ctx, from, to) diff --git a/p2p/subscription_test.go b/p2p/subscription_test.go index e9e53700..81430872 100644 --- a/p2p/subscription_test.go +++ b/p2p/subscription_test.go @@ -30,7 +30,11 @@ func TestSubscriber(t *testing.T) { require.NoError(t, err) // create sub-service lifecycles for header service 1 - p2pSub1, err := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID)) + p2pSub1, err := NewSubscriber[*headertest.DummyHeader]( + pubsub1, + pubsub.DefaultMsgIdFn, + WithSubscriberNetworkID(networkID), + ) require.NoError(t, err) err = p2pSub1.Start(context.Background()) require.NoError(t, err) @@ -41,7 +45,11 @@ func TestSubscriber(t *testing.T) { require.NoError(t, err) // create sub-service lifecycles for header service 2 - p2pSub2, err := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID)) + p2pSub2, err := NewSubscriber[*headertest.DummyHeader]( + pubsub2, + pubsub.DefaultMsgIdFn, + WithSubscriberNetworkID(networkID), + ) require.NoError(t, err) err = p2pSub2.Start(context.Background()) require.NoError(t, err) diff --git a/store/heightsub.go b/store/heightsub.go index a69f28f6..074ed644 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -84,7 +84,11 @@ func (hs *heightSub[H]) Pub(headers ...H) { height := hs.Height() from, to := headers[0].Height(), headers[ln-1].Height() if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 - log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) + log.Fatalf( + "PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", + height+1, + from, + ) return } hs.SetHeight(to) diff --git a/store/init.go b/store/init.go index 5d26a97b..ae35b588 100644 --- a/store/init.go +++ b/store/init.go @@ -9,7 +9,12 @@ import ( // Init ensures a Store is initialized. If it is not already initialized, // it initializes the Store by requesting the header with the given hash. -func Init[H header.Header[H]](ctx context.Context, store header.Store[H], ex header.Exchange[H], hash header.Hash) error { +func Init[H header.Header[H]]( + ctx context.Context, + store header.Store[H], + ex header.Exchange[H], + hash header.Hash, +) error { _, err := store.Head(ctx) switch { default: diff --git a/store/testing.go b/store/testing.go index b415bcea..3b64c060 100644 --- a/store/testing.go +++ b/store/testing.go @@ -13,7 +13,11 @@ import ( ) // NewTestStore creates initialized and started in memory header Store which is useful for testing. -func NewTestStore(ctx context.Context, t *testing.T, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] { +func NewTestStore( + ctx context.Context, + t *testing.T, + head *headertest.DummyHeader, +) header.Store[*headertest.DummyHeader] { store, err := NewStoreWithHead(ctx, sync.MutexWrap(datastore.NewMapDatastore()), head) require.NoError(t, err) diff --git a/sync/ranges.go b/sync/ranges.go index 4329ab0c..5e07ac23 100644 --- a/sync/ranges.go +++ b/sync/ranges.go @@ -94,7 +94,7 @@ type headerRange[H header.Header[H]] struct { func newRange[H header.Header[H]](h H) *headerRange[H] { return &headerRange[H]{ - start: uint64(h.Height()), + start: h.Height(), headers: []H{h}, } } @@ -142,7 +142,7 @@ func (r *headerRange[H]) Remove(end uint64) { amnt := r.rangeAmount(end) r.headers = r.headers[amnt:] if len(r.headers) != 0 { - r.start = uint64(r.headers[0].Height()) + r.start = r.headers[0].Height() } } diff --git a/sync/sync.go b/sync/sync.go index 1dc53c27..8538c046 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -158,7 +158,7 @@ func (s *Syncer[H]) State() State { head, err := s.store.Head(s.ctx) if err == nil { - state.Height = uint64(head.Height()) + state.Height = head.Height() } else if state.Error == "" { // don't ignore the error if we can show it in the state state.Error = err.Error() @@ -239,8 +239,8 @@ func (s *Syncer[H]) sync(ctx context.Context) { func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error) { s.stateLk.Lock() s.state.ID++ - s.state.FromHeight = uint64(fromHead.Height()) + 1 - s.state.ToHeight = uint64(toHead.Height()) + s.state.FromHeight = fromHead.Height() + 1 + s.state.ToHeight = toHead.Height() s.state.FromHash = fromHead.Hash() s.state.ToHash = toHead.Hash() s.state.Start = time.Now() diff --git a/sync/sync_getter_test.go b/sync/sync_getter_test.go index 16ddc196..2722e3c3 100644 --- a/sync/sync_getter_test.go +++ b/sync/sync_getter_test.go @@ -59,14 +59,21 @@ func (f *fakeGetter[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (h return } -func (f *fakeGetter[H]) Get(ctx context.Context, hash header.Hash) (H, error) { +func (f *fakeGetter[H]) Get( + _ context.Context, + _ header.Hash, +) (H, error) { panic("implement me") } -func (f *fakeGetter[H]) GetByHeight(ctx context.Context, u uint64) (H, error) { +func (f *fakeGetter[H]) GetByHeight(_ context.Context, _ uint64) (H, error) { panic("implement me") } -func (f *fakeGetter[H]) GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error) { +func (f *fakeGetter[H]) GetRangeByHeight( + _ context.Context, + _ H, + _ uint64, +) ([]H, error) { panic("implement me") } diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 8c6ad27d..bdfb3f76 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -114,7 +114,10 @@ func newWrappedGetter(ex header.Exchange[*headertest.DummyHeader]) *wrappedGette } } -func (t *wrappedGetter) Head(ctx context.Context, options ...header.HeadOption[*headertest.DummyHeader]) (*headertest.DummyHeader, error) { +func (t *wrappedGetter) Head( + ctx context.Context, + options ...header.HeadOption[*headertest.DummyHeader], +) (*headertest.DummyHeader, error) { params := header.HeadParams[*headertest.DummyHeader]{} for _, opt := range options { opt(¶ms) @@ -125,20 +128,26 @@ func (t *wrappedGetter) Head(ctx context.Context, options ...header.HeadOption[* return t.ex.Head(ctx, options...) } -func (t *wrappedGetter) Get(ctx context.Context, hash header.Hash) (*headertest.DummyHeader, error) { +func (t *wrappedGetter) Get( + _ context.Context, + _ header.Hash, +) (*headertest.DummyHeader, error) { // TODO implement me panic("implement me") } -func (t *wrappedGetter) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) { +func (t *wrappedGetter) GetByHeight( + _ context.Context, + _ uint64, +) (*headertest.DummyHeader, error) { // TODO implement me panic("implement me") } func (t *wrappedGetter) GetRangeByHeight( - ctx context.Context, - from *headertest.DummyHeader, - to uint64, + _ context.Context, + _ *headertest.DummyHeader, + _ uint64, ) ([]*headertest.DummyHeader, error) { // TODO implement me panic("implement me") diff --git a/sync/sync_test.go b/sync/sync_test.go index dc108cc5..585b9b78 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -54,9 +54,9 @@ func TestSyncSimpleRequestingHead(t *testing.T) { assert.Empty(t, syncer.pending.Head()) state := syncer.State() - assert.Equal(t, uint64(exp.Height()), state.Height) + assert.Equal(t, exp.Height(), state.Height) assert.Equal(t, uint64(2), state.FromHeight) - assert.Equal(t, uint64(exp.Height()), state.ToHeight) + assert.Equal(t, exp.Height(), state.ToHeight) assert.True(t, state.Finished(), state) } @@ -144,9 +144,9 @@ func TestSyncCatchUp(t *testing.T) { assert.Empty(t, syncer.pending.Head()) state := syncer.State() - assert.Equal(t, uint64(exp.Height()+1), state.Height) + assert.Equal(t, exp.Height()+1, state.Height) assert.Equal(t, uint64(2), state.FromHeight) - assert.Equal(t, uint64(exp.Height()+1), state.ToHeight) + assert.Equal(t, exp.Height()+1, state.ToHeight) assert.True(t, state.Finished(), state) } @@ -290,7 +290,7 @@ func TestSyncerIncomingDuplicate(t *testing.T) { // TestSync_InvalidSyncTarget tests the possible case that a sync target // passes non-adjacent verification but is actually invalid once it is processed -// via VerifyAdjacent during sync. The expected behaviour is that the syncer would +// via VerifyAdjacent during sync. The expected behavior is that the syncer would // discard the invalid sync target and listen for a new sync target from headersub // and sync the valid chain. func TestSync_InvalidSyncTarget(t *testing.T) { @@ -300,7 +300,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - // create a local store which is initialised at genesis height + // create a local store which is initialized at genesis height localStore := store.NewTestStore(ctx, t, head) // create a peer which is already on height 100 remoteStore := headertest.NewStore[*headertest.DummyHeader](t, suite, 100) @@ -341,7 +341,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { cancel() // ensure that syncer still expects to sync to the bad sync target's // height - require.Equal(t, uint64(maliciousHeader.Height()), syncer.State().ToHeight) + require.Equal(t, maliciousHeader.Height(), syncer.State().ToHeight) // ensure syncer could only sync up to one header below the bad sync target h, err := localStore.Head(ctx) require.NoError(t, err) @@ -370,7 +370,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { // ensure that maliciousHeader height was re-requested and a good one was // found - rerequested, err := localStore.GetByHeight(ctx, uint64(maliciousHeader.Height())) + rerequested, err := localStore.GetByHeight(ctx, maliciousHeader.Height()) require.NoError(t, err) require.False(t, rerequested.VerifyFailure) diff --git a/verify.go b/verify.go index 0c1f3e9e..a2ec58ce 100644 --- a/verify.go +++ b/verify.go @@ -59,12 +59,23 @@ func verify[H Header[H]](trstd, untrstd H, heightThreshold uint64) error { } if untrstd.Time().Before(trstd.Time()) { - return fmt.Errorf("%w: timestamp '%s' < current '%s'", ErrUnorderedTime, formatTime(untrstd.Time()), formatTime(trstd.Time())) + return fmt.Errorf( + "%w: timestamp '%s' < current '%s'", + ErrUnorderedTime, + formatTime(untrstd.Time()), + formatTime(trstd.Time()), + ) } now := time.Now() if untrstd.Time().After(now.Add(clockDrift)) { - return fmt.Errorf("%w: timestamp '%s' > now '%s', clock_drift '%v'", ErrFromFuture, formatTime(untrstd.Time()), formatTime(now), clockDrift) + return fmt.Errorf( + "%w: timestamp '%s' > now '%s', clock_drift '%v'", + ErrFromFuture, + formatTime(untrstd.Time()), + formatTime(now), + clockDrift, + ) } known := untrstd.Height() <= trstd.Height() @@ -76,7 +87,13 @@ func verify[H Header[H]](trstd, untrstd H, heightThreshold uint64) error { // yet taken as sync target adequateHeight := untrstd.Height()-trstd.Height() < heightThreshold if !adequateHeight { - return fmt.Errorf("%w: '%d' - current '%d' >= threshold '%d'", ErrHeightFromFuture, untrstd.Height(), trstd.Height(), heightThreshold) + return fmt.Errorf( + "%w: '%d' - current '%d' >= threshold '%d'", + ErrHeightFromFuture, + untrstd.Height(), + trstd.Height(), + heightThreshold, + ) } return nil From 0f12e512f16ba5e650183aaf64a2ff2f4023441e Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 13:13:24 +0000 Subject: [PATCH 06/13] fix alignment of all structs --- headertest/dummy_header.go | 6 ++++-- p2p/exchange.go | 16 +++++++--------- p2p/exchange_metrics.go | 9 ++++++--- p2p/exchange_test.go | 4 ++-- p2p/options.go | 18 +++++++++--------- p2p/peer_stats.go | 14 +++++++------- p2p/peer_tracker.go | 16 +++++++++------- p2p/session.go | 21 +++++++++++---------- p2p/subscriber.go | 9 ++++----- p2p/subscriber_metrics.go | 6 ++++-- store/batch.go | 2 +- store/heightsub.go | 2 +- store/metrics.go | 2 +- store/options.go | 9 +++++---- sync/metrics.go | 2 +- sync/ranges.go | 4 ++-- sync/sync.go | 38 ++++++++++++++++++++------------------ sync/sync_getter.go | 2 +- 18 files changed, 95 insertions(+), 85 deletions(-) diff --git a/headertest/dummy_header.go b/headertest/dummy_header.go index e8480dde..b1fb7d46 100644 --- a/headertest/dummy_header.go +++ b/headertest/dummy_header.go @@ -17,13 +17,15 @@ import ( var ErrDummyVerify = errors.New("dummy verify error") type DummyHeader struct { + Timestamp time.Time + Chainid string PreviousHash header.Hash - HeightI uint64 - Timestamp time.Time hash header.Hash + HeightI uint64 + // VerifyFailure allows for testing scenarios where a header would fail // verification. When set to true, it forces a failure. VerifyFailure bool diff --git a/p2p/exchange.go b/p2p/exchange.go index 607bfb0f..15f5c4aa 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -34,17 +34,15 @@ var maxUntrustedHeadRequests = 4 // Exchange enables sending outbound HeaderRequests to the network as well as // handling inbound HeaderRequests from the network. type Exchange[H header.Header[H]] struct { - ctx context.Context - cancel context.CancelFunc - - protocolID protocol.ID - host host.Host + host host.Host + ctx context.Context + peerTracker *peerTracker + metrics *exchangeMetrics + cancel context.CancelFunc trustedPeers func() peer.IDSlice - peerTracker *peerTracker - metrics *exchangeMetrics - - Params ClientParameters + protocolID protocol.ID + Params ClientParameters } func NewExchange[H header.Header[H]]( diff --git a/p2p/exchange_metrics.go b/p2p/exchange_metrics.go index 82456c2f..b642e867 100644 --- a/p2p/exchange_metrics.go +++ b/p2p/exchange_metrics.go @@ -31,17 +31,20 @@ type exchangeMetrics struct { responseSizeInst metric.Int64Histogram responseTimeInst metric.Float64Histogram - trackerPeersNum atomic.Int64 trackedPeersNumInst metric.Int64ObservableGauge trackedPeersNumReg metric.Registration - disconnectedPeersNum atomic.Int64 disconnectedPeersNumInst metric.Int64ObservableGauge disconnectedPeersNumReg metric.Registration - blockedPeersNum atomic.Int64 blockedPeersNumInst metric.Int64ObservableGauge blockedPeersNumReg metric.Registration + + trackerPeersNum atomic.Int64 + + disconnectedPeersNum atomic.Int64 + + blockedPeersNum atomic.Int64 } func newExchangeMetrics() (m *exchangeMetrics, err error) { diff --git a/p2p/exchange_test.go b/p2p/exchange_test.go index a7b72f11..74c43c1a 100644 --- a/p2p/exchange_test.go +++ b/p2p/exchange_test.go @@ -51,10 +51,10 @@ func TestExchange_RequestHead(t *testing.T) { }) tests := []struct { - requestFromTrusted bool lastHeader *headertest.DummyHeader - expectedHeight uint64 expectedHash header.Hash + expectedHeight uint64 + requestFromTrusted bool }{ // routes to trusted peer only { diff --git a/p2p/options.go b/p2p/options.go index 1e3f8ad0..fd16ac4e 100644 --- a/p2p/options.go +++ b/p2p/options.go @@ -18,6 +18,9 @@ type Option[T parameters] func(*T) // ServerParameters is the set of parameters that must be configured for the exchange. type ServerParameters struct { + // networkID is a network that will be used to create a protocol.ID + // Is empty by default + networkID string // WriteDeadline sets the timeout for sending messages to the stream WriteDeadline time.Duration // ReadDeadline sets the timeout for reading messages from the stream @@ -25,9 +28,6 @@ type ServerParameters struct { // RangeRequestTimeout defines a timeout after which the session will try to re-request headers // from another peer. RangeRequestTimeout time.Duration - // networkID is a network that will be used to create a protocol.ID - // Is empty by default - networkID string // metrics is a flag that enables metrics collection. metrics bool } @@ -123,19 +123,19 @@ func WithParams[T parameters](params T) Option[T] { // ClientParameters is the set of parameters that must be configured for the exchange. type ClientParameters struct { + // pidstore is an optional interface used to periodically dump peers + pidstore PeerIDStore + // networkID is a network that will be used to create a protocol.ID + networkID string + // chainID is an identifier of the chain. + chainID string // MaxHeadersPerRangeRequest defines the max amount of headers that can be requested per 1 request. MaxHeadersPerRangeRequest uint64 // RangeRequestTimeout defines a timeout after which the session will try to re-request headers // from another peer. RangeRequestTimeout time.Duration - // networkID is a network that will be used to create a protocol.ID - networkID string - // chainID is an identifier of the chain. - chainID string // metrics is a flag that enables metrics collection. metrics bool - // pidstore is an optional interface used to periodically dump peers - pidstore PeerIDStore } // DefaultClientParameters returns the default params to configure the store. diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go index 0277fcd4..fac733a8 100644 --- a/p2p/peer_stats.go +++ b/p2p/peer_stats.go @@ -11,13 +11,13 @@ import ( // peerStat represents a peer's average statistics. type peerStat struct { - sync.RWMutex - peerID peer.ID - // score is the average speed per single request - peerScore float32 // pruneDeadline specifies when disconnected peer will be removed if // it does not return online. pruneDeadline time.Time + peerID peer.ID + sync.RWMutex + // score is the average speed per single request + peerScore float32 } // updateStats recalculates peer.score by averaging the last score @@ -100,10 +100,10 @@ func (ps *peerStats) Pop() any { type peerQueue struct { ctx context.Context - statsLk sync.RWMutex - stats peerStats - havePeer chan struct{} + stats peerStats + + statsLk sync.RWMutex } func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue { diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index 33fffc91..02feaeac 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -28,11 +28,16 @@ var ( ) type peerTracker struct { - host host.Host + host host.Host + + // an optional interface used to periodically dump + // good peers during garbage collection + pidstore PeerIDStore + + ctx context.Context connGater *conngater.BasicConnectionGater metrics *exchangeMetrics - peerLk sync.RWMutex // trackedPeers contains active peers that we can request to. // we cache the peer once they disconnect, // so we can guarantee that peerQueue will only contain active peers @@ -41,15 +46,12 @@ type peerTracker struct { // online until pruneDeadline, it will be removed and its score will be lost disconnectedPeers map[libpeer.ID]*peerStat - // an optional interface used to periodically dump - // good peers during garbage collection - pidstore PeerIDStore - - ctx context.Context cancel context.CancelFunc // done is used to gracefully stop the peerTracker. // It allows to wait until track() and gc() will be stopped. done chan struct{} + + peerLk sync.RWMutex } func newPeerTracker( diff --git a/p2p/session.go b/p2p/session.go index 37f1e258..2943a64f 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -29,21 +29,22 @@ func withValidation[H header.Header[H]](from H) option[H] { // session aims to divide a range of headers // into several smaller requests among different peers. type session[H header.Header[H]] struct { - host host.Host - protocolID protocol.ID - queue *peerQueue + host host.Host + + // Otherwise, it will be nil. + // `from` is set when additional validation for range is needed. + from H + + ctx context.Context + queue *peerQueue // peerTracker contains discovered peers with records that describes their activity. peerTracker *peerTracker metrics *exchangeMetrics - // Otherwise, it will be nil. - // `from` is set when additional validation for range is needed. - from H + cancel context.CancelFunc + reqCh chan *p2p_pb.HeaderRequest + protocolID protocol.ID requestTimeout time.Duration - - ctx context.Context - cancel context.CancelFunc - reqCh chan *p2p_pb.HeaderRequest } func newSession[H header.Header[H]]( diff --git a/p2p/subscriber.go b/p2p/subscriber.go index c5f70dcc..eded8539 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -24,12 +24,11 @@ type SubscriberParams struct { // Subscriber manages the lifecycle and relationship of header Module // with the "header-sub" gossipsub topic. type Subscriber[H header.Header[H]] struct { + metrics *subscriberMetrics + pubsub *pubsub.PubSub + topic *pubsub.Topic + msgID pubsub.MsgIdFunction pubsubTopicID string - - metrics *subscriberMetrics - pubsub *pubsub.PubSub - topic *pubsub.Topic - msgID pubsub.MsgIdFunction } // WithSubscriberMetrics enables metrics collection for the Subscriber. diff --git a/p2p/subscriber_metrics.go b/p2p/subscriber_metrics.go index 164ff371..1ed56034 100644 --- a/p2p/subscriber_metrics.go +++ b/p2p/subscriber_metrics.go @@ -20,12 +20,14 @@ type subscriberMetrics struct { messageNumInst metric.Int64Counter messageSizeInst metric.Int64Histogram - messageTimeLast atomic.Pointer[time.Time] messageTimeInst metric.Float64Histogram - subscriptionNum atomic.Int64 subscriptionNumInst metric.Int64ObservableGauge subscriptionNumReg metric.Registration + + messageTimeLast atomic.Pointer[time.Time] + + subscriptionNum atomic.Int64 } func newSubscriberMetrics() (m *subscriberMetrics, err error) { diff --git a/store/batch.go b/store/batch.go index 7785953f..11191f6d 100644 --- a/store/batch.go +++ b/store/batch.go @@ -14,9 +14,9 @@ import ( // The approach simplifies implementation for the batch and // makes it better optimized for the GetByHeight case which is what we need. type batch[H header.Header[H]] struct { - lk sync.RWMutex heights map[string]uint64 headers []H + lk sync.RWMutex } // newBatch creates the batch with the given pre-allocated size. diff --git a/store/heightsub.go b/store/heightsub.go index 074ed644..d72c7d47 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -14,11 +14,11 @@ var errElapsedHeight = errors.New("elapsed height") // heightSub provides a minimalistic mechanism to wait till header for a height becomes available. type heightSub[H header.Header[H]] struct { + heightReqs map[uint64][]chan H // height refers to the latest locally available header height // that has been fully verified and inserted into the subjective chain height atomic.Uint64 heightReqsLk sync.Mutex - heightReqs map[uint64][]chan H } // newHeightSub instantiates new heightSub. diff --git a/store/metrics.go b/store/metrics.go index e5f14211..ee72132f 100644 --- a/store/metrics.go +++ b/store/metrics.go @@ -13,7 +13,6 @@ import ( var meter = otel.Meter("header/store") type metrics struct { - headHeight atomic.Int64 headHeightInst metric.Int64ObservableGauge headHeightReg metric.Registration @@ -21,6 +20,7 @@ type metrics struct { readTimeInst metric.Float64Histogram writesQueueBlockedInst metric.Int64Counter + headHeight atomic.Int64 } func newMetrics() (m *metrics, err error) { diff --git a/store/options.go b/store/options.go index b0c01d62..95da0c41 100644 --- a/store/options.go +++ b/store/options.go @@ -12,6 +12,11 @@ type Option func(*Parameters) // Parameters is the set of parameters that must be configured for the store. type Parameters struct { + + // storePrefix defines the prefix used to wrap the store + // OPTIONAL + storePrefix datastore.Key + // StoreCacheSize defines the maximum amount of entries in the Header Store cache. StoreCacheSize int @@ -22,10 +27,6 @@ type Parameters struct { // Headers are written in batches not to thrash the underlying Datastore with writes. WriteBatchSize int - // storePrefix defines the prefix used to wrap the store - // OPTIONAL - storePrefix datastore.Key - // metrics is a flag that enables metrics collection metrics bool } diff --git a/sync/metrics.go b/sync/metrics.go index 1f2c99f7..a4466175 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -11,8 +11,8 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - totalSynced atomic.Int64 totalSyncedGauge metric.Float64ObservableGauge + totalSynced atomic.Int64 } func newMetrics() (*metrics, error) { diff --git a/sync/ranges.go b/sync/ranges.go index 5e07ac23..b0fc485b 100644 --- a/sync/ranges.go +++ b/sync/ranges.go @@ -10,8 +10,8 @@ import ( // ascending order). This prevents unnecessary / duplicate network requests for additional headers // during sync. type ranges[H header.Header[H]] struct { - lk sync.RWMutex ranges []*headerRange[H] + lk sync.RWMutex } // Head returns the highest Header in all ranges if any. @@ -87,9 +87,9 @@ func (rs *ranges[H]) First() (*headerRange[H], bool) { } type headerRange[H header.Header[H]] struct { - lk sync.RWMutex headers []H start uint64 + lk sync.RWMutex } func newRange[H header.Header[H]](h H) *headerRange[H] { diff --git a/sync/sync.go b/sync/sync.go index 8538c046..d2fe473e 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -31,27 +31,29 @@ var log = logging.Logger("header/sync") // - if there is a gap between the previous and the new Subjective Head // - Triggers s.syncLoop and saves the Subjective Head in the pending so s.syncLoop can access it type Syncer[H header.Header[H]] struct { - sub header.Subscriber[H] // to subscribe for new Network Heads - store syncStore[H] // to store all the headers to - getter syncGetter[H] // to fetch headers from - metrics *metrics + store syncStore[H] // to store all the headers to + sub header.Subscriber[H] // to subscribe for new Network Heads - // stateLk protects state which represents the current or latest sync - stateLk sync.RWMutex - state State + // controls lifecycle for syncLoop + ctx context.Context + metrics *metrics // signals to start syncing triggerSync chan struct{} + cancel context.CancelFunc + + Params *Parameters + getter syncGetter[H] // to fetch headers from + // pending keeps ranges of valid new network headers awaiting to be appended to store pending ranges[H] - // incomingMu ensures only one incoming network head candidate is processed at the time - incomingMu sync.Mutex - // controls lifecycle for syncLoop - ctx context.Context - cancel context.CancelFunc + state State - Params *Parameters + // stateLk protects state which represents the current or latest sync + stateLk sync.RWMutex + // incomingMu ensures only one incoming network head candidate is processed at the time + incomingMu sync.Mutex } // NewSyncer creates a new instance of Syncer. @@ -126,15 +128,15 @@ func (s *Syncer[H]) SyncWait(ctx context.Context) error { // State collects all the information about a sync. type State struct { + Start time.Time `json:"start"` + End time.Time `json:"end"` + Error string `json:"error,omitempty"` // the error that might happen within a sync + FromHash header.Hash `json:"from_hash"` + ToHash header.Hash `json:"to_hash"` ID uint64 `json:"id"` // incrementing ID of a sync Height uint64 `json:"height"` // height at the moment when State is requested for a sync FromHeight uint64 `json:"from_height"` // the starting point of a sync ToHeight uint64 `json:"to_height"` // the ending point of a sync - FromHash header.Hash `json:"from_hash"` - ToHash header.Hash `json:"to_hash"` - Start time.Time `json:"start"` - End time.Time `json:"end"` - Error string `json:"error,omitempty"` // the error that might happen within a sync } // Finished returns true if sync is done, false otherwise. diff --git a/sync/sync_getter.go b/sync/sync_getter.go index 267240c5..faf08829 100644 --- a/sync/sync_getter.go +++ b/sync/sync_getter.go @@ -10,9 +10,9 @@ import ( // syncGetter is a Getter wrapper that ensure only one Head call happens at the time type syncGetter[H header.Header[H]] struct { + header.Getter[H] getterLk sync.RWMutex isGetterLk atomic.Bool - header.Getter[H] } // Lock locks the getter for single user. From 529d286154da89b0cfafb0920cf3bb0f7e51943d Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 13:16:27 +0000 Subject: [PATCH 07/13] update names --- .github/workflows/go-ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 5f1698dd..32c5e93c 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -10,6 +10,7 @@ on: jobs: setup: + name: Setup runs-on: ubuntu-latest outputs: go-version: ${{ steps.go-version.outputs.go-version }} @@ -48,6 +49,7 @@ jobs: build: needs: [lint] + name: Build runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 From 9c5fea4f3b03da68e2e8e539a3306fd36a45ad32 Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 13:16:57 +0000 Subject: [PATCH 08/13] remove trigger for PR branch --- .github/workflows/go-ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 32c5e93c..9e095081 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -5,7 +5,6 @@ on: - v* branches: - main - - chore/ramin/field-alignment pull_request: jobs: From 2010b37eb2c0a72235af7a50ab5661fb8a7fed5a Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 13:22:14 +0000 Subject: [PATCH 09/13] trigger build --- .github/workflows/go-ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 9e095081..3f527565 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -5,6 +5,7 @@ on: - v* branches: - main + - chore/ramin/field-alignment pull_request: jobs: @@ -54,7 +55,7 @@ jobs: - uses: actions/checkout@v3 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v4 with: go-version: ${{ needs.setup.outputs.go-version }} From 6d3e8b102e97e8ff9de37bd626c7cd01d1d7003f Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 13:24:17 +0000 Subject: [PATCH 10/13] explicitly set reliance on setup --- .github/workflows/go-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 3f527565..47ff29bf 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -48,7 +48,7 @@ jobs: skip-build-cache: true build: - needs: [lint] + needs: [setup, lint] name: Build runs-on: ubuntu-latest steps: From 51372099352fff2f38d5c3075f65f7caec44b830 Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 22:30:12 +0000 Subject: [PATCH 11/13] experiment with //nolint for some structs which will prioritize readability --- .github/workflows/go-ci.yml | 4 ---- .golangci.yml | 2 +- sync/sync.go | 12 +++++++----- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 47ff29bf..8f2d92fb 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -5,7 +5,6 @@ on: - v* branches: - main - - chore/ramin/field-alignment pull_request: jobs: @@ -19,9 +18,6 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - name: debug .go-version file - run: cat .go-version - - name: Read .go-version file id: go-version run: | diff --git a/.golangci.yml b/.golangci.yml index 02c51acb..975a818f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -64,7 +64,7 @@ linters-settings: misspell: locale: US goimports: - local-prefixes: github.com/celestiaorg/celestia-node + local-prefixes: github.com/celestiaorg/go-header dupl: threshold: 200 govet: diff --git a/sync/sync.go b/sync/sync.go index d2fe473e..324d3545 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -30,7 +30,14 @@ var log = logging.Logger("header/sync") // - Sets as the new Subjective Head, which // - if there is a gap between the previous and the new Subjective Head // - Triggers s.syncLoop and saves the Subjective Head in the pending so s.syncLoop can access it +// +//nolint:govet type Syncer[H header.Header[H]] struct { + // stateLk protects state which represents the current or latest sync + stateLk sync.RWMutex + // incomingMu ensures only one incoming network head candidate is processed at the time + incomingMu sync.Mutex + store syncStore[H] // to store all the headers to sub header.Subscriber[H] // to subscribe for new Network Heads @@ -49,11 +56,6 @@ type Syncer[H header.Header[H]] struct { pending ranges[H] state State - - // stateLk protects state which represents the current or latest sync - stateLk sync.RWMutex - // incomingMu ensures only one incoming network head candidate is processed at the time - incomingMu sync.Mutex } // NewSyncer creates a new instance of Syncer. From aaf2357326721cca23286da1257cbd923da4eb42 Mon Sep 17 00:00:00 2001 From: ramin Date: Thu, 23 Nov 2023 10:45:41 +0000 Subject: [PATCH 12/13] syncer ordering closer to original --- sync/sync.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sync/sync.go b/sync/sync.go index 324d3545..4beaf763 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -35,27 +35,27 @@ var log = logging.Logger("header/sync") type Syncer[H header.Header[H]] struct { // stateLk protects state which represents the current or latest sync stateLk sync.RWMutex + state State + // incomingMu ensures only one incoming network head candidate is processed at the time incomingMu sync.Mutex - store syncStore[H] // to store all the headers to - sub header.Subscriber[H] // to subscribe for new Network Heads + store syncStore[H] // to store all the headers to + sub header.Subscriber[H] // to subscribe for new Network Heads + getter syncGetter[H] // to fetch headers from + metrics *metrics // controls lifecycle for syncLoop - ctx context.Context - metrics *metrics + ctx context.Context + cancel context.CancelFunc // signals to start syncing triggerSync chan struct{} - cancel context.CancelFunc - - Params *Parameters - getter syncGetter[H] // to fetch headers from // pending keeps ranges of valid new network headers awaiting to be appended to store pending ranges[H] - state State + Params *Parameters } // NewSyncer creates a new instance of Syncer. From e2d4b1678191e35397985723571a92b362c42c62 Mon Sep 17 00:00:00 2001 From: ramin Date: Thu, 23 Nov 2023 10:56:31 +0000 Subject: [PATCH 13/13] re-ordered back to a cleaner organization for some complex singleton structs with sync primitives at top of structs for better reading vs optimization, utilize //nolint:govet where we decide on form over function --- p2p/exchange.go | 16 +++++++++------- p2p/peer_stats.go | 4 +++- p2p/peer_tracker.go | 6 +++--- store/batch.go | 4 +++- sync/ranges.go | 3 +++ sync/sync_getter.go | 4 +++- 6 files changed, 24 insertions(+), 13 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index 15f5c4aa..331f9fbc 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -34,15 +34,17 @@ var maxUntrustedHeadRequests = 4 // Exchange enables sending outbound HeaderRequests to the network as well as // handling inbound HeaderRequests from the network. type Exchange[H header.Header[H]] struct { - host host.Host - ctx context.Context - peerTracker *peerTracker - metrics *exchangeMetrics + cancel context.CancelFunc + ctx context.Context + + protocolID protocol.ID + host host.Host - cancel context.CancelFunc trustedPeers func() peer.IDSlice - protocolID protocol.ID - Params ClientParameters + peerTracker *peerTracker + metrics *exchangeMetrics + + Params ClientParameters } func NewExchange[H header.Header[H]]( diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go index fac733a8..dc94488a 100644 --- a/p2p/peer_stats.go +++ b/p2p/peer_stats.go @@ -10,12 +10,14 @@ import ( ) // peerStat represents a peer's average statistics. +// +//nolint:govet type peerStat struct { + sync.RWMutex // pruneDeadline specifies when disconnected peer will be removed if // it does not return online. pruneDeadline time.Time peerID peer.ID - sync.RWMutex // score is the average speed per single request peerScore float32 } diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index 02feaeac..67ca1d32 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -27,8 +27,10 @@ var ( gcCycle = time.Minute * 5 ) +//nolint:govet type peerTracker struct { - host host.Host + peerLk sync.RWMutex + host host.Host // an optional interface used to periodically dump // good peers during garbage collection @@ -50,8 +52,6 @@ type peerTracker struct { // done is used to gracefully stop the peerTracker. // It allows to wait until track() and gc() will be stopped. done chan struct{} - - peerLk sync.RWMutex } func newPeerTracker( diff --git a/store/batch.go b/store/batch.go index 11191f6d..db50c2bb 100644 --- a/store/batch.go +++ b/store/batch.go @@ -13,10 +13,12 @@ import ( // unlike the Store which keeps 'hash -> header' and 'height -> hash'. // The approach simplifies implementation for the batch and // makes it better optimized for the GetByHeight case which is what we need. +// +//nolint:govet type batch[H header.Header[H]] struct { + lk sync.RWMutex heights map[string]uint64 headers []H - lk sync.RWMutex } // newBatch creates the batch with the given pre-allocated size. diff --git a/sync/ranges.go b/sync/ranges.go index b0fc485b..3b5cf2cf 100644 --- a/sync/ranges.go +++ b/sync/ranges.go @@ -9,6 +9,9 @@ import ( // ranges keeps non-overlapping and non-adjacent header ranges which are used to cache headers (in // ascending order). This prevents unnecessary / duplicate network requests for additional headers // during sync. +// +// @ramin: allowing this one to place the syncRWMutex at bottom of +// struct as the alignment allows 32bytes -> 8 type ranges[H header.Header[H]] struct { ranges []*headerRange[H] lk sync.RWMutex diff --git a/sync/sync_getter.go b/sync/sync_getter.go index faf08829..b14e9cb5 100644 --- a/sync/sync_getter.go +++ b/sync/sync_getter.go @@ -9,10 +9,12 @@ import ( ) // syncGetter is a Getter wrapper that ensure only one Head call happens at the time +// +//nolint:govet type syncGetter[H header.Header[H]] struct { - header.Getter[H] getterLk sync.RWMutex isGetterLk atomic.Bool + header.Getter[H] } // Lock locks the getter for single user.