Skip to content

Commit

Permalink
cmd/bench: Output statistics from cluster nodes
Browse files Browse the repository at this point in the history
Nodes gather the following statistics as they run:
 - the number of delivered requests,
 - the average number of delivered requests per second,
 - the average latency to deliver a request.

Nodes periodically output statistics records in CSV format to the
specified file or to the standard output.
  • Loading branch information
Sergey Fedorov committed Aug 8, 2022
1 parent f876eb6 commit a16c5e4
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 1 deletion.
44 changes: 43 additions & 1 deletion cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package cmd

import (
"context"
"encoding/csv"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/spf13/cobra"

Expand All @@ -33,6 +36,8 @@ const (

var (
transportType string
statFileName string
statPeriod time.Duration

nodeCmd = &cobra.Command{
Use: "node",
Expand All @@ -46,6 +51,8 @@ var (
// init registers the node subcommand with the root command and declares
// its command-line flags.
func init() {
	rootCmd.AddCommand(nodeCmd)
	nodeCmd.Flags().StringVarP(&transportType, "net", "t", "libp2p", "network transport (libp2p|grpc)")
	// Statistics reporting: output destination (empty means stdout) and
	// the period between consecutive statistics records.
	nodeCmd.Flags().StringVarP(&statFileName, "statFile", "o", "", "output file for statistics")
	nodeCmd.Flags().DurationVar(&statPeriod, "statPeriod", time.Second, "statistic record period")
}

func runNode() error {
Expand Down Expand Up @@ -105,6 +112,9 @@ func runNode() error {
// TODO: Adjust once a default crypto implementation is provided by Mir.
crypto := mirCrypto.New(&mirCrypto.DummyCrypto{DummySig: []byte{0}})

stats := NewStats()
interceptor := NewStatInterceptor(stats)

nodeModules, err := iss.DefaultModules(modules.Modules{
"net": transport,
"crypto": crypto,
Expand All @@ -116,7 +126,7 @@ func runNode() error {
}

nodeConfig := &mir.NodeConfig{Logger: logger}
node, err := mir.NewNode(t.NodeID(id), nodeConfig, nodeModules, nil, nil)
node, err := mir.NewNode(t.NodeID(id), nodeConfig, nodeModules, nil, interceptor)
if err != nil {
return fmt.Errorf("could not create node: %w", err)
}
Expand All @@ -135,6 +145,38 @@ func runNode() error {
transport.Connect(ctx, nodeAddrs)
defer transport.Stop()

// Statistics are written as CSV to the requested file, falling back to
// the standard output when no file name was given.
var statFile *os.File
if statFileName != "" {
	statFile, err = os.Create(statFileName)
	if err != nil {
		return fmt.Errorf("could not open output file for statistics: %w", err)
	}
	// Close the created file when the node stops. The original code leaked
	// the descriptor; stdout is intentionally never closed.
	defer statFile.Close()
} else {
	statFile = os.Stdout
}

statsCSV := csv.NewWriter(statFile)
stats.WriteCSVHeader(statsCSV)

// Periodically emit one statistics record per statPeriod until the
// context is cancelled.
//
// BUG FIX: the original created a new time.Ticker on every loop
// iteration and registered `defer ticker.Stop()` inside the loop.
// Deferred calls only run when the goroutine returns, so tickers (and
// their runtime timers) accumulated for the goroutine's whole lifetime.
// The ticker must be created exactly once, before the loop.
go func() {
	ticker := time.NewTicker(statPeriod)
	defer ticker.Stop()

	timestamp := time.Now()
	for {
		select {
		case <-ctx.Done():
			return
		case t := <-ticker.C:
			// Report over the actual elapsed interval, then start a
			// fresh accumulation window.
			d := t.Sub(timestamp)
			stats.WriteCSVRecord(statsCSV, d)
			statsCSV.Flush()
			timestamp = t
			stats.Reset()
		}
	}
}()

defer node.Stop()
return node.Run(ctx)
}
35 changes: 35 additions & 0 deletions cmd/bench/cmd/stat-interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
)

// StatInterceptor feeds every event stream passing through the node into
// the embedded Stats accumulator.
type StatInterceptor struct {
	*Stats
}

// NewStatInterceptor wraps the given statistics accumulator in an event
// interceptor.
func NewStatInterceptor(s *Stats) *StatInterceptor {
	return &StatInterceptor{Stats: s}
}

// Intercept scans a list of node events and updates the statistics:
// newly submitted requests are timestamped and delivered requests are
// counted. It always returns nil.
//
// The parameter was renamed from `events` to `evts`: the original name
// shadowed the imported `events` package inside the function body.
func (i *StatInterceptor) Intercept(evts *events.EventList) error {
	it := evts.Iterator()
	for e := it.Next(); e != nil; e = it.Next() {
		switch e := e.Type.(type) {
		case *eventpb.Event_NewRequests:
			// Record arrival time of each incoming request.
			for _, req := range e.NewRequests.Requests {
				i.Stats.NewRequest(req)
			}
		case *eventpb.Event_Deliver:
			// Account for each request delivered in the batch.
			for _, req := range e.Deliver.Batch.Requests {
				i.Stats.Delivered(req.Req)
			}
		}
	}
	return nil
}
99 changes: 99 additions & 0 deletions cmd/bench/cmd/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"encoding/csv"
"strconv"
"sync"
"time"

"github.com/filecoin-project/mir/pkg/pb/requestpb"
)

// Stats accumulates request-delivery statistics for a node. All fields
// are guarded by lock; counters cover the period since the last Reset.
type Stats struct {
	lock sync.RWMutex
	// Arrival time of each request that has not been delivered yet,
	// keyed by (client, sequence number).
	reqTimestamps map[reqKey]time.Time
	// Running average delivery latency in nanoseconds (float64 so the
	// incremental cumulative-average update can be applied).
	avgLatency float64
	// Number of latency samples folded into avgLatency.
	timestampedRequests int
	// Total number of requests delivered since the last Reset.
	deliveredRequests int
}

// reqKey uniquely identifies a client request by its originating client
// and its per-client sequence number.
type reqKey struct {
	ClientID string
	ReqNo    uint64
}

// NewStats creates an empty statistics accumulator with an initialized
// request-timestamp table.
func NewStats() *Stats {
	s := &Stats{}
	s.reqTimestamps = map[reqKey]time.Time{}
	return s
}

// NewRequest records the arrival time of a client request so that its
// delivery latency can be computed later.
func (s *Stats) NewRequest(req *requestpb.Request) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.reqTimestamps[reqKey{req.ClientId, req.ReqNo}] = time.Now()
}

// Delivered accounts for the delivery of a request. If the request's
// arrival was previously recorded on this node, its latency sample is
// folded into the running average; requests submitted elsewhere are
// counted but contribute no latency sample.
func (s *Stats) Delivered(req *requestpb.Request) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.deliveredRequests++

	key := reqKey{req.ClientId, req.ReqNo}
	start, known := s.reqTimestamps[key]
	if !known {
		return
	}
	delete(s.reqTimestamps, key)
	s.timestampedRequests++

	// Cumulative average: CA_{n+1} = CA_n + (x_{n+1} - CA_n) / (n+1).
	sample := float64(time.Since(start))
	s.avgLatency += (sample - s.avgLatency) / float64(s.timestampedRequests)
}

// AvgLatency returns the average delivery latency accumulated since the
// last Reset.
func (s *Stats) AvgLatency() time.Duration {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return time.Duration(s.avgLatency)
}

// DeliveredRequests returns the number of requests delivered since the
// last Reset.
func (s *Stats) DeliveredRequests() int {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.deliveredRequests
}

// Reset clears the accumulated counters and the running latency average,
// starting a new measurement window. Pending request timestamps are kept
// so in-flight requests still yield latency samples after the reset.
func (s *Stats) Reset() {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.avgLatency = 0
	s.timestampedRequests = 0
	s.deliveredRequests = 0
}

// WriteCSVHeader emits the column names for the records produced by
// WriteCSVRecord. The write error is deliberately ignored; statistics
// output is best-effort.
func (s *Stats) WriteCSVHeader(w *csv.Writer) {
	_ = w.Write([]string{"nrDelivered", "tps", "avgLatency"})
}

// WriteCSVRecord emits one statistics record covering the elapsed
// period d: delivered-request count, whole requests per second, and the
// average latency in whole nanoseconds. The write error is deliberately
// ignored; statistics output is best-effort.
func (s *Stats) WriteCSVRecord(w *csv.Writer, d time.Duration) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	seconds := float64(d) / float64(time.Second)
	throughput := float64(s.deliveredRequests) / seconds
	_ = w.Write([]string{
		strconv.Itoa(s.deliveredRequests),
		strconv.Itoa(int(throughput)),
		strconv.FormatFloat(s.avgLatency, 'f', 0, 64),
	})
}

0 comments on commit a16c5e4

Please sign in to comment.