Skip to content

Commit

Permalink
bitswap/network: add new metrics to compare bsnet and httpnet
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Feb 4, 2025
1 parent 3bc8ebd commit aa1f711
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 21 deletions.
14 changes: 9 additions & 5 deletions bitswap/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,26 +392,30 @@ func BlockPresenceSize(c cid.Cid) int {
}

// FromNet generates a new BitswapMessage from incoming data on an io.Reader.
func FromNet(r io.Reader) (BitSwapMessage, error) {
func FromNet(r io.Reader) (BitSwapMessage, int, error) {
reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
return FromMsgReader(reader)
}

// FromPBReader generates a new Bitswap message from a gogo-protobuf reader
func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
func FromMsgReader(r msgio.Reader) (BitSwapMessage, int, error) {
msg, err := r.ReadMsg()
if err != nil {
return nil, err
return nil, 0, err
}

var pb pb.Message
err = pb.Unmarshal(msg)
r.ReleaseMsg(msg)
if err != nil {
return nil, err
return nil, 0, err
}

return newMessageFromProto(pb)
m, err := newMessageFromProto(pb)
if err != nil {
return nil, 0, err
}
return m, len(msg), nil
}

func (m *impl) ToProtoV0() *pb.Message {
Expand Down
4 changes: 2 additions & 2 deletions bitswap/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
t.Fatal(err)
}

copied, err := FromNet(buf)
copied, _, err := FromNet(buf)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestToAndFromNetMessage(t *testing.T) {
t.Fatal(err)
}

m2, err := FromNet(buf)
m2, _, err := FromNet(buf)
if err != nil {
t.Fatal(err)
}
Expand Down
17 changes: 16 additions & 1 deletion bitswap/network/bsnet/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork {
protocolBitswap: s.ProtocolPrefix + ProtocolBitswap,

supportedProtocols: s.SupportedProtocols,

metrics: newMetrics(),
}

return &bitswapNetwork
Expand Down Expand Up @@ -80,6 +82,8 @@ type impl struct {

// inbound messages from the network are forwarded to the receiver
receivers []iface.Receiver

metrics *metrics
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
Expand Down Expand Up @@ -166,6 +170,15 @@ func (s *streamMessageSender) SupportsHave() bool {

// Send a message to the peer, attempting multiple times
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
if n := len(msg.Wantlist()); n > 0 {
s.bsnet.metrics.WantlistsTotal.Inc()
s.bsnet.metrics.WantlistsItemsTotal.Add(float64(n))
now := time.Now()
defer func() {
s.bsnet.metrics.WantlistsSeconds.Add(float64(time.Since(now)) / float64(time.Second))
}()
}

return s.multiAttempt(ctx, func() error {
return s.send(ctx, msg)
})
Expand Down Expand Up @@ -417,7 +430,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) {

reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
for {
received, err := bsmsg.FromMsgReader(reader)
received, size, err := bsmsg.FromMsgReader(reader)
if err != nil {
if err != io.EOF {
_ = s.Reset()
Expand All @@ -429,6 +442,8 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
return
}

bsnet.metrics.ResponseSizes.Observe(float64(size))
bsnet.metrics.ResponseTotalBytes.Add(float64(size))
p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
Expand Down
51 changes: 51 additions & 0 deletions bitswap/network/bsnet/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package bsnet

import (
"context"

imetrics "github.com/ipfs/go-metrics-interface"
)

//var durationHistogramBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30, 60, 120, 240, 480, 960, 1920}

var blockSizesHistogramBuckets = []float64{1, 128 << 10, 256 << 10, 512 << 10, 1024 << 10, 2048 << 10, 4092 << 10}

func responseSizes(ctx context.Context) imetrics.Histogram {
return imetrics.NewCtx(ctx, "response_bytes", "Histogram of http response sizes").Histogram(blockSizesHistogramBuckets)
}

func responseTotalBytes(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "response_total_bytes", "Accumulated response bytes").Counter()
}

func wantlistsTotal(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "wantlists_total", "Total number of wantlists sent").Counter()
}

func wantlistsItemsTotal(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "wantlists_items_total", "Total number of elements in sent wantlists").Counter()
}

func wantlistsSeconds(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "wantlists_seconds", "Number of seconds spent sending wantlists").Counter()
}

type metrics struct {
WantlistsTotal imetrics.Counter
WantlistsItemsTotal imetrics.Counter
WantlistsSeconds imetrics.Counter
ResponseSizes imetrics.Histogram
ResponseTotalBytes imetrics.Counter
}

func newMetrics() *metrics {
ctx := imetrics.CtxScope(context.Background(), "exchange_bitswap")

return &metrics{
WantlistsTotal: wantlistsTotal(ctx),
WantlistsItemsTotal: wantlistsItemsTotal(ctx),
WantlistsSeconds: wantlistsSeconds(ctx),
ResponseSizes: responseSizes(ctx),
ResponseTotalBytes: responseTotalBytes(ctx),
}
}
50 changes: 40 additions & 10 deletions bitswap/network/httpnet/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,38 @@ func requestsFailure(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "requests_failure", "Failed (no response, dial error etc) requests count").Counter()
}

func requestSentBytes(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "request_sent_bytes", "Total bytes sent on requests").Counter()
}

func requestTime(ctx context.Context) imetrics.Histogram {
return imetrics.NewCtx(ctx, "request_duration_seconds", "Histogram of request durations").Histogram(durationHistogramBuckets)
}

func requestsBodyFailure(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "requests_body_failure", "Failure count when reading response body").Counter()
}

