diff --git a/das/daser.go b/das/daser.go index c04a8dcdb8..69d677c970 100644 --- a/das/daser.go +++ b/das/daser.go @@ -132,6 +132,11 @@ func (d *DASer) Stop(ctx context.Context) error { } d.cancel() + + if err := d.sampler.metrics.close(); err != nil { + log.Warnw("closing metrics", "err", err) + } + if err = d.sampler.wait(ctx); err != nil { return fmt.Errorf("DASer force quit: %w", err) } diff --git a/das/metrics.go b/das/metrics.go index 6454e9d138..82cf0afec8 100644 --- a/das/metrics.go +++ b/das/metrics.go @@ -29,6 +29,8 @@ type metrics struct { newHead metric.Int64Counter lastSampledTS uint64 + + clientReg metric.Registration } func (d *DASer) InitMetrics() error { @@ -119,7 +121,7 @@ func (d *DASer) InitMetrics() error { return nil } - _, err = meter.RegisterCallback(callback, + d.sampler.metrics.clientReg, err = meter.RegisterCallback(callback, lastSampledTS, busyWorkers, networkHead, @@ -133,6 +135,13 @@ func (d *DASer) InitMetrics() error { return nil } +func (m *metrics) close() error { + if m == nil { + return nil + } + return m.clientReg.Unregister() +} + // observeSample records the time it took to sample a header + // the amount of sampled contiguous headers func (m *metrics) observeSample( diff --git a/go.mod b/go.mod index 1f78e552df..2a919a4a35 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/BurntSushi/toml v1.3.2 github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/benbjohnson/clock v1.3.5 - github.com/celestiaorg/celestia-app v1.8.0-rc0 + github.com/celestiaorg/celestia-app v1.8.0 github.com/celestiaorg/go-fraud v0.2.1 github.com/celestiaorg/go-header v0.6.1 github.com/celestiaorg/go-libp2p-messenger v0.2.0 @@ -121,7 +121,7 @@ require ( github.com/cosmos/gogoproto v1.4.11 // indirect github.com/cosmos/gorocksdb v1.2.0 // indirect github.com/cosmos/iavl v0.19.6 // indirect - github.com/cosmos/ibc-go/v6 v6.2.0 // indirect + github.com/cosmos/ibc-go/v6 v6.3.0 // indirect github.com/cosmos/ledger-cosmos-go v0.13.2 // indirect github.com/crate-crypto/go-kzg-4844 v0.3.0 // indirect github.com/creachadair/taskgroup v0.3.2 // indirect diff --git a/go.sum b/go.sum index 907fd782f6..948f2cbd5a 100644 --- a/go.sum +++ b/go.sum @@ -358,8 +358,8 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7 github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= -github.com/celestiaorg/celestia-app v1.8.0-rc0 h1:rjEwN0Im1+2QChr8uPfbdomhGL3lEmGlt0cPUodc5JU= -github.com/celestiaorg/celestia-app v1.8.0-rc0/go.mod h1:z2H47Gs9gYd3GdQ22d5sbcL8/aBMRcVDtUWT64goMaY= +github.com/celestiaorg/celestia-app v1.8.0 h1:kvIxuUoEkVjo1ax+xUn0SUHLB6Qc+K9uV5ZK83x+gpU= +github.com/celestiaorg/celestia-app v1.8.0/go.mod h1:a4yD4A691nNcjuwy3KJt3fBf+rD1/KE6BGOtZ574gGw= github.com/celestiaorg/celestia-core v1.35.0-tm-v0.34.29 h1:sXERzNXgyHyqTKNQx4S29C/NMDzgav62DaQDNF49HUQ= github.com/celestiaorg/celestia-core v1.35.0-tm-v0.34.29/go.mod h1:weZR4wYx1Vcw3g1Jc5G8VipG4M+KUDSqeIzyyWszmsQ= github.com/celestiaorg/cosmos-sdk v1.20.1-sdk-v0.46.16 h1:9U9UthIJSOyVjabD5PkD6aczvqlWOyAFTOXw0duPT5k= @@ -492,8 +492,8 @@ github.com/cosmos/gorocksdb v1.2.0 h1:d0l3jJG8M4hBouIZq0mDUHZ+zjOx044J3nGRskwTb4 github.com/cosmos/gorocksdb v1.2.0/go.mod h1:aaKvKItm514hKfNJpUJXnnOWeBnk2GL4+Qw9NHizILw= github.com/cosmos/iavl v0.19.6 h1:XY78yEeNPrEYyNCKlqr9chrwoeSDJ0bV2VjocTk//OU= github.com/cosmos/iavl v0.19.6/go.mod h1:X9PKD3J0iFxdmgNLa7b2LYWdsGd90ToV5cAONApkEPw= -github.com/cosmos/ibc-go/v6 v6.2.0 h1:HKS5WNxQrlmjowHb73J9LqlNJfvTnvkbhXZ9QzNTU7Q= -github.com/cosmos/ibc-go/v6 v6.2.0/go.mod h1:+S3sxcNwOhgraYDJAhIFDg5ipXHaUnJrg7tOQqGyWlc= +github.com/cosmos/ibc-go/v6 v6.3.0 h1:2EkkqDEd9hTQvzB/BsPhYZsu7T/dzAVA8+VD2UuJLSQ= +github.com/cosmos/ibc-go/v6 v6.3.0/go.mod h1:Dm14j9s094bGyCEE8W4fD+2t8IneHv+cz+80Mvwjr1w= github.com/cosmos/ledger-cosmos-go v0.13.2 h1:aY0KZSmUwNKbBm9OvbIjvf7Ozz2YzzpAbgvN2C8x2T0= github.com/cosmos/ledger-cosmos-go v0.13.2/go.mod h1:HENcEP+VtahZFw38HZ3+LS3Iv5XV6svsnkk9vdJtLr8= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= diff --git a/nodebuilder/node/metrics.go b/nodebuilder/node/metrics.go index 560df808e6..5df847b916 100644 --- a/nodebuilder/node/metrics.go +++ b/nodebuilder/node/metrics.go @@ -4,11 +4,15 @@ import ( "context" "time" + logging "github.com/ipfs/go-log/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.uber.org/fx" ) +var log = logging.Logger("module/node") + var meter = otel.Meter("node") var ( @@ -17,7 +21,7 @@ var ( ) // WithMetrics registers node metrics. -func WithMetrics() error { +func WithMetrics(lc fx.Lifecycle) error { nodeStartTS, err := meter.Int64ObservableGauge( "node_start_ts", metric.WithDescription("timestamp when the node was started"), @@ -66,7 +70,18 @@ func WithMetrics() error { return nil } - _, err = meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge) + clientReg, err := meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge) + if err != nil { + return nil + } - return err + lc.Append( + fx.Hook{OnStop: func(context.Context) error { + if err := clientReg.Unregister(); err != nil { + log.Warn("failed to close metrics", "err", err) + } + return nil + }}, + ) + return nil } diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 72a0b7c960..b9534eb004 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -86,11 +86,11 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti baseComponents := fx.Options( fx.Supply(metricOpts), fx.Invoke(initializeMetrics), - fx.Invoke(func(ca *state.CoreAccessor) { + fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) { if ca == nil { return } - state.WithMetrics(ca) + state.WithMetrics(lc, ca) }), fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]), fx.Invoke(node.WithMetrics), diff --git a/share/eds/byzantine/bad_encoding.go b/share/eds/byzantine/bad_encoding.go index fbb6b592ea..cbcaaaf075 100644 --- a/share/eds/byzantine/bad_encoding.go +++ b/share/eds/byzantine/bad_encoding.go @@ -12,7 +12,6 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/share" pb "github.com/celestiaorg/celestia-node/share/eds/byzantine/pb" - "github.com/celestiaorg/celestia-node/share/ipld" ) const ( @@ -139,34 +138,27 @@ func (p *BadEncodingProof) Validate(hdr *header.ExtendedHeader) error { ) } - // merkleRoots are the roots against which we are going to check the inclusion of the received - // shares. Changing the order of the roots to prove the shares relative to the orthogonal axis, - // because inside the rsmt2d library rsmt2d.Row = 0 and rsmt2d.Col = 1 - merkleRoots := hdr.DAH.RowRoots - if p.Axis == rsmt2d.Row { - merkleRoots = hdr.DAH.ColumnRoots - } - - if int(p.Index) >= len(merkleRoots) { + width := len(hdr.DAH.RowRoots) + if int(p.Index) >= width { log.Debugf("%s:%s (%d >= %d)", - invalidProofPrefix, errIncorrectIndex, int(p.Index), len(merkleRoots), + invalidProofPrefix, errIncorrectIndex, int(p.Index), width, ) return errIncorrectIndex } - if len(p.Shares) != len(merkleRoots) { + if len(p.Shares) != width { // Since p.Shares should contain all the shares from either a row or a // column, it should exactly match the number of row roots. In this // context, the number of row roots is the width of the extended data // square. log.Infof("%s: %s (%d >= %d)", - invalidProofPrefix, errIncorrectAmountOfShares, int(p.Index), len(merkleRoots), + invalidProofPrefix, errIncorrectAmountOfShares, int(p.Index), width, ) return errIncorrectAmountOfShares } - odsWidth := uint64(len(merkleRoots) / 2) - amount := uint64(0) + odsWidth := width / 2 + var amount int for _, share := range p.Shares { if share == nil { continue @@ -184,19 +176,17 @@ func (p *BadEncodingProof) Validate(hdr *header.ExtendedHeader) error { } // verify that Merkle proofs correspond to particular shares. - shares := make([][]byte, len(merkleRoots)) + shares := make([][]byte, width) for index, shr := range p.Shares { if shr == nil { continue } // validate inclusion of the share into one of the DAHeader roots - if ok := shr.Validate(ipld.MustCidFromNamespacedSha256(merkleRoots[index])); !ok { + if ok := shr.Validate(hdr.DAH, p.Axis, int(p.Index), index); !ok { log.Debugf("%s: %s at index %d", invalidProofPrefix, errIncorrectShare, index) return errIncorrectShare } - // NMTree commits the additional namespace while rsmt2d does not know about, so we trim it - // this is ugliness from NMTWrapper that we have to embrace ¯\_(ツ)_/¯ - shares[index] = share.GetData(shr.Share) + shares[index] = shr.Share } codec := share.DefaultRSMT2DCodec() @@ -220,7 +210,7 @@ func (p *BadEncodingProof) Validate(hdr *header.ExtendedHeader) error { } copy(rebuiltShares[odsWidth:], rebuiltExtendedShares) - tree := wrapper.NewErasuredNamespacedMerkleTree(odsWidth, uint(p.Index)) + tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(odsWidth), uint(p.Index)) for _, share := range rebuiltShares { err = tree.Push(share) if err != nil { diff --git a/share/eds/byzantine/bad_encoding_test.go b/share/eds/byzantine/bad_encoding_test.go index 470d792cbe..46f89f3e40 100644 --- a/share/eds/byzantine/bad_encoding_test.go +++ b/share/eds/byzantine/bad_encoding_test.go @@ -40,7 +40,7 @@ func TestBEFP_Validate(t *testing.T) { err = square.Repair(dah.RowRoots, dah.ColumnRoots) require.ErrorAs(t, err, &errRsmt2d) - byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d) + byzantine := NewErrByzantine(ctx, bServ.Blockstore(), &dah, errRsmt2d) var errByz *ErrByzantine require.ErrorAs(t, byzantine, &errByz) @@ -70,7 +70,7 @@ func TestBEFP_Validate(t *testing.T) { err = ipld.ImportEDS(ctx, validSquare, bServ) require.NoError(t, err) validShares := validSquare.Flattened() - errInvalidByz := NewErrByzantine(ctx, bServ, &validDah, + errInvalidByz := NewErrByzantine(ctx, bServ.Blockstore(), &validDah, &rsmt2d.ErrByzantineData{ Axis: rsmt2d.Row, Index: 0, @@ -92,7 +92,7 @@ func TestBEFP_Validate(t *testing.T) { // break the first shareWithProof to test negative case sh := sharetest.RandShares(t, 2) nmtProof := nmt.NewInclusionProof(0, 1, nil, false) - befp.Shares[0] = &ShareWithProof{sh[0], &nmtProof} + befp.Shares[0] = &ShareWithProof{sh[0], &nmtProof, rsmt2d.Row} return proof.Validate(&header.ExtendedHeader{DAH: &dah}) }, expectedResult: func(err error) { @@ -170,16 +170,17 @@ func TestIncorrectBadEncodingFraudProof(t *testing.T) { require.NoError(t, err) // get an arbitrary row - row := uint(squareSize / 2) - rowShares := eds.Row(row) - rowRoot := dah.RowRoots[row] - - shareProofs, err := GetProofsForShares(ctx, bServ, ipld.MustCidFromNamespacedSha256(rowRoot), rowShares) - require.NoError(t, err) + rowIdx := squareSize / 2 + shareProofs := make([]*ShareWithProof, 0, eds.Width()) + for i := range shareProofs { + proof, err := GetShareWithProof(ctx, bServ, dah, shares[i], rsmt2d.Row, rowIdx, i) + require.NoError(t, err) + shareProofs = append(shareProofs, proof) + } // create a fake error for data that was encoded correctly fakeError := ErrByzantine{ - Index: uint32(row), + Index: uint32(rowIdx), Shares: shareProofs, Axis: rsmt2d.Row, } @@ -202,7 +203,7 @@ func TestIncorrectBadEncodingFraudProof(t *testing.T) { } func TestBEFP_ValidateOutOfOrderShares(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) size := 4 @@ -220,9 +221,6 @@ func TestBEFP_ValidateOutOfOrderShares(t *testing.T) { ) require.NoError(t, err, "failure to recompute the extended data square") - err = batchAddr.Commit() - require.NoError(t, err) - dah, err := da.NewDataAvailabilityHeader(eds) require.NoError(t, err) @@ -230,7 +228,10 @@ func TestBEFP_ValidateOutOfOrderShares(t *testing.T) { err = eds.Repair(dah.RowRoots, dah.ColumnRoots) require.ErrorAs(t, err, &errRsmt2d) - byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d) + err = batchAddr.Commit() + require.NoError(t, err) + + byzantine := NewErrByzantine(ctx, bServ.Blockstore(), &dah, errRsmt2d) var errByz *ErrByzantine require.ErrorAs(t, byzantine, &errByz) diff --git a/share/eds/byzantine/byzantine.go b/share/eds/byzantine/byzantine.go index d20b56deed..030453c31a 100644 --- a/share/eds/byzantine/byzantine.go +++ b/share/eds/byzantine/byzantine.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" "github.com/celestiaorg/celestia-app/pkg/da" "github.com/celestiaorg/rsmt2d" @@ -31,50 +31,35 @@ func (e *ErrByzantine) Error() string { // If error happens during proof collection, it terminates the process with os.Exit(1). func NewErrByzantine( ctx context.Context, - bGetter blockservice.BlockGetter, + bStore blockstore.Blockstore, dah *da.DataAvailabilityHeader, errByz *rsmt2d.ErrByzantineData, ) error { - // changing the order to collect proofs against an orthogonal axis - roots := [][][]byte{ - dah.ColumnRoots, - dah.RowRoots, - }[errByz.Axis] - sharesWithProof := make([]*ShareWithProof, len(errByz.Shares)) - - type result struct { - share *ShareWithProof - index int - } - resultCh := make(chan *result) + bGetter := ipld.NewBlockservice(bStore, nil) + var count int for index, share := range errByz.Shares { - if share == nil { + if len(share) == 0 { + continue + } + swp, err := GetShareWithProof(ctx, bGetter, dah, share, errByz.Axis, int(errByz.Index), index) + if err != nil { + log.Warn("requesting proof failed", + "errByz", errByz, + "shareIndex", index, + "err", err) continue } - index := index - go func() { - share, err := getProofsAt( - ctx, bGetter, - ipld.MustCidFromNamespacedSha256(roots[index]), - int(errByz.Index), len(errByz.Shares), - ) - if err != nil { - log.Warn("requesting proof failed", "root", roots[index], "err", err) - return - } - resultCh <- &result{share, index} - }() + sharesWithProof[index] = swp + // it is enough to collect half of the shares to construct the befp + if count++; count >= len(dah.RowRoots)/2 { + break + } } - for i := 0; i < len(dah.RowRoots)/2; i++ { - select { - case t := <-resultCh: - sharesWithProof[t.index] = t.share - case <-ctx.Done(): - return ipld.ErrNodeNotFound - } + if count < len(dah.RowRoots)/2 { + return fmt.Errorf("failed to collect proof") } return &ErrByzantine{ diff --git a/share/eds/byzantine/pb/share.pb.go b/share/eds/byzantine/pb/share.pb.go index 4186eabc64..af79d48b13 100644 --- a/share/eds/byzantine/pb/share.pb.go +++ b/share/eds/byzantine/pb/share.pb.go @@ -49,8 +49,9 @@ func (Axis) EnumDescriptor() ([]byte, []int) { } type Share struct { - Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"` - Proof *pb.Proof `protobuf:"bytes,2,opt,name=Proof,proto3" json:"Proof,omitempty"` + Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"` + Proof *pb.Proof `protobuf:"bytes,2,opt,name=Proof,proto3" json:"Proof,omitempty"` + ProofAxis Axis `protobuf:"varint,3,opt,name=ProofAxis,proto3,enum=share.eds.byzantine.pb.Axis" json:"ProofAxis,omitempty"` } func (m *Share) Reset() { *m = Share{} } @@ -100,6 +101,13 @@ func (m *Share) GetProof() *pb.Proof { return nil } +func (m *Share) GetProofAxis() Axis { + if m != nil { + return m.ProofAxis + } + return Axis_ROW +} + type BadEncoding struct { HeaderHash []byte `protobuf:"bytes,1,opt,name=HeaderHash,proto3" json:"HeaderHash,omitempty"` Height uint64 `protobuf:"varint,2,opt,name=Height,proto3" json:"Height,omitempty"` @@ -187,26 +195,27 @@ func init() { } var fileDescriptor_d28ce8f160a920d1 = []byte{ - // 297 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xc1, 0x4a, 0xec, 0x30, - 0x14, 0x86, 0x9b, 0x3b, 0xed, 0x5c, 0x38, 0xa3, 0xe3, 0x10, 0x64, 0x08, 0xa2, 0xa1, 0x0c, 0x08, - 0xc5, 0x45, 0x2a, 0x15, 0x1f, 0xc0, 0xaa, 0x50, 0x41, 0x18, 0x89, 0x0b, 0xd7, 0x89, 0x89, 0xd3, - 0x6e, 0xda, 0xd2, 0x74, 0x51, 0x7d, 0x0a, 0x1f, 0xca, 0x85, 0xcb, 0x59, 0xba, 0x94, 0xf6, 0x45, - 0xa4, 0x69, 0x11, 0x17, 0xba, 0x3b, 0xdf, 0x9f, 0x2f, 0x3f, 0xc9, 0x81, 0x95, 0x49, 0x45, 0xa5, - 0x43, 0xad, 0x4c, 0x28, 0x9f, 0x5f, 0x44, 0x5e, 0x67, 0xb9, 0x0e, 0x4b, 0x19, 0xda, 0x98, 0x95, - 0x55, 0x51, 0x17, 0x78, 0x39, 0x80, 0x56, 0x86, 0x7d, 0x3b, 0xac, 0x94, 0x07, 0xf3, 0x52, 0x86, - 0x65, 0x55, 0x14, 0x4f, 0x83, 0xb7, 0x8a, 0xc1, 0xbb, 0xef, 0x4d, 0x8c, 0xc1, 0xbd, 0x12, 0xb5, - 0x20, 0xc8, 0x47, 0xc1, 0x0e, 0xb7, 0x33, 0x3e, 0x06, 0xef, 0xae, 0x77, 0xc9, 0x3f, 0x1f, 0x05, - 0xb3, 0x68, 0x8f, 0x8d, 0x37, 0x25, 0xb3, 0x31, 0x1f, 0x4e, 0x57, 0x6f, 0x08, 0x66, 0xb1, 0x50, - 0xd7, 0xf9, 0x63, 0xa1, 0xb2, 0x7c, 0x83, 0x29, 0x40, 0xa2, 0x85, 0xd2, 0x55, 0x22, 0x4c, 0x3a, - 0x16, 0xfe, 0x48, 0xf0, 0x12, 0xa6, 0x89, 0xce, 0x36, 0x69, 0x6d, 0x7b, 0x5d, 0x3e, 0x12, 0x3e, - 0x87, 0xa9, 0x7d, 0x8b, 0x21, 0x13, 0x7f, 0x12, 0xcc, 0xa2, 0x23, 0xf6, 0xfb, 0x27, 0x98, 0xb5, - 0xf8, 0x28, 0xe3, 0x7d, 0xf0, 0x6e, 0x72, 0xa5, 0x1b, 0xe2, 0xfa, 0x28, 0xd8, 0xe5, 0x03, 0xe0, - 0x53, 0x70, 0x2f, 0x9a, 0xcc, 0x10, 0xcf, 0x47, 0xc1, 0x3c, 0x3a, 0xfc, 0xab, 0x4a, 0x34, 0x99, - 0xe1, 0xd6, 0x3c, 0x21, 0xe0, 0xf6, 0x84, 0xff, 0xc3, 0x84, 0xaf, 0x1f, 0x16, 0x4e, 0x3f, 0x5c, - 0xae, 0x6f, 0x17, 0x28, 0x26, 0xef, 0x2d, 0x45, 0xdb, 0x96, 0xa2, 0xcf, 0x96, 0xa2, 0xd7, 0x8e, - 0x3a, 0xdb, 0x8e, 0x3a, 0x1f, 0x1d, 0x75, 0xe4, 0xd4, 0x6e, 0xf1, 0xec, 0x2b, 0x00, 0x00, 0xff, - 0xff, 0xb1, 0x96, 0xb9, 0xbe, 0x93, 0x01, 0x00, 0x00, + // 310 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0x31, 0x4b, 0xc4, 0x30, + 0x1c, 0xc5, 0x1b, 0xaf, 0x3d, 0xf1, 0x7f, 0x7a, 0x1e, 0x41, 0x8e, 0x20, 0x1a, 0xca, 0x81, 0x50, + 0x1c, 0x52, 0x39, 0x71, 0x71, 0xf3, 0x54, 0x38, 0x41, 0x38, 0x89, 0x83, 0x73, 0x62, 0xe2, 0xb5, + 0x4b, 0x5b, 0x9a, 0x0e, 0xd5, 0xc1, 0xcf, 0xe0, 0x87, 0x72, 0x70, 0xbc, 0xd1, 0x51, 0xda, 0x2f, + 0x22, 0x4d, 0x8b, 0x3a, 0x28, 0xb8, 0xbd, 0xf7, 0xf2, 0x0b, 0x79, 0x8f, 0xc0, 0xc4, 0x44, 0x22, + 0xd7, 0xa1, 0x56, 0x26, 0x94, 0x8f, 0x4f, 0x22, 0x29, 0xe2, 0x44, 0x87, 0x99, 0x0c, 0x6d, 0xcc, + 0xb2, 0x3c, 0x2d, 0x52, 0x3c, 0x6e, 0x8d, 0x56, 0x86, 0x7d, 0x31, 0x2c, 0x93, 0xbb, 0xc3, 0x4c, + 0x86, 0x59, 0x9e, 0xa6, 0x0f, 0x2d, 0x37, 0x79, 0x06, 0xef, 0xb6, 0x21, 0x31, 0x06, 0xf7, 0x42, + 0x14, 0x82, 0x20, 0x1f, 0x05, 0x9b, 0xdc, 0x6a, 0x7c, 0x00, 0xde, 0x4d, 0xc3, 0x92, 0x35, 0x1f, + 0x05, 0x83, 0xe9, 0x36, 0xeb, 0x6e, 0x4a, 0x66, 0x63, 0xde, 0x9e, 0xe2, 0x53, 0xd8, 0xb0, 0xe2, + 0xac, 0x8c, 0x0d, 0xe9, 0xf9, 0x28, 0x18, 0x4e, 0xf7, 0xd8, 0xef, 0xef, 0x33, 0x51, 0xc6, 0x86, + 0x7f, 0xe3, 0x93, 0x57, 0x04, 0x83, 0x99, 0x50, 0x97, 0xc9, 0x7d, 0xaa, 0xe2, 0x64, 0x89, 0x29, + 0xc0, 0x5c, 0x0b, 0xa5, 0xf3, 0xb9, 0x30, 0x51, 0x57, 0xe6, 0x47, 0x82, 0xc7, 0xd0, 0x9f, 0xeb, + 0x78, 0x19, 0x15, 0xb6, 0x93, 0xcb, 0x3b, 0x87, 0x4f, 0xa0, 0x6f, 0x77, 0x34, 0x05, 0x7a, 0xc1, + 0x60, 0xba, 0xff, 0x57, 0x01, 0x4b, 0xf1, 0x0e, 0xc6, 0x3b, 0xe0, 0x5d, 0x25, 0x4a, 0x97, 0xc4, + 0xf5, 0x51, 0xb0, 0xc5, 0x5b, 0x83, 0x8f, 0xc0, 0xb5, 0x5b, 0xbc, 0x7f, 0x6c, 0xb1, 0xe4, 0x21, + 0x01, 0xb7, 0x71, 0x78, 0x1d, 0x7a, 0x7c, 0x71, 0x37, 0x72, 0x1a, 0x71, 0xbe, 0xb8, 0x1e, 0xa1, + 0x19, 0x79, 0xab, 0x28, 0x5a, 0x55, 0x14, 0x7d, 0x54, 0x14, 0xbd, 0xd4, 0xd4, 0x59, 0xd5, 0xd4, + 0x79, 0xaf, 0xa9, 0x23, 0xfb, 0xf6, 0x07, 0x8e, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x0f, 0x5d, + 0x8d, 0x6c, 0xcf, 0x01, 0x00, 0x00, } func (m *Share) Marshal() (dAtA []byte, err error) { @@ -229,6 +238,11 @@ func (m *Share) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.ProofAxis != 0 { + i = encodeVarintShare(dAtA, i, uint64(m.ProofAxis)) + i-- + dAtA[i] = 0x18 + } if m.Proof != nil { { size, err := m.Proof.MarshalToSizedBuffer(dAtA[:i]) @@ -335,6 +349,9 @@ func (m *Share) Size() (n int) { l = m.Proof.Size() n += 1 + l + sovShare(uint64(l)) } + if m.ProofAxis != 0 { + n += 1 + sovShare(uint64(m.ProofAxis)) + } return n } @@ -471,6 +488,25 @@ func (m *Share) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ProofAxis", wireType) + } + m.ProofAxis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowShare + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ProofAxis |= Axis(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipShare(dAtA[iNdEx:]) diff --git a/share/eds/byzantine/pb/share.proto b/share/eds/byzantine/pb/share.proto index 33e3dae2c2..52abefe7a2 100644 --- a/share/eds/byzantine/pb/share.proto +++ b/share/eds/byzantine/pb/share.proto @@ -6,6 +6,7 @@ import "pb/proof.proto"; message Share { bytes Data = 1; proof.pb.Proof Proof = 2; + axis ProofAxis = 3; } enum axis { diff --git a/share/eds/byzantine/share_proof.go b/share/eds/byzantine/share_proof.go index ae830c0391..a389ec0494 100644 --- a/share/eds/byzantine/share_proof.go +++ b/share/eds/byzantine/share_proof.go @@ -2,6 +2,8 @@ package byzantine import ( "context" + "errors" + "math" "github.com/ipfs/boxo/blockservice" "github.com/ipfs/go-cid" @@ -9,6 +11,7 @@ import ( "github.com/celestiaorg/nmt" nmt_pb "github.com/celestiaorg/nmt/pb" + "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" pb "github.com/celestiaorg/celestia-node/share/eds/byzantine/pb" @@ -23,31 +26,30 @@ type ShareWithProof struct { share.Share // Proof is a Merkle Proof of current share Proof *nmt.Proof + // Axis is a proof axis + Axis rsmt2d.Axis } -// NewShareWithProof takes the given leaf and its path, starting from the tree root, -// and computes the nmt.Proof for it. -func NewShareWithProof(index int, share share.Share, pathToLeaf []cid.Cid) *ShareWithProof { - rangeProofs := make([][]byte, 0, len(pathToLeaf)) - for i := len(pathToLeaf) - 1; i >= 0; i-- { - node := ipld.NamespacedSha256FromCID(pathToLeaf[i]) - rangeProofs = append(rangeProofs, node) +// Validate validates inclusion of the share under the given root CID. +func (s *ShareWithProof) Validate(dah *share.Root, axisType rsmt2d.Axis, axisIdx, shrIdx int) bool { + var rootHash []byte + switch axisType { + case rsmt2d.Row: + rootHash = rootHashForCoordinates(dah, s.Axis, shrIdx, axisIdx) + case rsmt2d.Col: + rootHash = rootHashForCoordinates(dah, s.Axis, axisIdx, shrIdx) } - proof := nmt.NewInclusionProof(index, index+1, rangeProofs, true) - return &ShareWithProof{ - share, - &proof, + edsSize := len(dah.RowRoots) + isParity := shrIdx >= edsSize/2 || axisIdx >= edsSize/2 + namespace := share.ParitySharesNamespace + if !isParity { + namespace = share.GetNamespace(s.Share) } -} - -// Validate validates inclusion of the share under the given root CID. -func (s *ShareWithProof) Validate(root cid.Cid) bool { return s.Proof.VerifyInclusion( share.NewSHA256Hasher(), - share.GetNamespace(s.Share).ToNMT(), - [][]byte{share.GetData(s.Share)}, - ipld.NamespacedSha256FromCID(root), + namespace.ToNMT(), + [][]byte{s.Share}, ) } @@ -65,28 +67,54 @@ func (s *ShareWithProof) ShareWithProofToProto() *pb.Share { LeafHash: s.Proof.LeafHash(), IsMaxNamespaceIgnored: s.Proof.IsMaxNamespaceIDIgnored(), }, + ProofAxis: pb.Axis(s.Axis), } } -// GetProofsForShares fetches Merkle proofs for the given shares -// and returns the result as an array of ShareWithProof. -func GetProofsForShares( +// GetShareWithProof attempts to get a share with proof for the given share. It first tries to get a row proof +// and if that fails or proof is invalid, it tries to get a column proof. +func GetShareWithProof( ctx context.Context, bGetter blockservice.BlockGetter, - root cid.Cid, - shares [][]byte, -) ([]*ShareWithProof, error) { - proofs := make([]*ShareWithProof, len(shares)) - for index, share := range shares { - if share != nil { - proof, err := getProofsAt(ctx, bGetter, root, index, len(shares)) - if err != nil { - return nil, err - } - proofs[index] = proof + dah *share.Root, + share share.Share, + axisType rsmt2d.Axis, axisIdx, shrIdx int, +) (*ShareWithProof, error) { + if axisType == rsmt2d.Col { + axisIdx, shrIdx, axisType = shrIdx, axisIdx, rsmt2d.Row + } + width := len(dah.RowRoots) + // try row proofs + root := dah.RowRoots[axisIdx] + rootCid := ipld.MustCidFromNamespacedSha256(root) + proof, err := getProofsAt(ctx, bGetter, rootCid, shrIdx, width) + if err == nil { + shareWithProof := &ShareWithProof{ + Share: share, + Proof: &proof, + Axis: rsmt2d.Row, + } + if shareWithProof.Validate(dah, axisType, axisIdx, shrIdx) { + return shareWithProof, nil } } - return proofs, nil + + // try column proofs + root = dah.ColumnRoots[shrIdx] + rootCid = ipld.MustCidFromNamespacedSha256(root) + proof, err = getProofsAt(ctx, bGetter, rootCid, axisIdx, width) + if err != nil { + return nil, err + } + shareWithProof := &ShareWithProof{ + Share: share, + Proof: &proof, + Axis: rsmt2d.Col, + } + if shareWithProof.Validate(dah, axisType, axisIdx, shrIdx) { + return shareWithProof, nil + } + return nil, errors.New("failed to collect proof") } func getProofsAt( @@ -95,20 +123,20 @@ func getProofsAt( root cid.Cid, index, total int, -) (*ShareWithProof, error) { - proof := make([]cid.Cid, 0) - // TODO(@vgonkivs): Combine GetLeafData and GetProof in one function as the are traversing the same - // tree. Add options that will control what data will be fetched. - node, err := ipld.GetLeaf(ctx, bGetter, root, index, total) +) (nmt.Proof, error) { + proofPath := make([]cid.Cid, 0, int(math.Sqrt(float64(total)))) + proofPath, err := ipld.GetProof(ctx, bGetter, root, proofPath, index, total) if err != nil { - return nil, err + return nmt.Proof{}, err } - proof, err = ipld.GetProof(ctx, bGetter, root, proof, index, total) - if err != nil { - return nil, err + rangeProofs := make([][]byte, 0, len(proofPath)) + for i := len(proofPath) - 1; i >= 0; i-- { + node := ipld.NamespacedSha256FromCID(proofPath[i]) + rangeProofs = append(rangeProofs, node) } - return NewShareWithProof(index, node.RawData(), proof), nil + + return nmt.NewInclusionProof(index, index+1, rangeProofs, true), nil } func ProtoToShare(protoShares []*pb.Share) []*ShareWithProof { @@ -118,7 +146,11 @@ func ProtoToShare(protoShares []*pb.Share) []*ShareWithProof { continue } proof := ProtoToProof(share.Proof) - shares[i] = &ShareWithProof{share.Data, &proof} + shares[i] = &ShareWithProof{ + Share: share.Data, + Proof: &proof, + Axis: rsmt2d.Axis(share.ProofAxis), + } } return shares } @@ -131,3 +163,10 @@ func ProtoToProof(protoProof *nmt_pb.Proof) nmt.Proof { protoProof.IsMaxNamespaceIgnored, ) } + +func rootHashForCoordinates(r *share.Root, axisType rsmt2d.Axis, x, y int) []byte { + if axisType == rsmt2d.Row { + return r.RowRoots[y] + } + return r.ColumnRoots[x] +} diff --git a/share/eds/byzantine/share_proof_test.go b/share/eds/byzantine/share_proof_test.go index a9021d806d..170ba591e2 100644 --- a/share/eds/byzantine/share_proof_test.go +++ b/share/eds/byzantine/share_proof_test.go @@ -2,21 +2,21 @@ package byzantine import ( "context" - "strconv" "testing" "time" - "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-app/pkg/da" + "github.com/celestiaorg/rsmt2d" + "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/ipld" "github.com/celestiaorg/celestia-node/share/sharetest" ) func TestGetProof(t *testing.T) { - const width = 4 + const width = 8 ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() @@ -28,56 +28,36 @@ func TestGetProof(t *testing.T) { dah, err := da.NewDataAvailabilityHeader(in) require.NoError(t, err) - var tests = []struct { - roots [][]byte - }{ - {dah.RowRoots}, - {dah.ColumnRoots}, - } - for i, tt := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - for _, root := range tt.roots { - rootCid := ipld.MustCidFromNamespacedSha256(root) - for index := 0; uint(index) < in.Width(); index++ { - proof := make([]cid.Cid, 0) - proof, err = ipld.GetProof(ctx, bServ, rootCid, proof, index, int(in.Width())) - require.NoError(t, err) - node, err := ipld.GetLeaf(ctx, bServ, rootCid, index, int(in.Width())) - require.NoError(t, err) - inclusion := NewShareWithProof(index, node.RawData(), proof) - require.True(t, inclusion.Validate(rootCid)) + for _, proofType := range []rsmt2d.Axis{rsmt2d.Row, rsmt2d.Col} { + var roots [][]byte + switch proofType { + case rsmt2d.Row: + roots = dah.RowRoots + case rsmt2d.Col: + roots = dah.ColumnRoots + } + for axisIdx := 0; axisIdx < width*2; axisIdx++ { + rootCid := ipld.MustCidFromNamespacedSha256(roots[axisIdx]) + for shrIdx := 0; shrIdx < width*2; shrIdx++ { + proof, err := getProofsAt(ctx, bServ, rootCid, shrIdx, int(in.Width())) + require.NoError(t, err) + node, err := ipld.GetLeaf(ctx, bServ, rootCid, shrIdx, int(in.Width())) + require.NoError(t, err) + inclusion := &ShareWithProof{ + Share: share.GetData(node.RawData()), + Proof: &proof, + Axis: proofType, + } + require.True(t, inclusion.Validate(&dah, proofType, axisIdx, shrIdx)) + // swap axis indexes to test if validation still works against the orthogonal coordinate + switch proofType { + case rsmt2d.Row: + require.True(t, inclusion.Validate(&dah, rsmt2d.Col, shrIdx, axisIdx)) + case rsmt2d.Col: + require.True(t, inclusion.Validate(&dah, rsmt2d.Row, shrIdx, axisIdx)) } } - }) - } -} - -func TestGetProofs(t *testing.T) { - const width = 4 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - defer cancel() - bServ := ipld.NewMemBlockservice() - - shares := sharetest.RandShares(t, width*width) - in, err := ipld.AddShares(ctx, shares, bServ) - require.NoError(t, err) - - dah, err := da.NewDataAvailabilityHeader(in) - require.NoError(t, err) - for _, root := range dah.ColumnRoots { - rootCid := ipld.MustCidFromNamespacedSha256(root) - data := make([][]byte, 0, in.Width()) - for index := 0; uint(index) < in.Width(); index++ { - node, err := ipld.GetLeaf(ctx, bServ, rootCid, index, int(in.Width())) - require.NoError(t, err) - data = append(data, node.RawData()[9:]) - } - - proves, err := GetProofsForShares(ctx, bServ, rootCid, data) - require.NoError(t, err) - for _, proof := range proves { - require.True(t, proof.Validate(rootCid)) } } } diff --git a/share/eds/cache/accessor_cache.go b/share/eds/cache/accessor_cache.go index 6f937818f8..a45c2542f8 100644 --- a/share/eds/cache/accessor_cache.go +++ b/share/eds/cache/accessor_cache.go @@ -222,10 +222,13 @@ func (bc *AccessorCache) Remove(key shard.Key) error { } // EnableMetrics enables metrics for the cache. -func (bc *AccessorCache) EnableMetrics() error { +func (bc *AccessorCache) EnableMetrics() (CloseMetricsFn, error) { var err error bc.metrics, err = newMetrics(bc) - return err + if err != nil { + return nil, err + } + return bc.metrics.close, err } // refCloser manages references to accessor from provided reader and removes the ref, when the diff --git a/share/eds/cache/cache.go b/share/eds/cache/cache.go index 13e207d7c0..3ec3d2f279 100644 --- a/share/eds/cache/cache.go +++ b/share/eds/cache/cache.go @@ -20,6 +20,8 @@ var ( errCacheMiss = errors.New("accessor not found in blockstore cache") ) +type CloseMetricsFn func() error + // Cache is an interface that defines the basic Cache operations. type Cache interface { // Get retrieves an item from the Cache. @@ -37,7 +39,7 @@ type Cache interface { Remove(shard.Key) error // EnableMetrics enables metrics in Cache - EnableMetrics() error + EnableMetrics() (CloseMetricsFn, error) } // Accessor is a interface type returned by cache, that allows to read raw data by reader or create diff --git a/share/eds/cache/doublecache.go b/share/eds/cache/doublecache.go index a63eadee9e..a7f2a4871e 100644 --- a/share/eds/cache/doublecache.go +++ b/share/eds/cache/doublecache.go @@ -43,9 +43,20 @@ func (mc *DoubleCache) Second() Cache { return mc.second } -func (mc *DoubleCache) EnableMetrics() error { - if err := mc.first.EnableMetrics(); err != nil { - return err +func (mc *DoubleCache) EnableMetrics() (CloseMetricsFn, error) { + firstCloser, err := mc.first.EnableMetrics() + if err != nil { + return nil, err } - return mc.second.EnableMetrics() + secondCloser, err := mc.second.EnableMetrics() + if err != nil { + return nil, err + } + + return func() error { + if err := errors.Join(firstCloser(), secondCloser()); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + return nil + }, nil } diff --git a/share/eds/cache/metrics.go b/share/eds/cache/metrics.go index 565a61a5e0..701b7e3a71 100644 --- a/share/eds/cache/metrics.go +++ b/share/eds/cache/metrics.go @@ -15,6 +15,8 @@ const ( type metrics struct { getCounter metric.Int64Counter evictedCounter metric.Int64Counter + + clientReg metric.Registration } func newMetrics(bc *AccessorCache) (*metrics, error) { @@ -43,12 +45,23 @@ func newMetrics(bc *AccessorCache) (*metrics, error) { observer.ObserveInt64(cacheSize, int64(bc.cache.Len())) return nil } - _, err = meter.RegisterCallback(callback, cacheSize) + clientReg, err := meter.RegisterCallback(callback, cacheSize) + if err != nil { + return nil, err + } return &metrics{ getCounter: getCounter, evictedCounter: evictedCounter, - }, err + clientReg: clientReg, + }, nil +} + +func (m *metrics) close() error { + if m == nil { + return nil + } + return m.clientReg.Unregister() } func (m *metrics) observeEvicted(failed bool) { diff --git a/share/eds/cache/noop.go b/share/eds/cache/noop.go index 0a1a39ec7e..8e1c17924a 100644 --- a/share/eds/cache/noop.go +++ b/share/eds/cache/noop.go @@ -28,8 +28,8 @@ func (n NoopCache) Remove(shard.Key) error { return nil } -func (n NoopCache) EnableMetrics() error { - return nil +func (n NoopCache) EnableMetrics() (CloseMetricsFn, error) { + return func() error { return nil }, nil } var _ Accessor = (*NoopAccessor)(nil) diff --git a/share/eds/metrics.go b/share/eds/metrics.go index 0fd6740154..1ce9fe459d 100644 --- a/share/eds/metrics.go +++ b/share/eds/metrics.go @@ -2,6 +2,7 @@ package eds import ( "context" + "errors" "time" "go.opentelemetry.io/otel" @@ -49,6 +50,9 @@ type metrics struct { longOpTime metric.Float64Histogram gcTime metric.Float64Histogram + + clientReg metric.Registration + closerFn func() error } func (s *Store) WithMetrics() error { @@ -124,7 +128,8 @@ func (s *Store) WithMetrics() error { return err } - if err = s.cache.Load().EnableMetrics(); err != nil { + closerFn, err := s.cache.Load().EnableMetrics() + if err != nil { return err } @@ -139,7 +144,8 @@ func (s *Store) WithMetrics() error { return nil } - if _, err := meter.RegisterCallback(callback, dagStoreShards); err != nil { + clientReg, err := meter.RegisterCallback(callback, dagStoreShards) + if err != nil { return err } @@ -155,10 +161,20 @@ func (s *Store) WithMetrics() error { shardFailureCount: shardFailureCount, longOpTime: longOpTime, gcTime: gcTime, + clientReg: clientReg, + closerFn: closerFn, } return nil } +func (m *metrics) close() error { + if m == nil { + return nil + } + + return errors.Join(m.closerFn(), m.clientReg.Unregister()) +} + func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed bool) { if m == nil { return diff --git a/share/eds/retriever.go b/share/eds/retriever.go index 9389d8b6b2..0a32713098 100644 --- a/share/eds/retriever.go +++ b/share/eds/retriever.go @@ -91,7 +91,7 @@ func (r *Retriever) Retrieve(ctx context.Context, dah *da.DataAvailabilityHeader // computed during the session ses.close(false) span.RecordError(err) - return nil, byzantine.NewErrByzantine(ctx, r.bServ, dah, errByz) + return nil, byzantine.NewErrByzantine(ctx, r.bServ.Blockstore(), dah, errByz) } log.Warnw("not enough shares to reconstruct data square, requesting more...", "err", err) diff --git a/share/eds/retriever_test.go b/share/eds/retriever_test.go index 95da345d17..6bf6b45900 100644 --- a/share/eds/retriever_test.go +++ b/share/eds/retriever_test.go @@ -206,7 +206,7 @@ func BenchmarkNewErrByzantineData(b *testing.B) { b.StartTimer() for i := 0; i < b.N; i++ { - err = byzantine.NewErrByzantine(ctx, bServ, h.DAH, errByz) + err = byzantine.NewErrByzantine(ctx, bServ.Blockstore(), h.DAH, errByz) require.NotNil(t, err) } }) diff --git a/share/eds/store.go b/share/eds/store.go index 816065909e..da26e16ef4 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -159,6 +159,11 @@ func (s *Store) Start(ctx context.Context) error { // Stop stops the underlying DAGStore. func (s *Store) Stop(context.Context) error { defer s.cancel() + + if err := s.metrics.close(); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + if err := s.invertedIdx.close(); err != nil { return err } diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 65925819d7..c5cbd88b68 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -114,7 +114,12 @@ func (d *Discovery) Start(context.Context) error { func (d *Discovery) Stop(context.Context) error { d.cancel() - return d.metrics.close() + + if err := d.metrics.close(); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + + return nil } // Peers provides a list of discovered peers in the given topic. diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 0ae21ff015..ca85a85ea6 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -166,6 +166,10 @@ func (m *Manager) Start(startCtx context.Context) error { func (m *Manager) Stop(ctx context.Context) error { m.cancel() + if err := m.metrics.close(); err != nil { + log.Warnw("closing metrics", "err", err) + } + // we do not need to wait for headersub and disconnected peers to finish // here, since they were never started if m.headerSub == nil && m.shrexSub == nil { diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index 094d81a5e3..da52856425 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -68,6 +68,8 @@ type metrics struct { fullNodesPool metric.Int64ObservableGauge // attributes: pool_status blacklistedPeersByReason sync.Map blacklistedPeers metric.Int64ObservableGauge // attributes: blacklist_reason + + clientReg metric.Registration } func initMetrics(manager *Manager) (*metrics, error) { @@ -154,13 +156,20 @@ func initMetrics(manager *Manager) (*metrics, error) { }) return nil } - _, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted) + metrics.clientReg, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted) if err != nil { return nil, fmt.Errorf("registering metrics callback: %w", err) } return metrics, nil } +func (m *metrics) close() error { + if m == nil { + return nil + } + return m.clientReg.Unregister() +} + func (m *metrics) observeGetPeer( ctx context.Context, source peerSource, poolSize int, waitTime time.Duration, diff --git a/state/metrics.go b/state/metrics.go index 3672ef9b36..a9f0074176 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -5,11 +5,12 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" + "go.uber.org/fx" ) var meter = otel.Meter("state") -func WithMetrics(ca *CoreAccessor) { +func WithMetrics(lc fx.Lifecycle, ca *CoreAccessor) { pfbCounter, _ := meter.Int64ObservableCounter( "pfb_count", metric.WithDescription("Total count of submitted PayForBlob transactions"), @@ -24,8 +25,18 @@ func WithMetrics(ca *CoreAccessor) { observer.ObserveInt64(lastPfbTimestamp, ca.LastPayForBlob()) return nil } - _, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp) + + clientReg, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp) if err != nil { panic(err) } + + lc.Append(fx.Hook{ + OnStop: func(context.Context) error { + if err := clientReg.Unregister(); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + return nil + }, + }) }