Skip to content

Commit

Permalink
Merge pull request #7542 from filecoin-project/feat/gs-metrics
Browse files Browse the repository at this point in the history
Collect and expose graphsync metrics
  • Loading branch information
magik6k authored Oct 20, 2021
2 parents 8182ffc + b7fe165 commit 97e4921
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ require (
github.com/ipfs/go-ds-measure v0.1.0
github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.10.1
github.com/ipfs/go-graphsync v0.10.4
github.com/ipfs/go-ipfs-blockstore v1.0.4
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -743,8 +743,8 @@ github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CE
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.10.0/go.mod h1:cKIshzTaa5rCZjryH5xmSKZVGX9uk1wvwGvz2WEha5Y=
github.com/ipfs/go-graphsync v0.10.1 h1:m6nNwiRFE2FVBTCxHWVTRApjH0snIjFy7fkDbOlMa/I=
github.com/ipfs/go-graphsync v0.10.1/go.mod h1:cKIshzTaa5rCZjryH5xmSKZVGX9uk1wvwGvz2WEha5Y=
github.com/ipfs/go-graphsync v0.10.4 h1:1WZhyOPxgxLvHTIC2GoLltaBrjZ+JuXC2oKAEiX8f3Y=
github.com/ipfs/go-graphsync v0.10.4/go.mod h1:oei4tnWAKnZ6LPnapZGPYVVbyiKV1UP3f8BeLU7Z4JQ=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
Expand Down Expand Up @@ -842,8 +842,9 @@ github.com/ipfs/go-path v0.0.7/go.mod h1:6KTKmeRnBXgqrTvzFrPV3CamxcgvXX/4z79tfAd
github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ=
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
github.com/ipfs/go-peertaskqueue v0.1.1/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
github.com/ipfs/go-peertaskqueue v0.2.0 h1:2cSr7exUGKYyDeUyQ7P/nHPs9P7Ht/B+ROrpN1EJOjc=
github.com/ipfs/go-peertaskqueue v0.2.0/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg=
github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-todocounter v0.0.1/go.mod h1:l5aErvQc8qKE2r7NDMjmq5UNAvuZy0rC8BHOplkWvZ4=
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
Expand Down
79 changes: 79 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ var (
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)

// graphsync

GraphsyncReceivingPeersCount = stats.Int64("graphsync/receiving_peers", "number of peers we are receiving graphsync data from", stats.UnitDimensionless)
GraphsyncReceivingActiveCount = stats.Int64("graphsync/receiving_active", "number of active receiving graphsync transfers", stats.UnitDimensionless)
GraphsyncReceivingCountCount = stats.Int64("graphsync/receiving_pending", "number of pending receiving graphsync transfers", stats.UnitDimensionless)
GraphsyncReceivingTotalMemoryAllocated = stats.Int64("graphsync/receiving_total_allocated", "amount of block memory allocated for receiving graphsync data", stats.UnitBytes)
GraphsyncReceivingTotalPendingAllocations = stats.Int64("graphsync/receiving_pending_allocations", "amount of block memory on hold being received pending allocation", stats.UnitBytes)
GraphsyncReceivingPeersPending = stats.Int64("graphsync/receiving_peers_pending", "number of peers we can't receive more data from cause of pending allocations", stats.UnitDimensionless)

GraphsyncSendingPeersCount = stats.Int64("graphsync/sending_peers", "number of peers we are sending graphsync data to", stats.UnitDimensionless)
GraphsyncSendingActiveCount = stats.Int64("graphsync/sending_active", "number of active sending graphsync transfers", stats.UnitDimensionless)
GraphsyncSendingCountCount = stats.Int64("graphsync/sending_pending", "number of pending sending graphsync transfers", stats.UnitDimensionless)
GraphsyncSendingTotalMemoryAllocated = stats.Int64("graphsync/sending_total_allocated", "amount of block memory allocated for sending graphsync data", stats.UnitBytes)
GraphsyncSendingTotalPendingAllocations = stats.Int64("graphsync/sending_pending_allocations", "amount of block memory on hold from sending pending allocation", stats.UnitBytes)
GraphsyncSendingPeersPending = stats.Int64("graphsync/sending_peers_pending", "number of peers we can't send more data to cause of pending allocations", stats.UnitDimensionless)

// chain
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless)
Expand Down Expand Up @@ -380,6 +396,56 @@ var (
Measure: SplitstoreCompactionDead,
Aggregation: view.Sum(),
}

