diff --git a/pkg/mempool/simplemempool/internal/common/common.go b/pkg/mempool/simplemempool/internal/common/common.go new file mode 100644 index 000000000..c850c7f5a --- /dev/null +++ b/pkg/mempool/simplemempool/internal/common/common.go @@ -0,0 +1,22 @@ +package common + +import t "github.com/filecoin-project/mir/pkg/types" + +// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations. +type ModuleConfig struct { + Self t.ModuleID // id of this module + Hasher t.ModuleID + Crypto t.ModuleID +} + +// ModuleParams sets the values for the parameters of an instance of the protocol. +// All replicas are expected to use identical module parameters. +type ModuleParams struct { + MaxBatchSizeInBytes int + MaxTransactionsInBatch int +} + +// State represents the common state accessible to all parts of the module implementation. +type State struct { + TxByID map[t.TxID][]byte +} diff --git a/pkg/mempool/simplemempool/internal/parts/computeids/computeids.go b/pkg/mempool/simplemempool/internal/parts/computeids/computeids.go new file mode 100644 index 000000000..3ba209f73 --- /dev/null +++ b/pkg/mempool/simplemempool/internal/parts/computeids/computeids.go @@ -0,0 +1,64 @@ +package computeids + +import ( + "github.com/filecoin-project/mir/pkg/dsl" + mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" + mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb" + t "github.com/filecoin-project/mir/pkg/types" +) + +// +func IncludeComputationOfTransactionAndBatchIDs( + m dsl.Module, + mc *common.ModuleConfig, + params *common.ModuleParams, + commonState *common.State, +) { + mpdsl.UponRequestTransactionIDs(m, func(txs [][]byte, origin *mppb.RequestTransactionIDsOrigin) error { + txMsgs := make([][][]byte, len(txs)) + for i, tx := range txs { + txMsgs[i] = [][]byte{tx} + } + + dsl.HashRequest(m, mc.Hasher, txMsgs, &computeHashForTransactionIDsContext{origin}) + return nil + }) + + dsl.UponHashResult(m, func(hashes [][]byte, context *computeHashForTransactionIDsContext) error { + txIDs := make([]t.TxID, len(hashes)) + for i, hash := range hashes { + txIDs[i] = t.TxID(hash) + } + + mpdsl.TransactionIDsResponse(m, t.ModuleID(context.origin.Module), txIDs, context.origin) + return nil + }) + + mpdsl.UponRequestBatchID(m, func(txIDs []t.TxID, origin *mppb.RequestBatchIDOrigin) error { + data := make([][]byte, len(txIDs)) + for i, txID := range txIDs { + data[i] = txID.Bytes() + } + + dsl.HashOneMessage(m, mc.Hasher, data, &computeHashForBatchIDContext{origin}) + return nil + }) + + dsl.UponOneHashResult(m, func(hash []byte, context *computeHashForBatchIDContext) error { + mpdsl.BatchIDResponse(m, t.ModuleID(context.origin.Module), t.BatchID(hash), context.origin) + return nil + }) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Context data structures // +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +type computeHashForTransactionIDsContext struct { + origin *mppb.RequestTransactionIDsOrigin +} + +type computeHashForBatchIDContext struct { + origin *mppb.RequestBatchIDOrigin +} diff --git a/pkg/mempool/simplemempool/internal/parts/formbatches/formbatches.go b/pkg/mempool/simplemempool/internal/parts/formbatches/formbatches.go new file mode 100644 index 000000000..3a6c44e7b --- /dev/null +++ b/pkg/mempool/simplemempool/internal/parts/formbatches/formbatches.go @@ -0,0 +1,95 @@ +package formbatches + +import ( + "fmt" + + "google.golang.org/protobuf/proto" + + "github.com/filecoin-project/mir/pkg/dsl" + mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" + mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb" + "github.com/filecoin-project/mir/pkg/pb/requestpb" + t "github.com/filecoin-project/mir/pkg/types" +) + +type State struct { + *common.State + NewTxIDs []t.TxID +} + +// IncludeBatchCreation registers event handlers for processing new transactions and forming batches. +func IncludeBatchCreation( + m dsl.Module, + mc *common.ModuleConfig, + params *common.ModuleParams, + commonState *common.State, +) { + state := &State{ + State: commonState, + NewTxIDs: nil, + } + + dsl.UponNewRequests(m, func(requests []*requestpb.Request) error { + txs := make([][]byte, len(requests)) + for i, req := range requests { + var err error + txs[i], err = proto.Marshal(req) + + if err != nil { + // The module that sent the NewRequests event is responsible to + // make sure that the requests are valid. + return err + } + + if len(txs[i]) > params.MaxBatchSizeInBytes { + return fmt.Errorf("transaction is too large (%v bytes)", params.MaxBatchSizeInBytes) + } + } + + mpdsl.RequestTransactionIDs(m, mc.Self, txs, &requestTxIDsContext{txs}) + return nil + }) + + mpdsl.UponTransactionIDsResponse(m, func(txIDs []t.TxID, context *requestTxIDsContext) error { + for i := range txIDs { + state.TxByID[txIDs[i]] = context.txs[i] + } + state.NewTxIDs = append(state.NewTxIDs, txIDs...) + return nil + }) + + mpdsl.UponRequestBatch(m, func(origin *mppb.RequestBatchOrigin) error { + var txIDs []t.TxID + var txs [][]byte + batchSize := 0 + + var i int + var txID t.TxID + + for i, txID = range state.NewTxIDs { + tx := state.TxByID[txID] + if i == params.MaxTransactionsInBatch || batchSize+len(tx) > params.MaxBatchSizeInBytes { + break + } + + txIDs = append(txIDs, txID) + txs = append(txs, tx) + batchSize += len(tx) + } + + state.NewTxIDs = state.NewTxIDs[i:] + + // Note that a batch may be empty. + mpdsl.NewBatch(m, t.ModuleID(origin.Module), txIDs, txs, origin) + return nil + }) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Context data structures // +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +type requestTxIDsContext struct { + txs [][]byte +} diff --git a/pkg/mempool/simplemempool/internal/parts/lookuptxs/lockuptxs.go b/pkg/mempool/simplemempool/internal/parts/lookuptxs/lockuptxs.go new file mode 100644 index 000000000..7f276aa9f --- /dev/null +++ b/pkg/mempool/simplemempool/internal/parts/lookuptxs/lockuptxs.go @@ -0,0 +1,29 @@ +package lookuptxs + +import ( + "github.com/filecoin-project/mir/pkg/dsl" + mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" + mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb" + t "github.com/filecoin-project/mir/pkg/types" +) + +// IncludeTransactionLookupByID registers event handlers for transaction looking up transactions in the mempool by +// their IDs. +func IncludeTransactionLookupByID( + m dsl.Module, + mc *common.ModuleConfig, + params *common.ModuleParams, + commonState *common.State, +) { + mpdsl.UponRequestTransactions(m, func(txIDs []t.TxID, origin *mppb.RequestTransactionsOrigin) error { + present := make([]bool, len(txIDs)) + txs := make([][]byte, len(txIDs)) + for i, txID := range txIDs { + txs[i], present[i] = commonState.TxByID[txID] + } + + mpdsl.TransactionsResponse(m, t.ModuleID(origin.Module), present, txs, origin) + return nil + }) +} diff --git a/pkg/mempool/simplemempool/simplemempool.go b/pkg/mempool/simplemempool/simplemempool.go new file mode 100644 index 000000000..19325416e --- /dev/null +++ b/pkg/mempool/simplemempool/simplemempool.go @@ -0,0 +1,48 @@ +package simplemempool + +import ( + "github.com/filecoin-project/mir/pkg/dsl" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/computeids" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/formbatches" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/lookuptxs" + "github.com/filecoin-project/mir/pkg/modules" + t "github.com/filecoin-project/mir/pkg/types" +) + +// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations. +type ModuleConfig = common.ModuleConfig + +// ModuleParams sets the values for the parameters of an instance of the protocol. +// All replicas are expected to use identical module parameters. +type ModuleParams = common.ModuleParams + +// DefaultModuleConfig returns a valid module config with default names for all modules. +func DefaultModuleConfig() *ModuleConfig { + return &ModuleConfig{ + Self: "availability", + Hasher: "hasher", + Crypto: "crypto", + } +} + +// NewModule creates a new instance of a simple mempool module implementation. It passively waits for +// eventpb.NewRequests events and stores them in a local map. +// +// On a batch request, this implementation creates a batch that consists of as many requests received since the +// previous batch request as possible with respect to params.MaxBatchSizeInBytes and params.MaxTransactionsInBatch. +// +// This implementation uses the hash function provided by the mc.Hasher module to compute transaction IDs and batch IDs. +func NewModule(mc *ModuleConfig, params *ModuleParams) modules.Module { + m := dsl.NewModule(mc.Self) + + commonState := &common.State{ + TxByID: make(map[t.TxID][]byte), + } + + computeids.IncludeComputationOfTransactionAndBatchIDs(m, mc, params, commonState) + formbatches.IncludeBatchCreation(m, mc, params, commonState) + lookuptxs.IncludeTransactionLookupByID(m, mc, params, commonState) + + return m +}