
Commit

Merge branch 'main' into refactor/sha-256
kien6034 committed Apr 23, 2024
2 parents 30c4b03 + 116af74 commit 8953faf
Showing 26 changed files with 358 additions and 218 deletions.
5 changes: 5 additions & 0 deletions das/daser.go
@@ -132,6 +132,11 @@ func (d *DASer) Stop(ctx context.Context) error {
}

d.cancel()

if err := d.sampler.metrics.close(); err != nil {
log.Warnw("closing metrics", "err", err)
}

if err = d.sampler.wait(ctx); err != nil {
return fmt.Errorf("DASer force quit: %w", err)
}
11 changes: 10 additions & 1 deletion das/metrics.go
@@ -29,6 +29,8 @@ type metrics struct {
newHead metric.Int64Counter

lastSampledTS uint64

clientReg metric.Registration
}

func (d *DASer) InitMetrics() error {
@@ -119,7 +121,7 @@ func (d *DASer) InitMetrics() error {
return nil
}

_, err = meter.RegisterCallback(callback,
d.sampler.metrics.clientReg, err = meter.RegisterCallback(callback,
lastSampledTS,
busyWorkers,
networkHead,
@@ -133,6 +135,13 @@ func (d *DASer) InitMetrics() error {
return nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}

// observeSample records the time it took to sample a header +
// the amount of sampled contiguous headers
func (m *metrics) observeSample(
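
Note on the pattern in the das/metrics.go hunks above: the Registration returned by meter.RegisterCallback is now kept on the metrics struct and released through Unregister when the DASer stops (via metrics.close() in das/daser.go). Below is a minimal, self-contained sketch of that pattern using only the public go.opentelemetry.io/otel/metric API; the type and instrument names (sampleMetrics, das_latest_sampled_ts) are illustrative, not taken from the repository.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// sampleMetrics mirrors the shape of the das metrics struct in this diff:
// it keeps the Registration returned by RegisterCallback so the callback
// can be detached when the owning component stops.
type sampleMetrics struct {
	lastSampledTS uint64
	clientReg     metric.Registration
}

func newSampleMetrics(meter metric.Meter) (*sampleMetrics, error) {
	m := &sampleMetrics{}

	lastSampledTS, err := meter.Int64ObservableGauge(
		"das_latest_sampled_ts",
		metric.WithDescription("timestamp of the latest sampled header"),
	)
	if err != nil {
		return nil, err
	}

	callback := func(_ context.Context, observer metric.Observer) error {
		observer.ObserveInt64(lastSampledTS, int64(m.lastSampledTS))
		return nil
	}

	// RegisterCallback hands back a Registration; keeping it is what makes
	// a clean shutdown possible later on.
	m.clientReg, err = meter.RegisterCallback(callback, lastSampledTS)
	if err != nil {
		return nil, err
	}
	return m, nil
}

// close detaches the callback, like metrics.close() in the diff:
// nil-safe so callers can invoke it even when metrics were never enabled.
func (m *sampleMetrics) close() error {
	if m == nil {
		return nil
	}
	return m.clientReg.Unregister()
}

func main() {
	m, err := newSampleMetrics(otel.Meter("example"))
	if err != nil {
		panic(err)
	}
	m.lastSampledTS = uint64(time.Now().Unix())

	// On component shutdown (DASer.Stop in the diff) the registration is released.
	if err := m.close(); err != nil {
		fmt.Println("closing metrics:", err)
	}
}
```

Making close() a no-op on a nil receiver matches the diff's approach: Stop() can always call it, whether or not InitMetrics ever ran.
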
4 changes: 2 additions & 2 deletions go.mod
@@ -8,7 +8,7 @@ require (
github.com/BurntSushi/toml v1.3.2
github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b
github.com/benbjohnson/clock v1.3.5
github.com/celestiaorg/celestia-app v1.8.0-rc0
github.com/celestiaorg/celestia-app v1.8.0
github.com/celestiaorg/go-fraud v0.2.1
github.com/celestiaorg/go-header v0.6.1
github.com/celestiaorg/go-libp2p-messenger v0.2.0
@@ -121,7 +121,7 @@ require (
github.com/cosmos/gogoproto v1.4.11 // indirect
github.com/cosmos/gorocksdb v1.2.0 // indirect
github.com/cosmos/iavl v0.19.6 // indirect
github.com/cosmos/ibc-go/v6 v6.2.0 // indirect
github.com/cosmos/ibc-go/v6 v6.3.0 // indirect
github.com/cosmos/ledger-cosmos-go v0.13.2 // indirect
github.com/crate-crypto/go-kzg-4844 v0.3.0 // indirect
github.com/creachadair/taskgroup v0.3.2 // indirect
8 changes: 4 additions & 4 deletions go.sum
@@ -358,8 +358,8 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7
github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/celestiaorg/celestia-app v1.8.0-rc0 h1:rjEwN0Im1+2QChr8uPfbdomhGL3lEmGlt0cPUodc5JU=
github.com/celestiaorg/celestia-app v1.8.0-rc0/go.mod h1:z2H47Gs9gYd3GdQ22d5sbcL8/aBMRcVDtUWT64goMaY=
github.com/celestiaorg/celestia-app v1.8.0 h1:kvIxuUoEkVjo1ax+xUn0SUHLB6Qc+K9uV5ZK83x+gpU=
github.com/celestiaorg/celestia-app v1.8.0/go.mod h1:a4yD4A691nNcjuwy3KJt3fBf+rD1/KE6BGOtZ574gGw=
github.com/celestiaorg/celestia-core v1.35.0-tm-v0.34.29 h1:sXERzNXgyHyqTKNQx4S29C/NMDzgav62DaQDNF49HUQ=
github.com/celestiaorg/celestia-core v1.35.0-tm-v0.34.29/go.mod h1:weZR4wYx1Vcw3g1Jc5G8VipG4M+KUDSqeIzyyWszmsQ=
github.com/celestiaorg/cosmos-sdk v1.20.1-sdk-v0.46.16 h1:9U9UthIJSOyVjabD5PkD6aczvqlWOyAFTOXw0duPT5k=
@@ -492,8 +492,8 @@ github.com/cosmos/gorocksdb v1.2.0 h1:d0l3jJG8M4hBouIZq0mDUHZ+zjOx044J3nGRskwTb4
github.com/cosmos/gorocksdb v1.2.0/go.mod h1:aaKvKItm514hKfNJpUJXnnOWeBnk2GL4+Qw9NHizILw=
github.com/cosmos/iavl v0.19.6 h1:XY78yEeNPrEYyNCKlqr9chrwoeSDJ0bV2VjocTk//OU=
github.com/cosmos/iavl v0.19.6/go.mod h1:X9PKD3J0iFxdmgNLa7b2LYWdsGd90ToV5cAONApkEPw=
github.com/cosmos/ibc-go/v6 v6.2.0 h1:HKS5WNxQrlmjowHb73J9LqlNJfvTnvkbhXZ9QzNTU7Q=
github.com/cosmos/ibc-go/v6 v6.2.0/go.mod h1:+S3sxcNwOhgraYDJAhIFDg5ipXHaUnJrg7tOQqGyWlc=
github.com/cosmos/ibc-go/v6 v6.3.0 h1:2EkkqDEd9hTQvzB/BsPhYZsu7T/dzAVA8+VD2UuJLSQ=
github.com/cosmos/ibc-go/v6 v6.3.0/go.mod h1:Dm14j9s094bGyCEE8W4fD+2t8IneHv+cz+80Mvwjr1w=
github.com/cosmos/ledger-cosmos-go v0.13.2 h1:aY0KZSmUwNKbBm9OvbIjvf7Ozz2YzzpAbgvN2C8x2T0=
github.com/cosmos/ledger-cosmos-go v0.13.2/go.mod h1:HENcEP+VtahZFw38HZ3+LS3Iv5XV6svsnkk9vdJtLr8=
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
21 changes: 18 additions & 3 deletions nodebuilder/node/metrics.go
@@ -4,11 +4,15 @@ import (
"context"
"time"

logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/fx"
)

var log = logging.Logger("module/node")

var meter = otel.Meter("node")

var (
@@ -17,7 +21,7 @@ var (
)

// WithMetrics registers node metrics.
func WithMetrics() error {
func WithMetrics(lc fx.Lifecycle) error {
nodeStartTS, err := meter.Int64ObservableGauge(
"node_start_ts",
metric.WithDescription("timestamp when the node was started"),
@@ -66,7 +70,18 @@ func WithMetrics() error {
return nil
}

_, err = meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
clientReg, err := meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
if err != nil {
return nil
}

return err
lc.Append(
fx.Hook{OnStop: func(context.Context) error {
if err := clientReg.Unregister(); err != nil {
log.Warn("failed to close metrics", "err", err)
}
return nil
}},
)
return nil
}
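
WithMetrics now receives an fx.Lifecycle so the callback registration can be released in an OnStop hook instead of being leaked. As a rough, standalone sketch of how such a hook behaves under go.uber.org/fx (the cleanup function and printed message are placeholders, not code from this repository):

```go
package main

import (
	"context"
	"fmt"

	"go.uber.org/fx"
)

func main() {
	app := fx.New(
		// fx.Invoke functions run when the app is built; dependencies such
		// as fx.Lifecycle are injected by the container.
		fx.Invoke(func(lc fx.Lifecycle) error {
			// Stand-in for a successful meter.RegisterCallback whose
			// Registration must be released on shutdown.
			cleanup := func() error {
				fmt.Println("unregistering metrics callback")
				return nil
			}

			lc.Append(fx.Hook{
				OnStop: func(context.Context) error {
					// OnStop hooks run, in reverse registration order, when the app stops.
					return cleanup()
				},
			})
			return nil
		}),
	)

	ctx := context.Background()
	if err := app.Start(ctx); err != nil {
		panic(err)
	}
	// Stopping the app triggers the OnStop hook appended above.
	if err := app.Stop(ctx); err != nil {
		panic(err)
	}
}
```

Hanging cleanup off the lifecycle keeps WithMetrics usable as a plain fx.Invoke target, which is how nodebuilder/settings.go wires it below.
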
4 changes: 2 additions & 2 deletions nodebuilder/settings.go
@@ -86,11 +86,11 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
baseComponents := fx.Options(
fx.Supply(metricOpts),
fx.Invoke(initializeMetrics),
fx.Invoke(func(ca *state.CoreAccessor) {
fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) {
if ca == nil {
return
}
state.WithMetrics(ca)
state.WithMetrics(lc, ca)
}),
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
32 changes: 11 additions & 21 deletions share/eds/byzantine/bad_encoding.go
@@ -12,7 +12,6 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share"
pb "github.com/celestiaorg/celestia-node/share/eds/byzantine/pb"
"github.com/celestiaorg/celestia-node/share/ipld"
)

const (
@@ -139,34 +138,27 @@ func (p *BadEncodingProof) Validate(hdr *header.ExtendedHeader) error {
)
}

// merkleRoots are the roots against which we are going to check the inclusion of the received
// shares. Changing the order of the roots to prove the shares relative to the orthogonal axis,
// because inside the rsmt2d library rsmt2d.Row = 0 and rsmt2d.Col = 1
merkleRoots := hdr.DAH.RowRoots
if p.Axis == rsmt2d.Row {
merkleRoots = hdr.DAH.ColumnRoots
}

if int(p.Index) >= len(merkleRoots) {
width := len(hdr.DAH.RowRoots)
if int(p.Index) >= width {
log.Debugf("%s:%s (%d >= %d)",
invalidProofPrefix, errIncorrectIndex, int(p.Index), len(merkleRoots),
invalidProofPrefix, errIncorrectIndex, int(p.Index), width,
)
return errIncorrectIndex
}

if len(p.Shares) != len(merkleRoots) {
if len(p.Shares) != width {
// Since p.Shares should contain all the shares from either a row or a
// column, it should exactly match the number of row roots. In this
// context, the number of row roots is the width of the extended data
// square.
log.Infof("%s: %s (%d >= %d)",
invalidProofPrefix, errIncorrectAmountOfShares, int(p.Index), len(merkleRoots),
invalidProofPrefix, errIncorrectAmountOfShares, int(p.Index), width,
)
return errIncorrectAmountOfShares
}

odsWidth := uint64(len(merkleRoots) / 2)
amount := uint64(0)
odsWidth := width / 2
var amount int
for _, share := range p.Shares {
if share == nil {
continue
Expand All @@ -184,19 +176,17 @@ func (p *BadEncodingProof) Validate(hdr *header.ExtendedHeader) error {
}

// verify that Merkle proofs correspond to particular shares.
shares := make([][]byte, len(merkleRoots))
shares := make([][]byte, width)
for index, shr := range p.Shares {
if shr == nil {
continue
}
// validate inclusion of the share into one of the DAHeader roots
if ok := shr.Validate(ipld.MustCidFromNamespacedSha256(merkleRoots[index])); !ok {
if ok := shr.Validate(hdr.DAH, p.Axis, int(p.Index), index); !ok {
log.Debugf("%s: %s at index %d", invalidProofPrefix, errIncorrectShare, index)
return errIncorrectShare
}
// NMTree commits the additional namespace while rsmt2d does not know about, so we trim it
// this is ugliness from NMTWrapper that we have to embrace ¯\_(ツ)_/¯
shares[index] = share.GetData(shr.Share)
shares[index] = shr.Share
}

codec := share.DefaultRSMT2DCodec()
Expand All @@ -220,7 +210,7 @@ func (p *BadEncodingProof) Validate(hdr *header.ExtendedHeader) error {
}
copy(rebuiltShares[odsWidth:], rebuiltExtendedShares)

tree := wrapper.NewErasuredNamespacedMerkleTree(odsWidth, uint(p.Index))
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(odsWidth), uint(p.Index))
for _, share := range rebuiltShares {
err = tree.Push(share)
if err != nil {
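
The refactor above drives every check off the extended square width, len(hdr.DAH.RowRoots), rather than an axis-dependent root slice. The sketch below restates that validation arithmetic in plain Go with hypothetical names; it covers only the shape checks (index bound, share count, and the width/2 minimum needed to rebuild an erasure-coded row or column), not the Merkle inclusion proofs.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errIncorrectIndex          = errors.New("index out of bounds of the extended square")
	errIncorrectAmountOfShares = errors.New("shares must cover a full row or column")
	errNotEnoughShares         = errors.New("fewer than width/2 shares; axis cannot be rebuilt")
)

// checkBEFPShape mirrors the bounds checks in BadEncodingProof.Validate:
// width is the extended square width (== len(RowRoots) == len(ColumnRoots)),
// index is the row/column the proof points at, and shares is the proof's
// share slice, where nil marks a share the prover did not include.
func checkBEFPShape(width, index int, shares [][]byte) error {
	if index >= width {
		return fmt.Errorf("%w: %d >= %d", errIncorrectIndex, index, width)
	}
	// A proof carries a slot for every share of the row/column, so its
	// length must equal the extended width.
	if len(shares) != width {
		return fmt.Errorf("%w: got %d, want %d", errIncorrectAmountOfShares, len(shares), width)
	}
	// Reed-Solomon extension doubles the original square, so any width/2
	// shares are enough to rebuild the axis and re-check the encoding.
	odsWidth := width / 2
	var amount int
	for _, share := range shares {
		if share != nil {
			amount++
		}
	}
	if amount < odsWidth {
		return fmt.Errorf("%w: %d < %d", errNotEnoughShares, amount, odsWidth)
	}
	return nil
}

func main() {
	width := 8 // a 4x4 ODS extends to an 8x8 square
	shares := make([][]byte, width)
	for i := 0; i < width/2; i++ {
		shares[i] = []byte{byte(i)} // half of the row present, half missing
	}
	fmt.Println(checkBEFPShape(width, 3, shares)) // <nil>: shape is acceptable
}
```

Because the encoding rate is 1/2, width/2 recovered shares are both necessary and sufficient, which is why the non-nil count is compared against odsWidth.
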
31 changes: 16 additions & 15 deletions share/eds/byzantine/bad_encoding_test.go
@@ -40,7 +40,7 @@ func TestBEFP_Validate(t *testing.T) {
err = square.Repair(dah.RowRoots, dah.ColumnRoots)
require.ErrorAs(t, err, &errRsmt2d)

byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d)
byzantine := NewErrByzantine(ctx, bServ.Blockstore(), &dah, errRsmt2d)
var errByz *ErrByzantine
require.ErrorAs(t, byzantine, &errByz)

@@ -70,7 +70,7 @@ func TestBEFP_Validate(t *testing.T) {
err = ipld.ImportEDS(ctx, validSquare, bServ)
require.NoError(t, err)
validShares := validSquare.Flattened()
errInvalidByz := NewErrByzantine(ctx, bServ, &validDah,
errInvalidByz := NewErrByzantine(ctx, bServ.Blockstore(), &validDah,
&rsmt2d.ErrByzantineData{
Axis: rsmt2d.Row,
Index: 0,
Expand All @@ -92,7 +92,7 @@ func TestBEFP_Validate(t *testing.T) {
// break the first shareWithProof to test negative case
sh := sharetest.RandShares(t, 2)
nmtProof := nmt.NewInclusionProof(0, 1, nil, false)
befp.Shares[0] = &ShareWithProof{sh[0], &nmtProof}
befp.Shares[0] = &ShareWithProof{sh[0], &nmtProof, rsmt2d.Row}
return proof.Validate(&header.ExtendedHeader{DAH: &dah})
},
expectedResult: func(err error) {
@@ -170,16 +170,17 @@ func TestIncorrectBadEncodingFraudProof(t *testing.T) {
require.NoError(t, err)

// get an arbitrary row
row := uint(squareSize / 2)
rowShares := eds.Row(row)
rowRoot := dah.RowRoots[row]

shareProofs, err := GetProofsForShares(ctx, bServ, ipld.MustCidFromNamespacedSha256(rowRoot), rowShares)
require.NoError(t, err)
rowIdx := squareSize / 2
shareProofs := make([]*ShareWithProof, 0, eds.Width())
for i := range shareProofs {
proof, err := GetShareWithProof(ctx, bServ, dah, shares[i], rsmt2d.Row, rowIdx, i)
require.NoError(t, err)
shareProofs = append(shareProofs, proof)
}

// create a fake error for data that was encoded correctly
fakeError := ErrByzantine{
Index: uint32(row),
Index: uint32(rowIdx),
Shares: shareProofs,
Axis: rsmt2d.Row,
}
@@ -202,7 +203,7 @@ func TestIncorrectBadEncodingFraudProof(t *testing.T) {
}

func TestBEFP_ValidateOutOfOrderShares(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

size := 4
@@ -220,17 +221,17 @@ func TestBEFP_ValidateOutOfOrderShares(t *testing.T) {
)
require.NoError(t, err, "failure to recompute the extended data square")

err = batchAddr.Commit()
require.NoError(t, err)

dah, err := da.NewDataAvailabilityHeader(eds)
require.NoError(t, err)

var errRsmt2d *rsmt2d.ErrByzantineData
err = eds.Repair(dah.RowRoots, dah.ColumnRoots)
require.ErrorAs(t, err, &errRsmt2d)

byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d)
err = batchAddr.Commit()
require.NoError(t, err)

byzantine := NewErrByzantine(ctx, bServ.Blockstore(), &dah, errRsmt2d)
var errByz *ErrByzantine
require.ErrorAs(t, byzantine, &errByz)

55 changes: 20 additions & 35 deletions share/eds/byzantine/byzantine.go
@@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/rsmt2d"
@@ -31,50 +31,35 @@ func (e *ErrByzantine) Error() string {
// If error happens during proof collection, it terminates the process with os.Exit(1).
func NewErrByzantine(
ctx context.Context,
bGetter blockservice.BlockGetter,
bStore blockstore.Blockstore,
dah *da.DataAvailabilityHeader,
errByz *rsmt2d.ErrByzantineData,
) error {
// changing the order to collect proofs against an orthogonal axis
roots := [][][]byte{
dah.ColumnRoots,
dah.RowRoots,
}[errByz.Axis]

sharesWithProof := make([]*ShareWithProof, len(errByz.Shares))

type result struct {
share *ShareWithProof
index int
}
resultCh := make(chan *result)
bGetter := ipld.NewBlockservice(bStore, nil)
var count int
for index, share := range errByz.Shares {
if share == nil {
if len(share) == 0 {
continue
}
swp, err := GetShareWithProof(ctx, bGetter, dah, share, errByz.Axis, int(errByz.Index), index)
if err != nil {
log.Warn("requesting proof failed",
"errByz", errByz,
"shareIndex", index,
"err", err)
continue
}

index := index
go func() {
share, err := getProofsAt(
ctx, bGetter,
ipld.MustCidFromNamespacedSha256(roots[index]),
int(errByz.Index), len(errByz.Shares),
)
if err != nil {
log.Warn("requesting proof failed", "root", roots[index], "err", err)
return
}
resultCh <- &result{share, index}
}()
sharesWithProof[index] = swp
// it is enough to collect half of the shares to construct the befp
if count++; count >= len(dah.RowRoots)/2 {
break
}
}

for i := 0; i < len(dah.RowRoots)/2; i++ {
select {
case t := <-resultCh:
sharesWithProof[t.index] = t.share
case <-ctx.Done():
return ipld.ErrNodeNotFound
}
if count < len(dah.RowRoots)/2 {
return fmt.Errorf("failed to collect proof")
}

return &ErrByzantine{
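
NewErrByzantine now depends only on a blockstore.Blockstore and builds a block service internally via ipld.NewBlockservice(bStore, nil), presumably limiting proof collection to blocks already stored locally. As a rough illustration of the same idea using public boxo packages (this is an assumption about the helper's behaviour, not its actual implementation), a blockstore can be lifted into an offline BlockService like this:

```go
package main

import (
	"context"
	"fmt"

	"github.com/ipfs/boxo/blockservice"
	"github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/boxo/exchange/offline"
	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
	ctx := context.Background()

	// A blockstore over an in-memory datastore stands in for the node's store.
	bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))

	blk := blocks.NewBlock([]byte("share data"))
	if err := bstore.Put(ctx, blk); err != nil {
		panic(err)
	}

	// An offline exchange means the service can only serve what the local
	// blockstore already holds, which is all local proof collection needs.
	bserv := blockservice.New(bstore, offline.Exchange(bstore))

	got, err := bserv.GetBlock(ctx, blk.Cid())
	if err != nil {
		panic(err)
	}
	fmt.Printf("fetched %d bytes for %s\n", len(got.RawData()), got.Cid())
}
```
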