Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full archive peer shard mapper #5337

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ const ConnectionTopic = "connection"
// ValidatorInfoTopic is the topic used for validatorInfo signaling
const ValidatorInfoTopic = "validatorInfo"

// FullArchiveTopicPrefix is the topic prefix used for specific topics that have different interceptors on full archive network
const FullArchiveTopicPrefix = "full_archive_"
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved

// MetricCurrentRound is the metric for monitoring the current round of a node
const MetricCurrentRound = "erd_current_round"

Expand Down
80 changes: 80 additions & 0 deletions common/disabled/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package disabled
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved

type cache struct {
}

// NewCache returns a new disabled Cacher implementation
func NewCache() *cache {
return &cache{}
}

// Clear does nothing as it is disabled
func (c *cache) Clear() {
}

// Put returns false as it is disabled
func (c *cache) Put(_ []byte, _ interface{}, _ int) (evicted bool) {
return false
}

// Get returns nil and false as it is disabled
func (c *cache) Get(_ []byte) (value interface{}, ok bool) {
return nil, false
}

// Has returns false as it is disabled
func (c *cache) Has(_ []byte) bool {
return false
}

// Peek returns nil and false as it is disabled
func (c *cache) Peek(_ []byte) (value interface{}, ok bool) {
return nil, false
}

// HasOrAdd returns false and false as it is disabled
func (c *cache) HasOrAdd(_ []byte, _ interface{}, _ int) (has, added bool) {
return false, false
}

// Remove does nothing as it is disabled
func (c *cache) Remove(_ []byte) {
}

// Keys returns an empty slice as it is disabled
func (c *cache) Keys() [][]byte {
return make([][]byte, 0)
}

// Len returns 0 as it is disabled
func (c *cache) Len() int {
return 0
}

// SizeInBytesContained returns 0 as it is disabled
func (c *cache) SizeInBytesContained() uint64 {
return 0
}

// MaxSize returns 0 as it is disabled
func (c *cache) MaxSize() int {
return 0
}

// RegisterHandler does nothing as it is disabled
func (c *cache) RegisterHandler(_ func(key []byte, value interface{}), _ string) {
}

// UnRegisterHandler does nothing as it is disabled
func (c *cache) UnRegisterHandler(_ string) {
}

// Close returns nil as it is disabled
func (c *cache) Close() error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (c *cache) IsInterfaceNil() bool {
return c == nil
}
7 changes: 5 additions & 2 deletions epochStart/bootstrap/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ func checkArguments(args ArgsEpochStartBootstrap) error {
if check.IfNil(args.GenesisShardCoordinator) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilShardCoordinator)
}
if check.IfNil(args.Messenger) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilMessenger)
if check.IfNil(args.MainMessenger) {
return fmt.Errorf("%s on main network: %w", baseErrorMessage, epochStart.ErrNilMessenger)
}
if check.IfNil(args.FullArchiveMessenger) {
return fmt.Errorf("%s on full archive network: %w", baseErrorMessage, epochStart.ErrNilMessenger)
}
if check.IfNil(args.EconomicsData) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilEconomicsData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/disabled"
disabledFactory "github.com/multiversx/mx-chain-go/factory/disabled"
disabledGenesis "github.com/multiversx/mx-chain-go/genesis/process/disabled"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/factory/interceptorscontainer"
"github.com/multiversx/mx-chain-go/sharding"
Expand All @@ -29,7 +30,8 @@ type ArgsEpochStartInterceptorContainer struct {
CryptoComponents process.CryptoComponentsHolder
Config config.Config
ShardCoordinator sharding.Coordinator
Messenger process.TopicHandler
MainMessenger process.TopicHandler
FullArchiveMessenger process.TopicHandler
DataPool dataRetriever.PoolsHolder
WhiteListHandler update.WhiteListHandler
WhiteListerVerifiedTxs update.WhiteListHandler
Expand All @@ -40,6 +42,7 @@ type ArgsEpochStartInterceptorContainer struct {
HeaderIntegrityVerifier process.HeaderIntegrityVerifier
RequestHandler process.RequestHandler
SignaturesHandler process.SignaturesHandler
NodeOperationMode p2p.NodeOperation
}

// NewEpochStartInterceptorsContainer will return a real interceptors container factory, but with many disabled components
Expand Down Expand Up @@ -72,6 +75,7 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
epochStartTrigger := disabled.NewEpochStartTrigger()
// TODO: move the peerShardMapper creation before boostrapComponents
peerShardMapper := disabled.NewPeerShardMapper()
fullArchivePeerShardMapper := disabled.NewPeerShardMapper()
hardforkTrigger := disabledFactory.HardforkTrigger()

containerFactoryArgs := interceptorscontainer.CommonInterceptorsContainerFactoryArgs{
Expand All @@ -80,7 +84,8 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
Accounts: accountsAdapter,
ShardCoordinator: args.ShardCoordinator,
NodesCoordinator: nodesCoordinator,
Messenger: args.Messenger,
MainMessenger: args.MainMessenger,
FullArchiveMessenger: args.FullArchiveMessenger,
Store: storer,
DataPool: args.DataPool,
MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed,
Expand All @@ -100,7 +105,8 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
PeerSignatureHandler: cryptoComponents.PeerSignatureHandler(),
SignaturesHandler: args.SignaturesHandler,
HeartbeatExpiryTimespanInSec: args.Config.HeartbeatV2.HeartbeatExpiryTimespanInSec,
PeerShardMapper: peerShardMapper,
MainPeerShardMapper: peerShardMapper,
FullArchivePeerShardMapper: fullArchivePeerShardMapper,
HardforkTrigger: hardforkTrigger,
}

Expand Down
8 changes: 7 additions & 1 deletion epochStart/bootstrap/fromLocalStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ func (e *epochStartBootstrap) prepareEpochFromStorage() (Parameters, error) {
return Parameters{}, err
}

err = e.messenger.CreateTopic(common.ConsensusTopic+e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId()), true)
consensusTopic := common.ConsensusTopic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
err = e.mainMessenger.CreateTopic(consensusTopic, true)
if err != nil {
return Parameters{}, err
}

