Skip to content

Commit

Permalink
Add a simple mempool module prototype
Browse files Browse the repository at this point in the history
The prototype expects that new transactions will be delivered to
it by means of NewRequests events and stores them in an in-memory
map. It uses hashes for transaction and batch IDs.
To support crash-recovery, persistent storage should be added for
the transactions.
  • Loading branch information
xosmig committed Aug 5, 2022
1 parent 4575222 commit e0af2d5
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pkg/mempool/simplemempool/internal/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

import (
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
)

// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations.
type ModuleConfig struct {
Self t.ModuleID // id of this module
Hasher t.ModuleID
Crypto t.ModuleID
}

// ModuleParams sets the values for the parameters of an instance of the protocol.
// All replicas are expected to use identical module parameters.
type ModuleParams struct {
MaxTransactionsInBatch int
}

// State represents the common state accessible to all parts of the module implementation.
type State struct {
TxByID map[t.TxID]*requestpb.Request
}
66 changes: 66 additions & 0 deletions pkg/mempool/simplemempool/internal/parts/computeids/computeids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package computeids

import (
"github.com/filecoin-project/mir/pkg/dsl"
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
"github.com/filecoin-project/mir/pkg/serializing"
t "github.com/filecoin-project/mir/pkg/types"
)

//
func IncludeComputationOfTransactionAndBatchIDs(
m dsl.Module,
mc *common.ModuleConfig,
params *common.ModuleParams,
commonState *common.State,
) {
mpdsl.UponRequestTransactionIDs(m, func(txs []*requestpb.Request, origin *mppb.RequestTransactionIDsOrigin) error {
txMsgs := make([][][]byte, len(txs))
for i, tx := range txs {
txMsgs[i] = serializing.RequestForHash(tx)
}

dsl.HashRequest(m, mc.Hasher, txMsgs, &computeHashForTransactionIDsContext{origin})
return nil
})

dsl.UponHashResult(m, func(hashes [][]byte, context *computeHashForTransactionIDsContext) error {
txIDs := make([]t.TxID, len(hashes))
for i, hash := range hashes {
txIDs[i] = t.TxID(hash)
}

mpdsl.TransactionIDsResponse(m, t.ModuleID(context.origin.Module), txIDs, context.origin)
return nil
})

mpdsl.UponRequestBatchID(m, func(txIDs []t.TxID, origin *mppb.RequestBatchIDOrigin) error {
data := make([][]byte, len(txIDs))
for i, txID := range txIDs {
data[i] = txID.Bytes()
}

dsl.HashOneMessage(m, mc.Hasher, data, &computeHashForBatchIDContext{origin})
return nil
})

dsl.UponOneHashResult(m, func(hash []byte, context *computeHashForBatchIDContext) error {
mpdsl.BatchIDResponse(m, t.ModuleID(context.origin.Module), t.BatchID(hash), context.origin)
return nil
})
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Context data structures //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type computeHashForTransactionIDsContext struct {
origin *mppb.RequestTransactionIDsOrigin
}

type computeHashForBatchIDContext struct {
origin *mppb.RequestBatchIDOrigin
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package formbatches

import (
"github.com/filecoin-project/mir/pkg/dsl"
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
)

type State struct {
*common.State
NewTxIDs []t.TxID
}

// IncludeBatchCreation registers event handlers for processing new transactions and forming batches.
func IncludeBatchCreation(
m dsl.Module,
mc *common.ModuleConfig,
params *common.ModuleParams,
commonState *common.State,
) {
state := &State{
State: commonState,
NewTxIDs: nil,
}

dsl.UponNewRequests(m, func(txs []*requestpb.Request) error {
mpdsl.RequestTransactionIDs(m, mc.Self, txs, &requestTxIDsContext{txs})
return nil
})

mpdsl.UponTransactionIDsResponse(m, func(txIDs []t.TxID, context *requestTxIDsContext) error {
for i := range txIDs {
state.TxByID[txIDs[i]] = context.txs[i]
}
state.NewTxIDs = append(state.NewTxIDs, txIDs...)
return nil
})

mpdsl.UponRequestBatch(m, func(origin *mppb.RequestBatchOrigin) error {
var txIDs []t.TxID
var txs []*requestpb.Request
batchSize := 0

var i int
var txID t.TxID

for i, txID = range state.NewTxIDs {
tx := state.TxByID[txID]

// TODO: add other limitations (if any) here.
if i == params.MaxTransactionsInBatch {
break
}

txIDs = append(txIDs, txID)
txs = append(txs, tx)
batchSize += len(tx.Data)
}

state.NewTxIDs = state.NewTxIDs[i:]

// Note that a batch may be empty.
mpdsl.NewBatch(m, t.ModuleID(origin.Module), txIDs, txs, origin)
return nil
})
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Context data structures //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type requestTxIDsContext struct {
txs []*requestpb.Request
}
30 changes: 30 additions & 0 deletions pkg/mempool/simplemempool/internal/parts/lookuptxs/lockuptxs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package lookuptxs

import (
"github.com/filecoin-project/mir/pkg/dsl"
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
)

// IncludeTransactionLookupByID registers event handlers for transaction looking up transactions in the mempool by
// their IDs.
func IncludeTransactionLookupByID(
m dsl.Module,
mc *common.ModuleConfig,
params *common.ModuleParams,
commonState *common.State,
) {
mpdsl.UponRequestTransactions(m, func(txIDs []t.TxID, origin *mppb.RequestTransactionsOrigin) error {
present := make([]bool, len(txIDs))
txs := make([]*requestpb.Request, len(txIDs))
for i, txID := range txIDs {
txs[i], present[i] = commonState.TxByID[txID]
}

mpdsl.TransactionsResponse(m, t.ModuleID(origin.Module), present, txs, origin)
return nil
})
}
49 changes: 49 additions & 0 deletions pkg/mempool/simplemempool/simplemempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package simplemempool

import (
"github.com/filecoin-project/mir/pkg/dsl"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/computeids"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/formbatches"
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/lookuptxs"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
)

// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations.
type ModuleConfig = common.ModuleConfig

// ModuleParams sets the values for the parameters of an instance of the protocol.
// All replicas are expected to use identical module parameters.
type ModuleParams = common.ModuleParams

// DefaultModuleConfig returns a valid module config with default names for all modules.
func DefaultModuleConfig() *ModuleConfig {
return &ModuleConfig{
Self: "availability",
Hasher: "hasher",
Crypto: "crypto",
}
}

// NewModule creates a new instance of a simple mempool module implementation. It passively waits for
// eventpb.NewRequests events and stores them in a local map.
//
// On a batch request, this implementation creates a batch that consists of as many requests received since the
// previous batch request as possible with respect to params.MaxTransactionsInBatch.
//
// This implementation uses the hash function provided by the mc.Hasher module to compute transaction IDs and batch IDs.
func NewModule(mc *ModuleConfig, params *ModuleParams) modules.Module {
m := dsl.NewModule(mc.Self)

commonState := &common.State{
TxByID: make(map[t.TxID]*requestpb.Request),
}

computeids.IncludeComputationOfTransactionAndBatchIDs(m, mc, params, commonState)
formbatches.IncludeBatchCreation(m, mc, params, commonState)
lookuptxs.IncludeTransactionLookupByID(m, mc, params, commonState)

return m
}

0 comments on commit e0af2d5

Please sign in to comment.