Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SMR parameters #274

Merged
merged 5 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,23 @@ func runNode() error {
ownID := t.NodeID(id)
localCrypto := deploytest.NewLocalCryptoSystem("pseudo", membership.GetIDs(initialMembership), logger)
h, err := libp2p.NewDummyHostWithPrivKey(
ownID,
initialMembership[ownID],
libp2p.NewDummyHostKey(ownNumericID),
initialMembership,
)
if err != nil {
return fmt.Errorf("failed to create libp2p host: %w", err)
}

smrParams := smr.DefaultParams(initialMembership)
smrParams.Mempool.MaxTransactionsInBatch = 1024

benchApp, err := smr.New(
ownID,
h,
initialMembership,
localCrypto.Crypto(ownID),
&App{Logger: logger, Membership: nodeAddrs},
&App{Logger: logger, Membership: initialMembership},
smrParams,
logger,
)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions cmd/bench/stats/stat-interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ func NewStatInterceptor(s *Stats, txConsumer t.ModuleID) *StatInterceptor {

func (i *StatInterceptor) Intercept(events *events.EventList) error {
it := events.Iterator()
for e := it.Next(); e != nil; e = it.Next() {
for evt := it.Next(); evt != nil; evt = it.Next() {

// Skip events destined to other modules than the one consuming the transactions.
if t.ModuleID(e.DestModule) != i.txConsumerModule {
continue
}

switch e := e.Type.(type) {
switch e := evt.Type.(type) {
case *eventpb.Event_NewRequests:
for _, req := range e.NewRequests.Requests {
i.Stats.NewRequest(req)
}
case *eventpb.Event_BatchFetcher:

// Skip events destined to other modules than the one consuming the transactions.
if t.ModuleID(evt.DestModule) != i.txConsumerModule {
continue
}

switch e := e.BatchFetcher.Type.(type) {
case *bfpb.Event_NewOrderedBatch:
for _, req := range e.NewOrderedBatch.Txs {
Expand Down
10 changes: 9 additions & 1 deletion pkg/checkpoint/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,17 @@ func (p *Protocol) applyEvent(event *eventpb.Event) (*events.EventList, error) {

func (p *Protocol) applyAppSnapshot(appSnapshot *eventpb.AppSnapshot) (*events.EventList, error) {

// Treat nil data as an empty byte slice.
var appData []byte
if appSnapshot.AppData != nil {
appData = appSnapshot.AppData
} else {
appData = []byte{}
}

// Save the received app snapshot if there is none yet.
if p.stateSnapshot.AppData == nil {
p.stateSnapshot.AppData = appSnapshot.AppData
p.stateSnapshot.AppData = appData
if p.snapshotReady() {
return p.processStateSnapshot()
}
Expand Down
51 changes: 30 additions & 21 deletions pkg/iss/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,26 +171,35 @@ func DefaultParams(initialMembership map[t.NodeID]t.NodeAddress) *ModuleParams {

// Define auxiliary variables for segment length and maximal propose delay.
// PBFT view change timeouts can then be computed relative to those.

return (&ModuleParams{
InitialMembership: initialMembership,
ConfigOffset: 2,
SegmentLength: 4,
NumBuckets: len(initialMembership),
LeaderPolicy: &SimpleLeaderPolicy{Membership: maputil.GetSortedKeys(initialMembership)},
RequestNAckTimeout: 16,
MsgBufCapacity: 32 * 1024 * 1024, // 32 MiB
RetainedEpochs: 1,
}).AdjustSpeed(time.Second)
}

// AdjustSpeed sets multiple ISS parameters (e.g. view change timeouts)
// to their default values relative to maxProposeDelay.
// It can be useful to make the whole protocol run faster or slower.
// For example, for a large maxProposeDelay, the view change timeouts must be increased correspondingly,
// otherwise the view change can kick in before a node makes a proposal.
// AdjustSpeed makes these adjustments automatically.
func (mp *ModuleParams) AdjustSpeed(maxProposeDelay time.Duration) *ModuleParams {
mp.MaxProposeDelay = maxProposeDelay
mp.CatchUpTimerPeriod = maxProposeDelay
mp.PBFTDoneResendPeriod = maxProposeDelay
mp.PBFTCatchUpDelay = maxProposeDelay
mp.CheckpointResendPeriod = maxProposeDelay
mp.PBFTViewChangeSNTimeout = 4 * maxProposeDelay
mp.PBFTViewChangeSegmentTimeout = 2 * time.Duration(mp.SegmentLength) * maxProposeDelay
mp.PBFTViewChangeResendPeriod = maxProposeDelay
// TODO: Adapt this if needed when PBFT configuration is specified separately.
maxProposeDelay := time.Second
segmentLength := 4

return &ModuleParams{
InitialMembership: initialMembership,
ConfigOffset: 2,
SegmentLength: segmentLength,
MaxProposeDelay: maxProposeDelay,
NumBuckets: len(initialMembership),
LeaderPolicy: &SimpleLeaderPolicy{Membership: maputil.GetSortedKeys(initialMembership)},
RequestNAckTimeout: 16,
MsgBufCapacity: 32 * 1024 * 1024, // 32 MiB
RetainedEpochs: 1,
CatchUpTimerPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
PBFTDoneResendPeriod: maxProposeDelay,
PBFTCatchUpDelay: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
CheckpointResendPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
PBFTViewChangeSNTimeout: 4 * maxProposeDelay,
PBFTViewChangeSegmentTimeout: 2 * time.Duration(segmentLength) * maxProposeDelay,
PBFTViewChangeResendPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
}

return mp
}
6 changes: 6 additions & 0 deletions pkg/mempool/simplemempool/simplemempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func DefaultModuleConfig() *ModuleConfig {
}
}

func DefaultModuleParams() *ModuleParams {
return &ModuleParams{
MaxTransactionsInBatch: 10,
}
}

// NewModule creates a new instance of a simple mempool module implementation. It passively waits for
// eventpb.NewRequests events and stores them in a local map.
//
Expand Down
26 changes: 26 additions & 0 deletions pkg/systems/smr/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package smr

import (
"time"

"github.com/filecoin-project/mir/pkg/iss"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool"
t "github.com/filecoin-project/mir/pkg/types"
)

type Params struct {
Mempool *simplemempool.ModuleParams
Iss *iss.ModuleParams
}

func DefaultParams(initialMembership map[t.NodeID]t.NodeAddress) Params {
return Params{
Mempool: simplemempool.DefaultModuleParams(),
Iss: iss.DefaultParams(initialMembership),
}
}

func (p *Params) AdjustSpeed(maxProposeDelay time.Duration) *Params {
p.Iss.AdjustSpeed(maxProposeDelay)
return p
}
10 changes: 5 additions & 5 deletions pkg/systems/smr/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func New(
// using AppLogicFromStatic.
app AppLogic,

// Parameters of the SMR system, like batch size or batch timeout.
params Params,

// The logger to which the system will pass all its log messages.
logger logging.Logger,
) (*System, error) {
Expand Down Expand Up @@ -126,10 +129,9 @@ func New(
// We use the ISS' default module configuration (the expected IDs of modules it interacts with)
// also to configure other modules of the system.
issModuleConfig := iss.DefaultModuleConfig()
issParams := iss.DefaultParams(initialMembership)
issProtocol, err := iss.New(
ownID,
issModuleConfig, issParams, iss.InitialStateSnapshot(initialSnapshot, issParams),
issModuleConfig, params.Iss, iss.InitialStateSnapshot(initialSnapshot, params.Iss),
logging.Decorate(logger, "ISS: "),
)
if err != nil {
Expand All @@ -145,9 +147,7 @@ func New(
Self: "mempool",
Hasher: issModuleConfig.Hasher,
},
&simplemempool.ModuleParams{
MaxTransactionsInBatch: 10,
},
params.Mempool,
)

// Use fake batch database that only stores batches in memory and does not persist them to disk.
Expand Down
5 changes: 2 additions & 3 deletions pkg/util/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (

// NewDummyHostWithPrivKey creates new dummy libp2p host with an identity
// determined by the private key given as an input.
func NewDummyHostWithPrivKey(ownID t.NodeID, privKey libp2pcrypto.PrivKey,
initialMembership map[t.NodeID]t.NodeAddress) (host.Host, error) {
func NewDummyHostWithPrivKey(listenAddr t.NodeAddress, privKey libp2pcrypto.PrivKey) (host.Host, error) {

libp2pPeerID, err := peer.AddrInfoFromP2pAddr(initialMembership[ownID])
libp2pPeerID, err := peer.AddrInfoFromP2pAddr(listenAddr)
if err != nil {
return nil, fmt.Errorf("failed to get own libp2p addr info: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions samples/chat-demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ func run() error {

// Create a dummy libp2p host for network communication (this is why we need a numeric ID)
h, err := libp2p.NewDummyHostWithPrivKey(
args.OwnID,
initialMembership[args.OwnID],
libp2p.NewDummyHostKey(ownNumericID),
initialMembership,
)
if err != nil {
return errors.Wrap(err, "failed to create libp2p host")
Expand All @@ -132,6 +131,7 @@ func run() error {
initialMembership,
&mirCrypto.DummyCrypto{DummySig: []byte{0}},
NewChatApp(initialMembership),
smr.DefaultParams(initialMembership),
logger,
)
if err != nil {
Expand Down