From a16c5e42d15ab4da597a5f89dadcb958b1440f20 Mon Sep 17 00:00:00 2001 From: Sergey Fedorov Date: Thu, 4 Aug 2022 18:01:50 +0200 Subject: [PATCH] cmd/bench: Output statistics from cluster nodes Nodes gather the following statistics as they run: - the number of delivered requests, - the average number of delivered requests per second, - the average latency to deliver a request. Nodes periodically output statistic records in CSV format to the specified file or to the standard output. --- cmd/bench/cmd/node.go | 44 +++++++++++++- cmd/bench/cmd/stat-interceptor.go | 35 +++++++++++ cmd/bench/cmd/stats.go | 99 +++++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 cmd/bench/cmd/stat-interceptor.go create mode 100644 cmd/bench/cmd/stats.go diff --git a/cmd/bench/cmd/node.go b/cmd/bench/cmd/node.go index 57edea30b..6364ac6bc 100644 --- a/cmd/bench/cmd/node.go +++ b/cmd/bench/cmd/node.go @@ -6,9 +6,12 @@ package cmd import ( "context" + "encoding/csv" "fmt" + "os" "strconv" "strings" + "time" "github.com/spf13/cobra" @@ -33,6 +36,8 @@ const ( var ( transportType string + statFileName string + statPeriod time.Duration nodeCmd = &cobra.Command{ Use: "node", @@ -46,6 +51,8 @@ var ( func init() { rootCmd.AddCommand(nodeCmd) nodeCmd.Flags().StringVarP(&transportType, "net", "t", "libp2p", "network transport (libp2p|grpc)") + nodeCmd.Flags().StringVarP(&statFileName, "statFile", "o", "", "output file for statistics") + nodeCmd.Flags().DurationVar(&statPeriod, "statPeriod", time.Second, "statistic record period") } func runNode() error { @@ -105,6 +112,9 @@ func runNode() error { // TODO: Adjust once a default crypto implementation is provided by Mir. crypto := mirCrypto.New(&mirCrypto.DummyCrypto{DummySig: []byte{0}}) + stats := NewStats() + interceptor := NewStatInterceptor(stats) + nodeModules, err := iss.DefaultModules(modules.Modules{ "net": transport, "crypto": crypto, @@ -116,7 +126,7 @@ func runNode() error { } nodeConfig := &mir.NodeConfig{Logger: logger} - node, err := mir.NewNode(t.NodeID(id), nodeConfig, nodeModules, nil, nil) + node, err := mir.NewNode(t.NodeID(id), nodeConfig, nodeModules, nil, interceptor) if err != nil { return fmt.Errorf("could not create node: %w", err) } @@ -135,6 +145,38 @@ func runNode() error { transport.Connect(ctx, nodeAddrs) defer transport.Stop() + var statFile *os.File + if statFileName != "" { + statFile, err = os.Create(statFileName) + if err != nil { + return fmt.Errorf("could not open output file for statistics: %w", err) + } + } else { + statFile = os.Stdout + } + + statsCSV := csv.NewWriter(statFile) + stats.WriteCSVHeader(statsCSV) + + go func() { + timestamp := time.Now() + for { + ticker := time.NewTicker(statPeriod) + defer ticker.Stop() + + select { + case <-ctx.Done(): + return + case t := <-ticker.C: + d := t.Sub(timestamp) + stats.WriteCSVRecord(statsCSV, d) + statsCSV.Flush() + timestamp = t + stats.Reset() + } + } + }() + defer node.Stop() return node.Run(ctx) } diff --git a/cmd/bench/cmd/stat-interceptor.go b/cmd/bench/cmd/stat-interceptor.go new file mode 100644 index 000000000..87bc09b81 --- /dev/null +++ b/cmd/bench/cmd/stat-interceptor.go @@ -0,0 +1,35 @@ +// Copyright Contributors to the Mir project +// +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "github.com/filecoin-project/mir/pkg/events" + "github.com/filecoin-project/mir/pkg/pb/eventpb" +) + +type StatInterceptor struct { + *Stats +} + +func NewStatInterceptor(s *Stats) *StatInterceptor { + return &StatInterceptor{s} +} + +func (i *StatInterceptor) Intercept(events *events.EventList) error { + it := events.Iterator() + for e := it.Next(); e != nil; e = it.Next() { + switch e := e.Type.(type) { + case *eventpb.Event_NewRequests: + for _, req := range e.NewRequests.Requests { + i.Stats.NewRequest(req) + } + case *eventpb.Event_Deliver: + for _, req := range e.Deliver.Batch.Requests { + i.Stats.Delivered(req.Req) + } + } + } + return nil +} diff --git a/cmd/bench/cmd/stats.go b/cmd/bench/cmd/stats.go new file mode 100644 index 000000000..c5313e555 --- /dev/null +++ b/cmd/bench/cmd/stats.go @@ -0,0 +1,99 @@ +// Copyright Contributors to the Mir project +// +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "encoding/csv" + "strconv" + "sync" + "time" + + "github.com/filecoin-project/mir/pkg/pb/requestpb" +) + +type Stats struct { + lock sync.RWMutex + reqTimestamps map[reqKey]time.Time + avgLatency float64 + timestampedRequests int + deliveredRequests int +} + +type reqKey struct { + ClientID string + ReqNo uint64 +} + +func NewStats() *Stats { + return &Stats{ + reqTimestamps: make(map[reqKey]time.Time), + } +} + +func (s *Stats) NewRequest(req *requestpb.Request) { + s.lock.Lock() + k := reqKey{req.ClientId, req.ReqNo} + s.reqTimestamps[k] = time.Now() + s.lock.Unlock() +} + +func (s *Stats) Delivered(req *requestpb.Request) { + s.lock.Lock() + s.deliveredRequests++ + k := reqKey{req.ClientId, req.ReqNo} + if t, ok := s.reqTimestamps[k]; ok { + delete(s.reqTimestamps, k) + s.timestampedRequests++ + d := time.Since(t) + + // $CA_{n+1} = CA_n + {x_{n+1} - CA_n \over n + 1}$ + s.avgLatency += (float64(d) - s.avgLatency) / float64(s.timestampedRequests) + } + s.lock.Unlock() +} + +func (s *Stats) AvgLatency() time.Duration { + s.lock.RLock() + defer s.lock.RUnlock() + + return time.Duration(s.avgLatency) +} + +func (s *Stats) DeliveredRequests() int { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.deliveredRequests +} + +func (s *Stats) Reset() { + s.lock.Lock() + s.avgLatency = 0 + s.timestampedRequests = 0 + s.deliveredRequests = 0 + s.lock.Unlock() +} + +func (s *Stats) WriteCSVHeader(w *csv.Writer) { + record := []string{ + "nrDelivered", + "tps", + "avgLatency", + } + _ = w.Write(record) +} + +func (s *Stats) WriteCSVRecord(w *csv.Writer, d time.Duration) { + s.lock.RLock() + defer s.lock.RUnlock() + + tps := float64(s.deliveredRequests) / (float64(d) / float64(time.Second)) + record := []string{ + strconv.Itoa(s.deliveredRequests), + strconv.Itoa(int(tps)), + strconv.FormatFloat(s.avgLatency, 'f', 0, 64), + } + _ = w.Write(record) +}