Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmarking tools #177

Merged
merged 10 commits into from
Sep 19, 2022
40 changes: 40 additions & 0 deletions cmd/bench/cmd/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"fmt"

"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/pb/commonpb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/maputil"
)

type App struct {
logging.Logger

Membership map[t.NodeID]t.NodeAddress
}

func (a *App) ApplyTXs(txs []*requestpb.Request) error {
for _, req := range txs {
a.Log(logging.LevelDebug, fmt.Sprintf("Delivered request %v from client %v", req.ReqNo, req.ClientId))
}
return nil
}

func (a *App) NewEpoch(_ t.EpochNr) (map[t.NodeID]t.NodeAddress, error) {
return maputil.Copy(a.Membership), nil
}

func (a *App) Snapshot() ([]byte, error) {
return nil, nil
}

func (a *App) RestoreState(appData []byte, epochConfig *commonpb.EpochConfig) error {
return nil
}
108 changes: 108 additions & 0 deletions cmd/bench/cmd/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"context"
"crypto"
"errors"
"fmt"
"math/rand"
"net"
"strconv"
"time"

"github.com/spf13/cobra"
rateLimiter "golang.org/x/time/rate"

"github.com/filecoin-project/mir/pkg/dummyclient"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/membership"
t "github.com/filecoin-project/mir/pkg/types"
)

var (
reqSize int
rate float64
burst int
duration time.Duration

clientCmd = &cobra.Command{
Use: "client",
Short: "Generate and submit requests to a Mir cluster",
RunE: func(cmd *cobra.Command, args []string) error {
return runClient()
},
}
)

func init() {
rootCmd.AddCommand(clientCmd)
clientCmd.Flags().IntVarP(&reqSize, "reqSize", "s", 256, "size of each request in bytes")
clientCmd.Flags().Float64VarP(&rate, "rate", "r", 1000, "average number of requests per second")
clientCmd.Flags().IntVarP(&burst, "burst", "b", 1, "maximum number of requests in a burst")
clientCmd.Flags().DurationVarP(&duration, "duration", "T", 10*time.Second, "benchmarking duration")
}

func runClient() error {
var logger logging.Logger
if verbose {
logger = logging.ConsoleDebugLogger
} else {
logger = logging.ConsoleWarnLogger
}

initialMembership, err := membership.FromFileName(membershipFile)
if err != nil {
return fmt.Errorf("could not load membership: %w", err)
}
addresses, err := membership.GetIPs(initialMembership)
if err != nil {
return fmt.Errorf("could not load node IPs: %w", err)
}

// Generate addresses and ports for client request receivers.
// Each node uses different ports for receiving protocol messages and requests.
// These addresses will be used by the client code to know where to send its requests.
reqReceiverAddrs := make(map[t.NodeID]string)
for nodeID, nodeIP := range addresses {
numericID, err := strconv.Atoi(string(nodeID))
if err != nil {
return fmt.Errorf("node IDs must be numeric in the sample app: %w", err)
}
reqReceiverAddrs[nodeID] = net.JoinHostPort(nodeIP, fmt.Sprintf("%d", ReqReceiverBasePort+numericID))
}

ctx, stop := context.WithCancel(context.Background())

client := dummyclient.NewDummyClient(
t.ClientID(id),
crypto.SHA256,
logger,
)
client.Connect(ctx, reqReceiverAddrs)
defer client.Disconnect()

go func() {
time.Sleep(duration)
stop()
}()

limiter := rateLimiter.NewLimiter(rateLimiter.Limit(rate), 1)
reqBytes := make([]byte, reqSize)
for i := 0; ; i++ {
if err := limiter.Wait(ctx); err != nil {
if errors.Is(err, context.Canceled) {
err = nil
}
return err
}
rand.Read(reqBytes) //nolint:gosec
logger.Log(logging.LevelDebug, fmt.Sprintf("Submitting request #%d", i))
if err := client.SubmitRequest(reqBytes); err != nil {
return err
}
}
}
145 changes: 145 additions & 0 deletions cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"context"
"encoding/csv"
"fmt"
"os"
"strconv"
"time"

"github.com/spf13/cobra"

"github.com/filecoin-project/mir"
"github.com/filecoin-project/mir/cmd/bench/stats"
"github.com/filecoin-project/mir/pkg/deploytest"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/membership"
"github.com/filecoin-project/mir/pkg/requestreceiver"
"github.com/filecoin-project/mir/pkg/systems/smr"
t "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/libp2p"
)

const (
ReqReceiverBasePort = 20000
)

