Skip to content

Commit

Permalink
Merge pull request #6184 from multiversx/feat/chain-simulator-improvm…
Browse files Browse the repository at this point in the history
…ents

`feat/chain simulator improvements`
  • Loading branch information
miiu96 authored May 31, 2024
2 parents 01ccfaf + 4162705 commit 9e3b26b
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 6 deletions.
16 changes: 16 additions & 0 deletions node/chainSimulator/chainSimulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,22 @@ func (s *simulator) incrementRoundOnAllValidators() {
}
}

// ForceChangeOfEpoch will force the change of current epoch
// This method will call the epoch change trigger and generate block till a new epoch is reached
func (s *simulator) ForceChangeOfEpoch() error {
log.Info("force change of epoch")
for shardID, node := range s.nodes {
err := node.ForceChangeOfEpoch()
if err != nil {
return fmt.Errorf("force change of epoch shardID-%d: error-%w", shardID, err)
}
}

epoch := s.nodes[core.MetachainShardId].GetProcessComponents().EpochStartTrigger().Epoch()

return s.GenerateBlocksUntilEpochIsReached(int32(epoch + 1))
}

func (s *simulator) allNodesCreateBlocks() error {
for _, node := range s.handlers {
// TODO MX-15150 remove this when we remove all goroutines
Expand Down
51 changes: 49 additions & 2 deletions node/chainSimulator/chainSimulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,53 @@ func TestChainSimulator_GenerateBlocksAndEpochChangeShouldWork(t *testing.T) {
assert.True(t, numAccountsWithIncreasedBalances > 0)
}

func TestSimulator_TriggerChangeOfEpoch(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

startTime := time.Now().Unix()
roundDurationInMillis := uint64(6000)
roundsPerEpoch := core.OptionalUint64{
HasValue: true,
Value: 15000,
}
chainSimulator, err := NewChainSimulator(ArgsChainSimulator{
BypassTxSignatureCheck: false,
TempDir: t.TempDir(),
PathToInitialConfig: defaultPathToInitialConfig,
NumOfShards: 3,
GenesisTimestamp: startTime,
RoundDurationInMillis: roundDurationInMillis,
RoundsPerEpoch: roundsPerEpoch,
ApiInterface: api.NewNoApiInterface(),
MinNodesPerShard: 100,
MetaChainMinNodes: 100,
ConsensusGroupSize: 1,
MetaChainConsensusGroupSize: 1,
})
require.Nil(t, err)
require.NotNil(t, chainSimulator)

defer chainSimulator.Close()

err = chainSimulator.ForceChangeOfEpoch()
require.Nil(t, err)

err = chainSimulator.ForceChangeOfEpoch()
require.Nil(t, err)

err = chainSimulator.ForceChangeOfEpoch()
require.Nil(t, err)

err = chainSimulator.ForceChangeOfEpoch()
require.Nil(t, err)

metaNode := chainSimulator.GetNodeHandler(core.MetachainShardId)
currentEpoch := metaNode.GetProcessComponents().EpochStartTrigger().Epoch()
require.Equal(t, uint32(4), currentEpoch)
}

func TestChainSimulator_SetState(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
Expand Down Expand Up @@ -226,7 +273,7 @@ func TestChainSimulator_SetEntireState(t *testing.T) {
CodeMetadata: "BQY=",
Owner: "erd1ss6u80ruas2phpmr82r42xnkd6rxy40g9jl69frppl4qez9w2jpsqj8x97",
DeveloperRewards: "5401004999998",
Keys: map[string]string{
Pairs: map[string]string{
"73756d": "0a",
},
}
Expand Down Expand Up @@ -276,7 +323,7 @@ func TestChainSimulator_SetEntireStateWithRemoval(t *testing.T) {
CodeMetadata: "BQY=",
Owner: "erd1ss6u80ruas2phpmr82r42xnkd6rxy40g9jl69frppl4qez9w2jpsqj8x97",
DeveloperRewards: "5401004999998",
Keys: map[string]string{
Pairs: map[string]string{
"73756d": "0a",
},
}
Expand Down
6 changes: 6 additions & 0 deletions node/chainSimulator/components/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,9 @@ type SyncedBroadcastNetworkHandler interface {
type APIConfigurator interface {
RestApiInterface(shardID uint32) string
}

// NetworkMessenger defines what a network messenger should do
type NetworkMessenger interface {
Broadcast(topic string, buff []byte)
IsInterfaceNil() bool
}
27 changes: 26 additions & 1 deletion node/chainSimulator/components/processComponents.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"time"

"github.com/multiversx/mx-chain-core-go/core/partitioning"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/forking"
"github.com/multiversx/mx-chain-go/common/ordering"
Expand Down Expand Up @@ -265,7 +266,7 @@ func CreateProcessComponents(args ArgsProcessComponentsHolder) (*processComponen
nodeRedundancyHandler: managedProcessComponents.NodeRedundancyHandler(),
currentEpochProvider: managedProcessComponents.CurrentEpochProvider(),
scheduledTxsExecutionHandler: managedProcessComponents.ScheduledTxsExecutionHandler(),
txsSenderHandler: managedProcessComponents.TxsSenderHandler(),
txsSenderHandler: managedProcessComponents.TxsSenderHandler(), // warning: this will be replaced
hardforkTrigger: managedProcessComponents.HardforkTrigger(),
processedMiniBlocksTracker: managedProcessComponents.ProcessedMiniBlocksTracker(),
esdtDataStorageHandlerForAPI: managedProcessComponents.ESDTDataStorageHandlerForAPI(),
Expand All @@ -275,6 +276,30 @@ func CreateProcessComponents(args ArgsProcessComponentsHolder) (*processComponen
managedProcessComponentsCloser: managedProcessComponents,
}

return replaceWithCustomProcessSubComponents(instance, processArgs)
}

func replaceWithCustomProcessSubComponents(
instance *processComponentsHolder,
processArgs processComp.ProcessComponentsFactoryArgs,
) (*processComponentsHolder, error) {
dataPacker, err := partitioning.NewSimpleDataPacker(processArgs.CoreData.InternalMarshalizer())
if err != nil {
return nil, fmt.Errorf("%w in replaceWithCustomProcessSubComponents", err)
}

argsSyncedTxsSender := ArgsSyncedTxsSender{
Marshaller: processArgs.CoreData.InternalMarshalizer(),
ShardCoordinator: processArgs.BootstrapComponents.ShardCoordinator(),
NetworkMessenger: processArgs.Network.NetworkMessenger(),
DataPacker: dataPacker,
}

instance.txsSenderHandler, err = NewSyncedTxsSender(argsSyncedTxsSender)
if err != nil {
return nil, fmt.Errorf("%w in replaceWithCustomProcessSubComponents", err)
}

return instance, nil
}

Expand Down
110 changes: 110 additions & 0 deletions node/chainSimulator/components/syncedTxsSender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package components

import (
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/sharding"
)

// ArgsSyncedTxsSender is a holder struct for all necessary arguments to create a NewSyncedTxsSender
type ArgsSyncedTxsSender struct {
Marshaller marshal.Marshalizer
ShardCoordinator sharding.Coordinator
NetworkMessenger NetworkMessenger
DataPacker process.DataPacker
}

type syncedTxsSender struct {
marshaller marshal.Marshalizer
shardCoordinator sharding.Coordinator
networkMessenger NetworkMessenger
dataPacker process.DataPacker
}

// NewSyncedTxsSender creates a new instance of syncedTxsSender
func NewSyncedTxsSender(args ArgsSyncedTxsSender) (*syncedTxsSender, error) {
if check.IfNil(args.Marshaller) {
return nil, process.ErrNilMarshalizer
}
if check.IfNil(args.ShardCoordinator) {
return nil, process.ErrNilShardCoordinator
}
if check.IfNil(args.NetworkMessenger) {
return nil, process.ErrNilMessenger
}
if check.IfNil(args.DataPacker) {
return nil, dataRetriever.ErrNilDataPacker
}

ret := &syncedTxsSender{
marshaller: args.Marshaller,
shardCoordinator: args.ShardCoordinator,
networkMessenger: args.NetworkMessenger,
dataPacker: args.DataPacker,
}

return ret, nil
}

// SendBulkTransactions sends the provided transactions as a bulk, optimizing transfer between nodes
func (sender *syncedTxsSender) SendBulkTransactions(txs []*transaction.Transaction) (uint64, error) {
if len(txs) == 0 {
return 0, process.ErrNoTxToProcess
}

sender.sendBulkTransactions(txs)

return uint64(len(txs)), nil
}

func (sender *syncedTxsSender) sendBulkTransactions(txs []*transaction.Transaction) {
transactionsByShards := make(map[uint32][][]byte)
for _, tx := range txs {
marshalledTx, err := sender.marshaller.Marshal(tx)
if err != nil {
log.Warn("txsSender.sendBulkTransactions",
"marshaller error", err,
)
continue
}

senderShardId := sender.shardCoordinator.ComputeId(tx.SndAddr)
transactionsByShards[senderShardId] = append(transactionsByShards[senderShardId], marshalledTx)
}

for shardId, txsForShard := range transactionsByShards {
err := sender.sendBulkTransactionsFromShard(txsForShard, shardId)
log.LogIfError(err)
}
}

func (sender *syncedTxsSender) sendBulkTransactionsFromShard(transactions [][]byte, senderShardId uint32) error {
// the topic identifier is made of the current shard id and sender's shard id
identifier := factory.TransactionTopic + sender.shardCoordinator.CommunicationIdentifier(senderShardId)

packets, err := sender.dataPacker.PackDataInChunks(transactions, common.MaxBulkTransactionSize)
if err != nil {
return err
}

for _, buff := range packets {
sender.networkMessenger.Broadcast(identifier, buff)
}

return nil
}

// Close returns nil
func (sender *syncedTxsSender) Close() error {
return nil
}

// IsInterfaceNil checks if the underlying pointer is nil
func (sender *syncedTxsSender) IsInterfaceNil() bool {
return sender == nil
}
Loading

0 comments on commit 9e3b26b

Please sign in to comment.