// graphsync
GraphsyncReceivingPeersCountView = &view.View{
Measure: GraphsyncReceivingPeersCount,
Aggregation: view.LastValue(),
}
GraphsyncReceivingActiveCountView = &view.View{
Measure: GraphsyncReceivingActiveCount,
Aggregation: view.LastValue(),
}
GraphsyncReceivingCountCountView = &view.View{
Measure: GraphsyncReceivingCountCount,
Aggregation: view.LastValue(),
}
GraphsyncReceivingTotalMemoryAllocatedView = &view.View{
Measure: GraphsyncReceivingTotalMemoryAllocated,
Aggregation: view.LastValue(),
}
GraphsyncReceivingTotalPendingAllocationsView = &view.View{
Measure: GraphsyncReceivingTotalPendingAllocations,
Aggregation: view.LastValue(),
}
GraphsyncReceivingPeersPendingView = &view.View{
Measure: GraphsyncReceivingPeersPending,
Aggregation: view.LastValue(),
}
GraphsyncSendingPeersCountView = &view.View{
Measure: GraphsyncSendingPeersCount,
Aggregation: view.LastValue(),
}
GraphsyncSendingActiveCountView = &view.View{
Measure: GraphsyncSendingActiveCount,
Aggregation: view.LastValue(),
}
GraphsyncSendingCountCountView = &view.View{
Measure: GraphsyncSendingCountCount,
Aggregation: view.LastValue(),
}
GraphsyncSendingTotalMemoryAllocatedView = &view.View{
Measure: GraphsyncSendingTotalMemoryAllocated,
Aggregation: view.LastValue(),
}
GraphsyncSendingTotalPendingAllocationsView = &view.View{
Measure: GraphsyncSendingTotalPendingAllocations,
Aggregation: view.LastValue(),
}
GraphsyncSendingPeersPendingView = &view.View{
Measure: GraphsyncSendingPeersPending,
Aggregation: view.LastValue(),
}
)

// DefaultViews is an array of OpenCensus views for metric gathering purposes
Expand All @@ -388,6 +454,19 @@ var DefaultViews = func() []*view.View {
InfoView,
PeerCountView,
APIRequestDurationView,

GraphsyncReceivingPeersCountView,
GraphsyncReceivingActiveCountView,
GraphsyncReceivingCountCountView,
GraphsyncReceivingTotalMemoryAllocatedView,
GraphsyncReceivingTotalPendingAllocationsView,
GraphsyncReceivingPeersPendingView,
GraphsyncSendingPeersCountView,
GraphsyncSendingActiveCountView,
GraphsyncSendingCountCountView,
GraphsyncSendingTotalMemoryAllocatedView,
GraphsyncSendingTotalPendingAllocationsView,
GraphsyncSendingPeersPendingView,
}
views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...)
Expand Down
47 changes: 47 additions & 0 deletions node/modules/graphsync.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package modules

import (
"context"
"time"

"github.com/ipfs/go-graphsync"
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"go.opencensus.io/stats"
"go.uber.org/fx"

"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
Expand Down Expand Up @@ -49,6 +54,48 @@ func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval
hookActions.UsePersistenceOption("chainstore")
}
})

graphsyncStats(mctx, lc, gs)

return gs, nil
}
}

func graphsyncStats(mctx helpers.MetricsCtx, lc fx.Lifecycle, gs dtypes.Graphsync) {
stopStats := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go func() {
t := time.NewTicker(10 * time.Second)
for {
select {
case <-t.C:

st := gs.Stats()
stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)))

case <-stopStats:
return
}
}
}()

return nil
},
OnStop: func(ctx context.Context) error {
close(stopStats)
return nil
},
})
}
2 changes: 2 additions & 0 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe
graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))

graphsyncStats(mctx, lc, gs)

return gs
}
}
Expand Down

0 comments on commit 97e4921

Please sign in to comment.