var (
statFileName string
statPeriod time.Duration

nodeCmd = &cobra.Command{
Use: "node",
Short: "Start a Mir node",
RunE: func(cmd *cobra.Command, args []string) error {
return runNode()
},
}
)

func init() {
rootCmd.AddCommand(nodeCmd)
nodeCmd.Flags().StringVarP(&statFileName, "statFile", "o", "", "output file for statistics")
nodeCmd.Flags().DurationVar(&statPeriod, "statPeriod", time.Second, "statistic record period")
}

func runNode() error {
var logger logging.Logger
if verbose {
logger = logging.ConsoleDebugLogger
} else {
logger = logging.ConsoleWarnLogger
}

nodeAddrs, err := membership.FromFileName(membershipFile)
if err != nil {
return fmt.Errorf("could not load membership: %w", err)
}
initialMembership, err := membership.DummyMultiAddrs(nodeAddrs)
if err != nil {
return fmt.Errorf("could not create dummy multiaddrs: %w", err)
}

ownNumericID, err := strconv.Atoi(id)
if err != nil {
return fmt.Errorf("unable to convert node ID: %w", err)
} else if ownNumericID < 0 || ownNumericID >= len(initialMembership) {
return fmt.Errorf("ID must be in [0, %d]", len(initialMembership)-1)
}
ownID := t.NodeID(id)
localCrypto := deploytest.NewLocalCryptoSystem("pseudo", membership.GetIDs(initialMembership), logger)

benchApp, err := smr.New(
ownID,
libp2p.NewDummyHostKey(ownNumericID),
initialMembership,
localCrypto.Crypto(ownID),
&App{Logger: logger, Membership: nodeAddrs},
logger,
)
if err != nil {
return fmt.Errorf("could not create bench app: %w", err)
}

stat := stats.NewStats()
interceptor := stats.NewStatInterceptor(stat, "app")

nodeConfig := &mir.NodeConfig{Logger: logger}
node, err := mir.NewNode(t.NodeID(id), nodeConfig, benchApp.Modules(), nil, interceptor)
if err != nil {
return fmt.Errorf("could not create node: %w", err)
}

ctx := context.Background()

reqReceiver := requestreceiver.NewRequestReceiver(node, "mempool", logger)
if err := reqReceiver.Start(ReqReceiverBasePort + ownNumericID); err != nil {
return fmt.Errorf("could not start request receiver: %w", err)
}
defer reqReceiver.Stop()

if err := benchApp.Start(ctx); err != nil {
return fmt.Errorf("could not start bench app: %w", err)
}
defer benchApp.Stop()

var statFile *os.File
if statFileName != "" {
statFile, err = os.Create(statFileName)
if err != nil {
return fmt.Errorf("could not open output file for statistics: %w", err)
}
} else {
statFile = os.Stdout
}

statCSV := csv.NewWriter(statFile)
stat.WriteCSVHeader(statCSV)

go func() {
timestamp := time.Now()
for {
ticker := time.NewTicker(statPeriod)
defer ticker.Stop()

select {
case <-ctx.Done():
return
case ts := <-ticker.C:
d := ts.Sub(timestamp)
stat.WriteCSVRecord(statCSV, d)
statCSV.Flush()
timestamp = ts
stat.Reset()
}
}
}()

defer node.Stop()
return node.Run(ctx)
}
40 changes: 40 additions & 0 deletions cmd/bench/cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"os"

"github.com/spf13/cobra"
)

var (
id string
membershipFile string
verbose bool

rootCmd = &cobra.Command{
Use: "bench",
Short: "Mir benchmarking tool",
xosmig marked this conversation as resolved.
Show resolved Hide resolved
Long: "Mir benchmarking tool can run a Mir nodes, measuring latency and" +
"throughput. The tool can also generate and submit requests to the Mir" +
"cluster at a specified rate.",
}
)

func Execute() {
err := rootCmd.Execute()
if err != nil {
os.Exit(1)
}
}

func init() {
rootCmd.PersistentFlags().StringVarP(&id, "id", "i", "", "node/client ID")
_ = rootCmd.MarkPersistentFlagRequired("id")
rootCmd.PersistentFlags().StringVarP(&membershipFile, "membership", "m", "", "total number of nodes")
_ = rootCmd.MarkPersistentFlagRequired("membership")
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "verbose mode")
}
11 changes: 11 additions & 0 deletions cmd/bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package main

import "github.com/filecoin-project/mir/cmd/bench/cmd"

func main() {
cmd.Execute()
}
Loading