Skip to content

Commit

Permalink
cmd/bench: Implement sub-command to run Mir node
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Fedorov committed Aug 8, 2022
1 parent ad2d608 commit ac82f29
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 1 deletion.
46 changes: 46 additions & 0 deletions cmd/bench/cmd/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"fmt"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
t "github.com/filecoin-project/mir/pkg/types"
)

type App struct {
logging.Logger
}

func (App) ImplementsModule() {}

func (a *App) ApplyEvents(eventsIn *events.EventList) (*events.EventList, error) {
return modules.ApplyEventsSequentially(eventsIn, a.ApplyEvent)
}

func (a *App) ApplyEvent(event *eventpb.Event) (*events.EventList, error) {
switch e := event.Type.(type) {
case *eventpb.Event_Init:
case *eventpb.Event_Deliver:
for _, req := range e.Deliver.Batch.Requests {
a.Log(logging.LevelDebug, fmt.Sprintf("Delivered request %v from client %v", req.Req.ReqNo, req.Req.ClientId))
}
case *eventpb.Event_AppSnapshotRequest:
return events.ListOf(events.AppSnapshot(
t.ModuleID(e.AppSnapshotRequest.Module),
t.EpochNr(e.AppSnapshotRequest.Epoch),
nil,
)), nil
case *eventpb.Event_AppRestoreState:
default:
return nil, fmt.Errorf("unexpected type of App event: %T", event.Type)
}

return events.EmptyList(), nil
}
140 changes: 140 additions & 0 deletions cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright Contributors to the Mir project
//
// SPDX-License-Identifier: Apache-2.0

package cmd

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/spf13/cobra"

"github.com/filecoin-project/mir"
mirCrypto "github.com/filecoin-project/mir/pkg/crypto"
"github.com/filecoin-project/mir/pkg/iss"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/net"
"github.com/filecoin-project/mir/pkg/net/grpc"
"github.com/filecoin-project/mir/pkg/net/libp2p"
"github.com/filecoin-project/mir/pkg/requestreceiver"
t "github.com/filecoin-project/mir/pkg/types"
grpctools "github.com/filecoin-project/mir/pkg/util/grpc"
libp2ptools "github.com/filecoin-project/mir/pkg/util/libp2p"
)

const (
NodeBasePort = 10000
ReqReceiverBasePort = 20000
)

var (
transportType string

nodeCmd = &cobra.Command{
Use: "node",
Short: "Start a Mir node",
RunE: func(cmd *cobra.Command, args []string) error {
return runNode()
},
}
)

func init() {
rootCmd.AddCommand(nodeCmd)
nodeCmd.Flags().StringVarP(&transportType, "net", "t", "libp2p", "network transport (libp2p|grpc)")
}

func runNode() error {
var logger logging.Logger
if verbose {
logger = logging.ConsoleDebugLogger
} else {
logger = logging.ConsoleWarnLogger
}

ownID, err := strconv.Atoi(id)
if err != nil {
return fmt.Errorf("unable to convert node ID: %w", err)
} else if ownID < 0 || ownID >= nrNodes {
return fmt.Errorf("ID must be in [0, %d]", nrNodes-1)
}

nodeIDs := make([]t.NodeID, nrNodes)
for i := range nodeIDs {
nodeIDs[i] = t.NewNodeIDFromInt(i)
}

reqReceiverAddrs := make(map[t.NodeID]string)
for i := range nodeIDs {
reqReceiverAddrs[t.NewNodeIDFromInt(i)] = fmt.Sprintf("127.0.0.1:%d", ReqReceiverBasePort+i)
}

var transport net.Transport
nodeAddrs := make(map[t.NodeID]t.NodeAddress)
switch strings.ToLower(transportType) {
case "grpc":
for i := range nodeIDs {
nodeAddrs[t.NewNodeIDFromInt(i)] = t.NodeAddress(grpctools.NewDummyMultiaddr(i + NodeBasePort))
}
transport, err = grpc.NewTransport(t.NodeID(id), nodeAddrs[t.NodeID(id)], logger)
case "libp2p":
h := libp2ptools.NewDummyHost(ownID, NodeBasePort)
for i := range nodeIDs {
nodeAddrs[t.NewNodeIDFromInt(i)] = t.NodeAddress(libp2ptools.NewDummyMultiaddr(i, NodeBasePort))
}
transport, err = libp2p.NewTransport(h, t.NodeID(id), logger)
default:
return fmt.Errorf("unknown network transport %s", strings.ToLower(transportType))
}
if err != nil {
return fmt.Errorf("failed to get network transport %w", err)
}

issConfig := iss.DefaultConfig(nodeIDs)
issProtocol, err := iss.New(t.NodeID(id), issConfig, logger)
if err != nil {
return fmt.Errorf("could not instantiate ISS protocol module: %w", err)
}

// Use dummy crypto module that only produces signatures
// consisting of a single zero byte and treats those signatures as valid.
// TODO: Adjust once a default crypto implementation is provided by Mir.
crypto := mirCrypto.New(&mirCrypto.DummyCrypto{DummySig: []byte{0}})

nodeModules, err := iss.DefaultModules(modules.Modules{
"net": transport,
"crypto": crypto,
"iss": issProtocol,
"app": &App{Logger: logger},
})
if err != nil {
return fmt.Errorf("failed to initialize Mir modules: %w", err)
}

nodeConfig := &mir.NodeConfig{Logger: logger}
node, err := mir.NewNode(t.NodeID(id), nodeConfig, nodeModules, nil, nil)
if err != nil {
return fmt.Errorf("could not create node: %w", err)
}

ctx := context.Background()

reqReceiver := requestreceiver.NewRequestReceiver(node, "iss", logger)
if err := reqReceiver.Start(ReqReceiverBasePort + ownID); err != nil {
return fmt.Errorf("could not start request receiver: %w", err)
}
defer reqReceiver.Stop()

if err := transport.Start(); err != nil {
return fmt.Errorf("could not start network transport: %w", err)
}
transport.Connect(ctx, nodeAddrs)
defer transport.Stop()

defer node.Stop()
return node.Run(ctx)
}
2 changes: 1 addition & 1 deletion pkg/deploytest/fakeapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (fa *FakeApp) ImplementsModule() {}
func (fa *FakeApp) ApplyBatch(batch *requestpb.Batch) error {
for _, req := range batch.Requests {
fa.RequestsProcessed++
fmt.Printf("Received request: \"%s\". Processed requests: %d\n", string(req.Req.Data), fa.RequestsProcessed)
fmt.Printf("Received request: %q. Processed requests: %d\n", string(req.Req.Data), fa.RequestsProcessed)
}
return nil
}
Expand Down

0 comments on commit ac82f29

Please sign in to comment.