Skip to content

Commit

Permalink
feat(shwap/bitswap): add blockstore metrics (#3862)
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss authored Feb 25, 2025
1 parent 68a5e66 commit d60dda8
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 9 deletions.
1 change: 1 addition & 0 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
fx.Invoke(share.WithDiscoveryMetrics),
fx.Invoke(share.WithBlockStoreMetrics),
)

samplingMetrics := fx.Options(
Expand Down
12 changes: 7 additions & 5 deletions nodebuilder/share/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,23 @@ func dataExchange(tp node.Type, params bitSwapParams) exchange.SessionExchange {
}
}

func blockstoreFromDatastore(ds datastore.Batching) (blockstore.Blockstore, error) {
return blockstore.NewBlockstore(ds), nil
func blockstoreFromDatastore(ds datastore.Batching) (*bitswap.BlockstoreWithMetrics, error) {
bs := blockstore.NewBlockstore(ds)
return bitswap.NewBlockstoreWithMetrics(bs)
}

func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (blockstore.Blockstore, error) {
func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (*bitswap.BlockstoreWithMetrics, error) {
if blockStoreCacheSize == 0 {
// no cache, return plain blockstore
return &bitswap.Blockstore{Getter: store}, nil
bs := &bitswap.Blockstore{Getter: store}
return bitswap.NewBlockstoreWithMetrics(bs)
}
withCache, err := store.WithCache("blockstore", blockStoreCacheSize)
if err != nil {
return nil, fmt.Errorf("create cached store for blockstore:%w", err)
}
bs := &bitswap.Blockstore{Getter: withCache}
return bs, nil
return bitswap.NewBlockstoreWithMetrics(bs)
}

type bitSwapParams struct {
Expand Down
21 changes: 17 additions & 4 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds"
Expand Down Expand Up @@ -69,14 +70,26 @@ func bitswapComponents(tp node.Type, cfg *Config) fx.Option {
case node.Light:
return fx.Options(
opts,
fx.Provide(blockstoreFromDatastore),
fx.Provide(
fx.Annotate(
blockstoreFromDatastore,
fx.As(fx.Self()),
fx.As(new(blockstore.Blockstore)),
),
),
)
case node.Full, node.Bridge:
return fx.Options(
opts,
fx.Provide(func(store *store.Store) (blockstore.Blockstore, error) {
return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize))
}),
fx.Provide(
fx.Annotate(
func(store *store.Store) (*bitswap.BlockstoreWithMetrics, error) {
return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize))
},
fx.As(fx.Self()),
fx.As(new(blockstore.Blockstore)),
),
),
)
default:
panic("invalid node type")
Expand Down
5 changes: 5 additions & 0 deletions nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package share
import (
"errors"

"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
Expand Down Expand Up @@ -56,3 +57,7 @@ func WithShrexGetterMetrics(sg *shrex_getter.Getter) error {
func WithStoreMetrics(s *store.Store) error {
return s.WithMetrics()
}

func WithBlockStoreMetrics(bs *bitswap.BlockstoreWithMetrics) error {
return bs.WithMetrics()
}
208 changes: 208 additions & 0 deletions share/shwap/p2p/bitswap/blockstore_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package bitswap

import (
"context"
"errors"
"fmt"

bstore "github.com/ipfs/boxo/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
notFoundKey = "not_found"
failedKey = "failed"
)

var (
meter = otel.Meter("bitswap_blockstore")
_ bstore.Blockstore = (*BlockstoreWithMetrics)(nil)
)

// BlockstoreWithMetrics is a blockstore that collects metrics on blockstore operations.
type BlockstoreWithMetrics struct {
bs bstore.Blockstore
metrics *metrics
}

type metrics struct {
delete metric.Int64Counter
has metric.Int64Counter
get metric.Int64Counter
getSize metric.Int64Counter
put metric.Int64Counter
putMany metric.Int64Counter
allKeysChan metric.Int64Counter
}

// NewBlockstoreWithMetrics creates a new BlockstoreWithMetrics.
func NewBlockstoreWithMetrics(bs bstore.Blockstore) (*BlockstoreWithMetrics, error) {
return &BlockstoreWithMetrics{
bs: bs,
}, nil
}

// WithMetrics enables metrics collection on the blockstore.
func (w *BlockstoreWithMetrics) WithMetrics() error {
del, err := meter.Int64Counter(
"blockstore_delete_block",
metric.WithDescription("Blockstore delete block operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore delete block counter: %w", err)
}

has, err := meter.Int64Counter(
"blockstore_has",
metric.WithDescription("Blockstore has operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore has counter: %w", err)
}

get, err := meter.Int64Counter(
"blockstore_get",
metric.WithDescription("Blockstore get operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore get counter: %w", err)
}

getSize, err := meter.Int64Counter(
"blockstore_get_size",
metric.WithDescription("Blockstore get size operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore get size counter: %w", err)
}

put, err := meter.Int64Counter(
"blockstore_put",
metric.WithDescription("Blockstore put operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore put counter: %w", err)
}

putMany, err := meter.Int64Counter(
"blockstore_put_many",
metric.WithDescription("Blockstore put many operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore put many counter: %w", err)
}

allKeysChan, err := meter.Int64Counter(
"blockstore_all_keys_chan",
metric.WithDescription("Blockstore all keys chan operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore all keys chan counter: %w", err)
}

w.metrics = &metrics{
delete: del,
has: has,
get: get,
getSize: getSize,
put: put,
putMany: putMany,
allKeysChan: allKeysChan,
}
return nil
}

func (w *BlockstoreWithMetrics) DeleteBlock(ctx context.Context, cid cid.Cid) error {
if w.metrics == nil {
return w.bs.DeleteBlock(ctx, cid)
}
err := w.bs.DeleteBlock(ctx, cid)
w.metrics.delete.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return err
}

func (w *BlockstoreWithMetrics) Has(ctx context.Context, cid cid.Cid) (bool, error) {
if w.metrics == nil {
return w.bs.Has(ctx, cid)
}
has, err := w.bs.Has(ctx, cid)
notFound := errors.Is(err, ipld.ErrNotFound{})
failed := err != nil && !notFound
w.metrics.has.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Bool(notFoundKey, notFound),
))
return has, err
}

