From 95e97d36b64ad3d39fe3a68c68dc91269a5dd3dc Mon Sep 17 00:00:00 2001 From: Matej Pavlovic Date: Thu, 16 Mar 2023 12:15:31 +0100 Subject: [PATCH] External transaction fetcher for mempool The SimpleMempool gets a new (function-type) parameter. It can be set to a function that returns a list of transactions, in which case the mempool will use this function to obtain transactions externally. Signed-off-by: Matej Pavlovic --- .../simplemempool/internal/common/common.go | 7 ++++ .../parts/formbatchesext/formbatches.go | 39 +++++++++++++++++++ .../formbatches.go | 2 +- pkg/mempool/simplemempool/simplemempool.go | 11 +++++- 4 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 pkg/mempool/simplemempool/internal/parts/formbatchesext/formbatches.go rename pkg/mempool/simplemempool/internal/parts/{formbatches => formbatchesint}/formbatches.go (98%) diff --git a/pkg/mempool/simplemempool/internal/common/common.go b/pkg/mempool/simplemempool/internal/common/common.go index 97c03ec72..a2a3fb170 100644 --- a/pkg/mempool/simplemempool/internal/common/common.go +++ b/pkg/mempool/simplemempool/internal/common/common.go @@ -15,6 +15,13 @@ type ModuleConfig struct { // All replicas are expected to use identical module parameters. type ModuleParams struct { MaxTransactionsInBatch int + + // If this parameter is not nil, the mempool will not receive transactions directly (through NewRequests) events. + // On reception of such an event, it will report an error (making the system crash). + // Instead, TxFetcher will be called to pull transactions from an external source + // when they are needed to form a batch (upon the RequestBatch event). + // Looking up transactions by ID will also always fail (return no transactions). + TxFetcher func() []*requestpbtypes.Request } // State represents the common state accessible to all parts of the module implementation. diff --git a/pkg/mempool/simplemempool/internal/parts/formbatchesext/formbatches.go b/pkg/mempool/simplemempool/internal/parts/formbatchesext/formbatches.go new file mode 100644 index 000000000..7c8f5a992 --- /dev/null +++ b/pkg/mempool/simplemempool/internal/parts/formbatchesext/formbatches.go @@ -0,0 +1,39 @@ +package formbatchesext + +import ( + "github.com/filecoin-project/mir/pkg/dsl" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" + mpdsl "github.com/filecoin-project/mir/pkg/pb/mempoolpb/dsl" + mppbtypes "github.com/filecoin-project/mir/pkg/pb/mempoolpb/types" + requestpbtypes "github.com/filecoin-project/mir/pkg/pb/requestpb/types" + t "github.com/filecoin-project/mir/pkg/types" +) + +// IncludeBatchCreation registers event handlers for processing NewRequests and RequestBatch events. +func IncludeBatchCreation( + m dsl.Module, + mc *common.ModuleConfig, + fetchTransactions func() []*requestpbtypes.Request, +) { + + mpdsl.UponTransactionIDsResponse(m, func(txIDs []t.TxID, context *requestTxIDsContext) error { + mpdsl.NewBatch(m, context.origin.Module, txIDs, context.txs, context.origin) + return nil + }) + + mpdsl.UponRequestBatch(m, func(origin *mppbtypes.RequestBatchOrigin) error { + txs := fetchTransactions() + mpdsl.RequestTransactionIDs(m, mc.Self, txs, &requestTxIDsContext{ + txs: txs, + origin: origin, + }) + return nil + }) +} + +// Context data structures + +type requestTxIDsContext struct { + txs []*requestpbtypes.Request + origin *mppbtypes.RequestBatchOrigin +} diff --git a/pkg/mempool/simplemempool/internal/parts/formbatches/formbatches.go b/pkg/mempool/simplemempool/internal/parts/formbatchesint/formbatches.go similarity index 98% rename from pkg/mempool/simplemempool/internal/parts/formbatches/formbatches.go rename to pkg/mempool/simplemempool/internal/parts/formbatchesint/formbatches.go index ce2d83e3d..d5e303588 100644 --- a/pkg/mempool/simplemempool/internal/parts/formbatches/formbatches.go +++ b/pkg/mempool/simplemempool/internal/parts/formbatchesint/formbatches.go @@ -1,4 +1,4 @@ -package formbatches +package formbatchesint import ( "github.com/filecoin-project/mir/pkg/dsl" diff --git a/pkg/mempool/simplemempool/simplemempool.go b/pkg/mempool/simplemempool/simplemempool.go index d90a55f72..4b31b2429 100644 --- a/pkg/mempool/simplemempool/simplemempool.go +++ b/pkg/mempool/simplemempool/simplemempool.go @@ -4,7 +4,8 @@ import ( "github.com/filecoin-project/mir/pkg/dsl" "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/computeids" - "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/formbatches" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/formbatchesext" + "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/formbatchesint" "github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/lookuptxs" "github.com/filecoin-project/mir/pkg/modules" requestpbtypes "github.com/filecoin-project/mir/pkg/pb/requestpb/types" @@ -28,6 +29,7 @@ func DefaultModuleConfig() *ModuleConfig { func DefaultModuleParams() *ModuleParams { return &ModuleParams{ MaxTransactionsInBatch: 10, + TxFetcher: nil, } } @@ -46,8 +48,13 @@ func NewModule(mc *ModuleConfig, params *ModuleParams) modules.Module { } computeids.IncludeComputationOfTransactionAndBatchIDs(m, mc, params, commonState) - formbatches.IncludeBatchCreation(m, mc, params, commonState) lookuptxs.IncludeTransactionLookupByID(m, mc, params, commonState) + if params.TxFetcher != nil { + formbatchesext.IncludeBatchCreation(m, mc, params.TxFetcher) + } else { + formbatchesint.IncludeBatchCreation(m, mc, params, commonState) + } + return m }