Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proofs pool cleanup delta #6725

Merged
merged 6 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@
MaxHeadersPerShard = 1000
NumElementsToRemoveOnEviction = 200

[ProofsPoolConfig]
CleanupNonceDelta = 3

[BadBlocksCache]
Name = "BadBlocksCache"
Capacity = 1000
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type HeadersPoolConfig struct {
NumElementsToRemoveOnEviction int
}

// ProofsPoolConfig will map the proofs cache configuration
type ProofsPoolConfig struct {
	// CleanupNonceDelta is the number of nonces kept behind the cleanup nonce
	// when pruning old proofs; values below the pool's internal default are
	// raised to that default by the pool constructor.
	CleanupNonceDelta uint64
}

// DBConfig will map the database configuration
type DBConfig struct {
FilePath string
Expand Down Expand Up @@ -208,6 +213,7 @@ type Config struct {

NTPConfig NTPConfig
HeadersPoolConfig HeadersPoolConfig
ProofsPoolConfig ProofsPoolConfig
BlockSizeThrottleConfig BlockSizeThrottleConfig
VirtualMachine VirtualMachineServicesConfig
BuiltInFunctions BuiltInFunctionsConfig
Expand Down
17 changes: 16 additions & 1 deletion dataRetriever/dataPool/proofsCache/proofsPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
logger "github.com/multiversx/mx-chain-logger-go"
)

const defaultCleanupNonceDelta = 3

var log = logger.GetOrCreate("dataRetriever/proofscache")

type proofsPool struct {
Expand All @@ -17,13 +19,20 @@ type proofsPool struct {

mutAddedProofSubscribers sync.RWMutex
addedProofSubscribers []func(headerProof data.HeaderProofHandler)
cleanupNonceDelta uint64
}

// NewProofsPool creates a new proofs pool component
func NewProofsPool() *proofsPool {
func NewProofsPool(cleanupNonceDelta uint64) *proofsPool {
if cleanupNonceDelta < defaultCleanupNonceDelta {
log.Debug("proofs pool: using default cleanup nonce delta", "cleanupNonceDelta", defaultCleanupNonceDelta)
cleanupNonceDelta = defaultCleanupNonceDelta
}

return &proofsPool{
cache: make(map[uint32]*proofsCache),
addedProofSubscribers: make([]func(headerProof data.HeaderProofHandler), 0),
cleanupNonceDelta: cleanupNonceDelta,
}
}

Expand Down Expand Up @@ -82,6 +91,12 @@ func (pp *proofsPool) CleanupProofsBehindNonce(shardID uint32, nonce uint64) err
return nil
}

if nonce <= pp.cleanupNonceDelta {
return nil
}

nonce -= pp.cleanupNonceDelta

pp.mutCache.RLock()
defer pp.mutCache.RUnlock()

Expand Down
135 changes: 95 additions & 40 deletions dataRetriever/dataPool/proofsCache/proofsPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,52 +15,56 @@ import (
"github.com/stretchr/testify/require"
)

// cleanupDelta is the cleanup nonce delta passed to every pool under test;
// it matches the pool's internal default so the constructor does not override it.
const cleanupDelta = 3

// shardID is the shard all fixture proofs belong to.
var shardID = uint32(1)

// proof1..proof4 are fixture proofs with consecutive nonces 1..4 in the same
// shard and epoch, used to exercise nonce-based cleanup boundaries.
var proof1 = &block.HeaderProof{
	PubKeysBitmap:       []byte("pubKeysBitmap1"),
	AggregatedSignature: []byte("aggSig1"),
	HeaderHash:          []byte("hash1"),
	HeaderEpoch:         1,
	HeaderNonce:         1,
	HeaderShardId:       shardID,
}

var proof2 = &block.HeaderProof{
	PubKeysBitmap:       []byte("pubKeysBitmap2"),
	AggregatedSignature: []byte("aggSig2"),
	HeaderHash:          []byte("hash2"),
	HeaderEpoch:         1,
	HeaderNonce:         2,
	HeaderShardId:       shardID,
}
var proof3 = &block.HeaderProof{
	PubKeysBitmap:       []byte("pubKeysBitmap3"),
	AggregatedSignature: []byte("aggSig3"),
	HeaderHash:          []byte("hash3"),
	HeaderEpoch:         1,
	HeaderNonce:         3,
	HeaderShardId:       shardID,
}
var proof4 = &block.HeaderProof{
	PubKeysBitmap:       []byte("pubKeysBitmap4"),
	AggregatedSignature: []byte("aggSig4"),
	HeaderHash:          []byte("hash4"),
	HeaderEpoch:         1,
	HeaderNonce:         4,
	HeaderShardId:       shardID,
}

// TestNewProofsPool checks that the constructor returns a usable, non-nil component.
func TestNewProofsPool(t *testing.T) {
	t.Parallel()

	pool := proofscache.NewProofsPool(cleanupDelta)
	require.False(t, pool.IsInterfaceNil())
}

func TestProofsPool_ShouldWork(t *testing.T) {
t.Parallel()

shardID := uint32(1)

pp := proofscache.NewProofsPool()
pp := proofscache.NewProofsPool(cleanupDelta)

proof1 := &block.HeaderProof{
PubKeysBitmap: []byte("pubKeysBitmap1"),
AggregatedSignature: []byte("aggSig1"),
HeaderHash: []byte("hash1"),
HeaderEpoch: 1,
HeaderNonce: 1,
HeaderShardId: shardID,
}
proof2 := &block.HeaderProof{
PubKeysBitmap: []byte("pubKeysBitmap2"),
AggregatedSignature: []byte("aggSig2"),
HeaderHash: []byte("hash2"),
HeaderEpoch: 1,
HeaderNonce: 2,
HeaderShardId: shardID,
}
proof3 := &block.HeaderProof{
PubKeysBitmap: []byte("pubKeysBitmap3"),
AggregatedSignature: []byte("aggSig3"),
HeaderHash: []byte("hash3"),
HeaderEpoch: 1,
HeaderNonce: 3,
HeaderShardId: shardID,
}
proof4 := &block.HeaderProof{
PubKeysBitmap: []byte("pubKeysBitmap4"),
AggregatedSignature: []byte("aggSig4"),
HeaderHash: []byte("hash4"),
HeaderEpoch: 1,
HeaderNonce: 4,
HeaderShardId: shardID,
}
_ = pp.AddProof(proof1)
_ = pp.AddProof(proof2)
_ = pp.AddProof(proof3)
Expand All @@ -77,8 +81,7 @@ func TestProofsPool_ShouldWork(t *testing.T) {
require.Nil(t, err)

proof, err = pp.GetProof(shardID, []byte("hash3"))
require.Equal(t, proofscache.ErrMissingProof, err)
require.Nil(t, proof)
require.Nil(t, err)

proof, err = pp.GetProof(shardID, []byte("hash4"))
require.Nil(t, err)
Expand All @@ -88,7 +91,7 @@ func TestProofsPool_ShouldWork(t *testing.T) {
func TestProofsPool_RegisterHandler(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool()
pp := proofscache.NewProofsPool(cleanupDelta)

wasCalled := false
wg := sync.WaitGroup{}
Expand All @@ -107,10 +110,62 @@ func TestProofsPool_RegisterHandler(t *testing.T) {
assert.True(t, wasCalled)
}

func TestProofsPool_CleanupProofsBehindNonce(t *testing.T) {
t.Parallel()

t.Run("should not cleanup proofs behind delta", func(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool(cleanupDelta)

_ = pp.AddProof(proof1)
_ = pp.AddProof(proof2)
_ = pp.AddProof(proof3)
_ = pp.AddProof(proof4)

err := pp.CleanupProofsBehindNonce(shardID, 5)
require.Nil(t, err)

proof, err := pp.GetProof(shardID, []byte("hash1"))
require.Equal(t, proofscache.ErrMissingProof, err)
require.Nil(t, proof)

_, err = pp.GetProof(shardID, []byte("hash2"))
require.Nil(t, err)
_, err = pp.GetProof(shardID, []byte("hash3"))
require.Nil(t, err)
_, err = pp.GetProof(shardID, []byte("hash4"))
require.Nil(t, err)
})

t.Run("should not cleanup if nonce smaller or equal to delta", func(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool(cleanupDelta)

_ = pp.AddProof(proof1)
_ = pp.AddProof(proof2)
_ = pp.AddProof(proof3)
_ = pp.AddProof(proof4)

err := pp.CleanupProofsBehindNonce(shardID, cleanupDelta)
require.Nil(t, err)

_, err = pp.GetProof(shardID, []byte("hash1"))
require.Nil(t, err)
_, err = pp.GetProof(shardID, []byte("hash2"))
require.Nil(t, err)
_, err = pp.GetProof(shardID, []byte("hash3"))
require.Nil(t, err)
_, err = pp.GetProof(shardID, []byte("hash4"))
require.Nil(t, err)
})
}

func TestProofsPool_Concurrency(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool()
pp := proofscache.NewProofsPool(cleanupDelta)

numOperations := 1000

Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/factory/dataPoolFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func NewDataPoolFromConfig(args ArgsDataPool) (dataRetriever.PoolsHolder, error)
return nil, fmt.Errorf("%w while creating the cache for the validator info results", err)
}

proofsPool := proofscache.NewProofsPool()
proofsPool := proofscache.NewProofsPool(mainConfig.ProofsPoolConfig.CleanupNonceDelta)
currBlockTransactions := dataPool.NewCurrentBlockTransactionsPool()
currEpochValidatorInfo := dataPool.NewCurrentEpochValidatorInfoPool()

Expand Down
2 changes: 1 addition & 1 deletion integrationTests/testProcessorNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ func (tpn *TestProcessorNode) InitializeProcessors(gasMap map[string]map[string]
}

func (tpn *TestProcessorNode) initDataPools() {
tpn.ProofsPool = proofscache.NewProofsPool()
tpn.ProofsPool = proofscache.NewProofsPool(3)
tpn.DataPool = dataRetrieverMock.CreatePoolsHolderWithProofsPool(1, tpn.ShardCoordinator.SelfId(), tpn.ProofsPool)
cacherCfg := storageunit.CacheConfig{Capacity: 10000, Type: storageunit.LRUCache, Shards: 1}
suCache, _ := storageunit.NewCache(cacherCfg)
Expand Down
4 changes: 2 additions & 2 deletions process/block/baseProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ func (bp *baseProcessor) cleanupPools(headerHandler data.HeaderHandler) {
highestPrevFinalBlockNonce,
)

if bp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, headerHandler.GetEpoch()) {
if common.ShouldBlockHavePrevProof(headerHandler, bp.enableEpochsHandler, common.EquivalentMessagesFlag) {
err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce)
if err != nil {
log.Warn("failed to cleanup notarized proofs behind nonce",
Expand Down Expand Up @@ -1042,7 +1042,7 @@ func (bp *baseProcessor) cleanupPoolsForCrossShard(
crossNotarizedHeader.GetNonce(),
)

if bp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, crossNotarizedHeader.GetEpoch()) {
if common.ShouldBlockHavePrevProof(crossNotarizedHeader, bp.enableEpochsHandler, common.EquivalentMessagesFlag) {
err = bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal)
if err != nil {
log.Warn("failed to cleanup notarized proofs behind nonce",
Expand Down
2 changes: 1 addition & 1 deletion process/block/baseProcess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func initDataPool(testHash []byte) *dataRetrieverMock.PoolsHolderStub {
return cs
},
ProofsCalled: func() dataRetriever.ProofsPool {
return proofscache.NewProofsPool()
return proofscache.NewProofsPool(3)
},
}

Expand Down
4 changes: 2 additions & 2 deletions testscommon/dataRetriever/poolFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func createPoolHolderArgs(numShards uint32, selfShard uint32) dataPool.DataPoolA
})
panicIfError("CreatePoolsHolder", err)

proofsPool := proofscache.NewProofsPool()
proofsPool := proofscache.NewProofsPool(3)

currentBlockTransactions := dataPool.NewCurrentBlockTransactionsPool()
currentEpochValidatorInfo := dataPool.NewCurrentEpochValidatorInfoPool()
Expand Down Expand Up @@ -248,7 +248,7 @@ func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier)
heartbeatPool, err := storageunit.NewCache(cacherConfig)
panicIfError("CreatePoolsHolderWithTxPool", err)

proofsPool := proofscache.NewProofsPool()
proofsPool := proofscache.NewProofsPool(3)

currentBlockTransactions := dataPool.NewCurrentBlockTransactionsPool()
currentEpochValidatorInfo := dataPool.NewCurrentEpochValidatorInfoPool()
Expand Down
2 changes: 1 addition & 1 deletion testscommon/dataRetriever/poolsHolderMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewPoolsHolderMock() *PoolsHolderMock {
})
panicIfError("NewPoolsHolderMock", err)

holder.proofs = proofscache.NewProofsPool()
holder.proofs = proofscache.NewProofsPool(3)

return holder
}
Expand Down
Loading