func (w *BlockstoreWithMetrics) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
if w.metrics == nil {
return w.bs.Get(ctx, cid)
}
get, err := w.bs.Get(ctx, cid)
notFound := errors.Is(err, ipld.ErrNotFound{})
failed := err != nil && !notFound
w.metrics.get.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Bool(notFoundKey, notFound),
))
return get, err
}

func (w *BlockstoreWithMetrics) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
if w.metrics == nil {
return w.bs.GetSize(ctx, cid)
}
size, err := w.bs.GetSize(ctx, cid)
notFound := errors.Is(err, ipld.ErrNotFound{})
failed := err != nil && !notFound
w.metrics.getSize.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Bool(notFoundKey, notFound),
))
return size, err
}

func (w *BlockstoreWithMetrics) Put(ctx context.Context, block blocks.Block) error {
if w.metrics == nil {
return w.bs.Put(ctx, block)
}
err := w.bs.Put(ctx, block)
w.metrics.put.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return err
}

func (w *BlockstoreWithMetrics) PutMany(ctx context.Context, blocks []blocks.Block) error {
if w.metrics == nil {
return w.bs.PutMany(ctx, blocks)
}
err := w.bs.PutMany(ctx, blocks)
w.metrics.putMany.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return err
}

func (w *BlockstoreWithMetrics) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if w.metrics == nil {
return w.bs.AllKeysChan(ctx)
}
ch, err := w.bs.AllKeysChan(ctx)
w.metrics.allKeysChan.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return ch, err
}

func (w *BlockstoreWithMetrics) HashOnRead(enabled bool) {
w.bs.HashOnRead(enabled)
}

0 comments on commit d60dda8

Please sign in to comment.