err = e.fullArchiveMessenger.CreateTopic(consensusTopic, true)
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return Parameters{}, err
}
Expand Down
65 changes: 47 additions & 18 deletions epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/types"
factoryDisabled "github.com/multiversx/mx-chain-go/factory/disabled"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
disabledP2P "github.com/multiversx/mx-chain-go/p2p/disabled"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/block/preprocess"
"github.com/multiversx/mx-chain-go/process/heartbeat/validator"
Expand Down Expand Up @@ -93,7 +93,8 @@ type epochStartBootstrap struct {
destinationShardAsObserver uint32
coreComponentsHolder process.CoreComponentsHolder
cryptoComponentsHolder process.CryptoComponentsHolder
messenger Messenger
mainMessenger Messenger
fullArchiveMessenger Messenger
generalConfig config.Config
prefsConfig config.PreferencesConfig
flagsConfig config.ContextFlagsConfig
Expand All @@ -117,7 +118,7 @@ type epochStartBootstrap struct {
bootstrapHeartbeatSender update.Closer
trieSyncStatisticsProvider common.SizeSyncStatisticsHandler
nodeProcessingMode common.NodeProcessingMode

nodeOperationMode p2p.NodeOperation
// created components
requestHandler process.RequestHandler
interceptorContainer process.InterceptorsContainer
Expand Down Expand Up @@ -162,7 +163,8 @@ type ArgsEpochStartBootstrap struct {
CoreComponentsHolder process.CoreComponentsHolder
CryptoComponentsHolder process.CryptoComponentsHolder
DestinationShardAsObserver uint32
Messenger Messenger
MainMessenger Messenger
FullArchiveMessenger Messenger
GeneralConfig config.Config
PrefsConfig config.PreferencesConfig
FlagsConfig config.ContextFlagsConfig
Expand Down Expand Up @@ -201,7 +203,8 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap,
epochStartProvider := &epochStartBootstrap{
coreComponentsHolder: args.CoreComponentsHolder,
cryptoComponentsHolder: args.CryptoComponentsHolder,
messenger: args.Messenger,
mainMessenger: args.MainMessenger,
fullArchiveMessenger: args.FullArchiveMessenger,
generalConfig: args.GeneralConfig,
prefsConfig: args.PrefsConfig,
flagsConfig: args.FlagsConfig,
Expand All @@ -228,6 +231,11 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap,
shardCoordinator: args.GenesisShardCoordinator,
trieSyncStatisticsProvider: args.TrieSyncStatisticsProvider,
nodeProcessingMode: args.NodeProcessingMode,
nodeOperationMode: p2p.NormalOperation,
}

if epochStartProvider.prefsConfig.FullArchive {
epochStartProvider.nodeOperationMode = p2p.FullArchiveMode
}

whiteListCache, err := storageunit.NewCache(storageFactory.GetCacherFromConfig(epochStartProvider.generalConfig.WhiteListPool))
Expand Down Expand Up @@ -418,10 +426,16 @@ func (e *epochStartBootstrap) bootstrapFromLocalStorage() (Parameters, error) {

func (e *epochStartBootstrap) cleanupOnBootstrapFinish() {
log.Debug("unregistering all message processor and un-joining all topics")
errMessenger := e.messenger.UnregisterAllMessageProcessors()
errMessenger := e.mainMessenger.UnregisterAllMessageProcessors()
log.LogIfError(errMessenger)

errMessenger = e.mainMessenger.UnJoinAllTopics()
log.LogIfError(errMessenger)

errMessenger = e.fullArchiveMessenger.UnregisterAllMessageProcessors()
log.LogIfError(errMessenger)

errMessenger = e.messenger.UnJoinAllTopics()
errMessenger = e.fullArchiveMessenger.UnJoinAllTopics()
log.LogIfError(errMessenger)

e.closeTrieNodes()
Expand Down Expand Up @@ -511,7 +525,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error {

epochStartConfig := e.generalConfig.EpochStartConfig
metaBlockProcessor, err := NewEpochStartMetaBlockProcessor(
e.messenger,
e.mainMessenger,
e.requestHandler,
e.coreComponentsHolder.InternalMarshalizer(),
e.coreComponentsHolder.Hasher(),
Expand All @@ -527,7 +541,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error {
CoreComponentsHolder: e.coreComponentsHolder,
CryptoComponentsHolder: e.cryptoComponentsHolder,
RequestHandler: e.requestHandler,
Messenger: e.messenger,
Messenger: e.mainMessenger,
ShardCoordinator: e.shardCoordinator,
EconomicsData: e.economicsData,
WhitelistHandler: e.whiteListHandler,
Expand All @@ -550,14 +564,16 @@ func (e *epochStartBootstrap) createSyncers() error {
CryptoComponents: e.cryptoComponentsHolder,
Config: e.generalConfig,
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
DataPool: e.dataPool,
WhiteListHandler: e.whiteListHandler,
WhiteListerVerifiedTxs: e.whiteListerVerifiedTxs,
ArgumentsParser: e.argumentsParser,
HeaderIntegrityVerifier: e.headerIntegrityVerifier,
RequestHandler: e.requestHandler,
SignaturesHandler: e.messenger,
SignaturesHandler: e.mainMessenger,
NodeOperationMode: e.nodeOperationMode,
}

e.interceptorContainer, err = factoryInterceptors.NewEpochStartInterceptorsContainer(args)
Expand Down Expand Up @@ -672,7 +688,13 @@ func (e *epochStartBootstrap) requestAndProcessing() (Parameters, error) {
}
log.Debug("start in epoch bootstrap: shardCoordinator", "numOfShards", e.baseData.numberOfShards, "shardId", e.baseData.shardId)

err = e.messenger.CreateTopic(common.ConsensusTopic+e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId()), true)
consensusTopic := common.ConsensusTopic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
err = e.mainMessenger.CreateTopic(consensusTopic, true)
if err != nil {
return Parameters{}, err
}

err = e.fullArchiveMessenger.CreateTopic(consensusTopic, true)
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return Parameters{}, err
}
Expand Down Expand Up @@ -1191,7 +1213,7 @@ func (e *epochStartBootstrap) createResolversContainer() error {
log.Debug("epochStartBootstrap.createRequestHandler", "shard", e.shardCoordinator.SelfId())
resolversContainerArgs := resolverscontainer.FactoryArgs{
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
Messenger: e.mainMessenger,
Store: storageService,
Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
DataPools: e.dataPool,
Expand Down Expand Up @@ -1222,7 +1244,7 @@ func (e *epochStartBootstrap) createRequestHandler() error {
requestersContainerArgs := requesterscontainer.FactoryArgs{
RequesterConfig: e.generalConfig.Requesters,
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
Messenger: e.mainMessenger,
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(),
OutputAntifloodHandler: disabled.NewAntiFloodHandler(),
Expand Down Expand Up @@ -1293,8 +1315,15 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}

heartbeatTopic := common.HeartbeatV2Topic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
if !e.messenger.HasTopic(heartbeatTopic) {
err = e.messenger.CreateTopic(heartbeatTopic, true)
if !e.mainMessenger.HasTopic(heartbeatTopic) {
err = e.mainMessenger.CreateTopic(heartbeatTopic, true)
if err != nil {
return err
}
}

if !e.fullArchiveMessenger.HasTopic(heartbeatTopic) {
err = e.fullArchiveMessenger.CreateTopic(heartbeatTopic, true)
if err != nil {
return err
}
Expand All @@ -1306,8 +1335,8 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}
heartbeatCfg := e.generalConfig.HeartbeatV2
argsHeartbeatSender := sender.ArgBootstrapSender{
MainMessenger: e.messenger,
FullArchiveMessenger: disabledP2P.NewNetworkMessenger(), // TODO[Sorin]: pass full archive messenger
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
HeartbeatTopic: heartbeatTopic,
HeartbeatTimeBetweenSends: time.Second * time.Duration(heartbeatCfg.HeartbeatTimeBetweenSendsDuringBootstrapInSec),
Expand Down
Loading