Skip to content

Commit

Permalink
Add a simple mempool module prototype
Browse files Browse the repository at this point in the history
The prototype expects that new transactions will be delivered to
it by means of NewRequests events and stores them in an in-memory
map. It uses hashes for transaction and batch IDs.
To support crash-recovery, persistent storage should be added for
the transactions.
  • Loading branch information
xosmig committed Aug 2, 2022
1 parent 3ebade6 commit 88166b4
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 0 deletions.
22 changes: 22 additions & 0 deletions pkg/mempool/simplemempool/internal/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common

import t "github.com/filecoin-project/mir/pkg/types"

// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations.
type ModuleConfig struct {
Self t.ModuleID // id of this module
Hasher t.ModuleID
Crypto t.ModuleID
}

// ModuleParams sets the values for the parameters of an instance of the protocol.
// All replicas are expected to use identical module parameters.
type ModuleParams struct {
MaxBatchSizeInBytes int
MaxTransactionsInBatch int
}

// State represents the common state accessible to all parts of the module implementation.
type State struct {
TxByID map[t.TxID][]byte
}
64 changes: 64 additions & 0 deletions pkg/mempool/simplemempool/internal/parts/computeids/computeids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package computeids

import (
"github.com/filecoin-project/mir/pkg/dsl"
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb"
t "github.com/filecoin-project/mir/pkg/types"
)

//
func IncludeComputationOfTransactionAndBatchIDs(
m dsl.Module,
mc *common.ModuleConfig,
params *common.ModuleParams,
commonState *common.State,
) {
mpdsl.UponRequestTransactionIDs(m, func(txs [][]byte, origin *mppb.RequestTransactionIDsOrigin) error {
txMsgs := make([][][]byte, len(txs))
for i, tx := range txs {
txMsgs[i] = [][]byte{tx}
}

dsl.HashRequest(m, mc.Hasher, txMsgs, &computeHashForTransactionIDsContext{origin})
return nil
})

dsl.UponHashResult(m, func(hashes [][]byte, context *computeHashForTransactionIDsContext) error {
txIDs := make([]t.TxID, len(hashes))
for i, hash := range hashes {
txIDs[i] = t.TxID(hash)
}

mpdsl.TransactionIDsResponse(m, t.ModuleID(context.origin.Module), txIDs, context.origin)
return nil
})

mpdsl.UponRequestBatchID(m, func(txIDs []t.TxID, origin *mppb.RequestBatchIDOrigin) error {
data := make([][]byte, len(txIDs))
for i, txID := range txIDs {
data[i] = txID.Bytes()
}

dsl.HashOneMessage(m, mc.Hasher, data, &computeHashForBatchIDContext{origin})
return nil
})

dsl.UponOneHashResult(m, func(hash []byte, context *computeHashForBatchIDContext) error {
mpdsl.BatchIDResponse(m, t.ModuleID(context.origin.Module), t.BatchID(hash), context.origin)
return nil
})
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Context data structures //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type computeHashForTransactionIDsContext struct {
origin *mppb.RequestTransactionIDsOrigin
}