func responseSizes(ctx context.Context) imetrics.Histogram {
return imetrics.NewCtx(ctx, "response_bytes", "Histogram of http response sizes").Histogram(blockSizesHistogramBuckets)
}

func responseTotalBytes(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "response_total_bytes", "Accumulated response bytes").Counter()
}

func wantlistsTotal(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "wantlists_total", "Total number of wantlists sent").Counter()
}

func wantlistsItemsTotal(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "wantlists_items_total", "Total number of elements in sent wantlists").Counter()
}

func wantlistsSeconds(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "wantlists_seconds", "Number of seconds spent sending wantlists").Counter()
}

func statusNotFound(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "status_404", "Request count with NotFound status").Counter()
}
Expand Down Expand Up @@ -62,18 +90,16 @@ func statusOthers(ctx context.Context) imetrics.Counter {
return imetrics.NewCtx(ctx, "status_others", "Request count with other status codes").Counter()
}

func requestTime(ctx context.Context) imetrics.Histogram {
return imetrics.NewCtx(ctx, "request_duration_seconds", "Histogram of request durations").Histogram(durationHistogramBuckets)
}

func responseSize(ctx context.Context) imetrics.Histogram {
return imetrics.NewCtx(ctx, "response_bytes", "Histogram of http response sizes").Histogram(blockSizesHistogramBuckets)
}

type metrics struct {
RequestsInFlight imetrics.Gauge
RequestsTotal imetrics.Counter
RequestsFailure imetrics.Counter
RequestsSentBytes imetrics.Counter
WantlistsTotal imetrics.Counter
WantlistsItemsTotal imetrics.Counter
WantlistsSeconds imetrics.Counter
ResponseSizes imetrics.Histogram
ResponseTotalBytes imetrics.Counter
RequestsBodyFailure imetrics.Counter
StatusNotFound imetrics.Counter
StatusGone imetrics.Counter
Expand All @@ -85,7 +111,6 @@ type metrics struct {
StatusInternalServerError imetrics.Counter
StatusOthers imetrics.Counter
RequestTime imetrics.Histogram
ResponseSize imetrics.Histogram
}

func newMetrics() *metrics {
Expand All @@ -94,8 +119,14 @@ func newMetrics() *metrics {
return &metrics{
RequestsInFlight: requestsInFlight(ctx),
RequestsTotal: requestsTotal(ctx),
RequestsSentBytes: requestSentBytes(ctx),
RequestsFailure: requestsFailure(ctx),
RequestsBodyFailure: requestsBodyFailure(ctx),
WantlistsTotal: wantlistsTotal(ctx),
WantlistsItemsTotal: wantlistsItemsTotal(ctx),
WantlistsSeconds: wantlistsSeconds(ctx),
ResponseSizes: responseSizes(ctx),
ResponseTotalBytes: responseTotalBytes(ctx),
StatusNotFound: statusNotFound(ctx),
StatusGone: statusGone(ctx),
StatusForbidden: statusForbidden(ctx),
Expand All @@ -106,7 +137,6 @@ func newMetrics() *metrics {
StatusInternalServerError: statusInternalServerError(ctx),
StatusOthers: statusOthers(ctx),
RequestTime: requestTime(ctx),
ResponseSize: responseSize(ctx),
}
}

Expand Down
29 changes: 26 additions & 3 deletions bitswap/network/httpnet/msg_sender.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package httpnet

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -231,6 +232,12 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
return serr
}

// Record request size
var buf bytes.Buffer
req.Write(&buf)
sender.ht.metrics.RequestsSentBytes.Add(float64((&buf).Len()))

// Handle responses
limReader := &io.LimitedReader{
R: resp.Body,
N: sender.ht.maxBlockSize,
Expand All @@ -248,10 +255,18 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
Err: err,
}
}

Check warning on line 257 in bitswap/network/httpnet/msg_sender.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/httpnet/msg_sender.go#L248-L257

Added lines #L248 - L257 were not covered by tests
reqDuration := time.Since(reqStart)
sender.ht.metrics.ResponseSize.Observe(float64(len(body)))

// Calculate full response size with headers and everything.
// So this is comparable to bitswap message response sizes.
resp.Body = nil
var respBuf bytes.Buffer
resp.Write(&respBuf)
respLen := (&respBuf).Len() + len(body)

sender.ht.metrics.ResponseSizes.Observe(float64(respLen))
sender.ht.metrics.ResponseTotalBytes.Add(float64(respLen))
sender.ht.metrics.RequestsInFlight.Dec()
sender.ht.metrics.RequestTime.Observe(float64(reqDuration) / float64(time.Second))
sender.ht.metrics.RequestTime.Observe(float64(time.Since(reqStart)) / float64(time.Second))
sender.ht.metrics.updateStatusCounter(resp.StatusCode)

sender.ht.connEvtMgr.OnMessage(sender.peer)
Expand Down Expand Up @@ -353,6 +368,14 @@ func (sender *httpMsgSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessa
return nil
}

Check warning on line 369 in bitswap/network/httpnet/msg_sender.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/httpnet/msg_sender.go#L368-L369

Added lines #L368 - L369 were not covered by tests

// Keep metrics of wantlists sent and how long it took
sender.ht.metrics.WantlistsTotal.Inc()
sender.ht.metrics.WantlistsItemsTotal.Add(float64(len(wantlist)))
now := time.Now()
defer func() {
sender.ht.metrics.WantlistsSeconds.Add(float64(time.Since(now)) / float64(time.Second))
}()

go func() {
select {
case <-sender.closing:
Expand Down

0 comments on commit aa1f711

Please sign in to comment.