type computeHashForBatchIDContext struct {
origin *mppb.RequestBatchIDOrigin
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package formbatches

import (
"fmt"

"google.golang.org/protobuf/proto"

"github.com/filecoin-project/mir/pkg/dsl"
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
)

type State struct {
*common.State
NewTxIDs []t.TxID
}

// IncludeBatchCreation registers event handlers for processing new transactions and forming batches.
func IncludeBatchCreation(
m dsl.Module,
mc *common.ModuleConfig,
params *common.ModuleParams,
commonState *common.State,
) {
state := &State{
State: commonState,
NewTxIDs: nil,
}

dsl.UponNewRequests(m, func(requests []*requestpb.Request) error {
txs := make([][]byte, len(requests))
for i, req := range requests {
var err error
txs[i], err = proto.Marshal(req)

if err != nil {
// The module that sent the NewRequests event is responsible to
// make sure that the requests are valid.
return err
}

if len(txs[i]) > params.MaxBatchSizeInBytes {
return fmt.Errorf("transaction is too large (%v bytes)", params.MaxBatchSizeInBytes)
}
}

mpdsl.RequestTransactionIDs(m, mc.Self, txs, &requestTxIDsContext{txs})
return nil
})

mpdsl.UponTransactionIDsResponse(m, func(txIDs []t.TxID, context *requestTxIDsContext) error {
for i := range txIDs {
state.TxByID[txIDs[i]] = context.txs[i]
}
state.NewTxIDs = append(state.NewTxIDs, txIDs...)
return nil
})

mpdsl.UponRequestBatch(m, func(origin *mppb.RequestBatchOrigin) error {
var txIDs []t.TxID
var txs [][]byte
batchSize := 0

var i int
var txID t.TxID

for i, txID = range state.NewTxIDs {
tx := state.TxByID[txID]
if i == params.MaxTransactionsInBatch || batchSize+len(tx) > params.MaxBatchSizeInBytes {
break
}

txIDs = append(txIDs, txID)
txs = append(txs, tx)
batchSize += len(tx)
}

state.NewTxIDs = state.NewTxIDs[i:]

// Note that a batch may be empty.
mpdsl.NewBatch(m, t.ModuleID(origin.Module), txIDs, txs, origin)
return nil
})
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Context data structures //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type requestTxIDsContext struct {
txs [][]byte
}
29 changes: 29 additions & 0 deletions pkg/mempool/simplemempool/internal/parts/lookuptxs/lockuptxs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package lookuptxs

import (
"github.com/filecoin-project/mir/pkg/dsl"
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb"
t "github.com/filecoin-project/mir/pkg/types"
)

// IncludeTransactionLookupByID registers event handlers for transaction looking up transactions in the mempool by
// their IDs.
func IncludeTransactionLookupByID(
m dsl.Module,
mc *common.ModuleConfig,
params *common.ModuleParams,
commonState *common.State,
) {
mpdsl.UponRequestTransactions(m, func(txIDs []t.TxID, origin *mppb.RequestTransactionsOrigin) error {
present := make([]bool, len(txIDs))
txs := make([][]byte, len(txIDs))
for i, txID := range txIDs {
txs[i], present[i] = commonState.TxByID[txID]
}

mpdsl.TransactionsResponse(m, t.ModuleID(origin.Module), present, txs, origin)
return nil
})
}
48 changes: 48 additions & 0 deletions pkg/mempool/simplemempool/simplemempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package simplemempool

import (
"github.com/filecoin-project/mir/pkg/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/computeids"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/formbatches"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/lookuptxs"
"github.com/filecoin-project/mir/pkg/modules"
t "github.com/filecoin-project/mir/pkg/types"
)

// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations.
type ModuleConfig = common.ModuleConfig

// ModuleParams sets the values for the parameters of an instance of the protocol.
// All replicas are expected to use identical module parameters.
type ModuleParams = common.ModuleParams

// DefaultModuleConfig returns a valid module config with default names for all modules.
func DefaultModuleConfig() *ModuleConfig {
return &ModuleConfig{
Self: "availability",
Hasher: "hasher",
Crypto: "crypto",
}
}

// NewModule creates a new instance of a simple mempool module implementation. It passively waits for
// eventpb.NewRequests events and stores them in a local map.
//
// On a batch request, this implementation creates a batch that consists of as many requests received since the
// previous batch request as possible with respect to params.MaxBatchSizeInBytes and params.MaxTransactionsInBatch.
//
// This implementation uses the hash function provided by the mc.Hasher module to compute transaction IDs and batch IDs.
func NewModule(mc *ModuleConfig, params *ModuleParams) modules.Module {
m := dsl.NewModule(mc.Self)

commonState := &common.State{
TxByID: make(map[t.TxID][]byte),
}

computeids.IncludeComputationOfTransactionAndBatchIDs(m, mc, params, commonState)
formbatches.IncludeBatchCreation(m, mc, params, commonState)
lookuptxs.IncludeTransactionLookupByID(m, mc, params, commonState)

return m
}

0 comments on commit 88166b4

Please sign in to comment.