From 8556473e69bda4367c9a8ced4bd6e424a3f073d9 Mon Sep 17 00:00:00 2001 From: Wang Gerui Date: Tue, 3 Dec 2024 23:05:03 +0800 Subject: [PATCH 1/3] feat: reduce wait time when block time already large --- miner/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index c96379210cc6..599a4e935ec0 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -366,9 +366,9 @@ func getResetTime(chain *core.BlockChain, minePeriod int, prevReset0TimeMillisec if resetTime > minePeriodDuration { resetTime = minePeriodDuration } - // in case the current block is too far in the past, the block time already is huge, we wait for mine period + // in case the current block is too far in the past, the block time already is huge, we wait for 0 time (which will be handled in the next if statement) if resetTime < 0 { - resetTime = minePeriodDuration + resetTime = 0 } if resetTime == 0 { if nowTime == *prevReset0TimeMillisec { From 1b89654663479d0186ab804342831acfa4034407 Mon Sep 17 00:00:00 2001 From: Wang Gerui Date: Wed, 4 Dec 2024 22:42:45 +0800 Subject: [PATCH 2/3] feat: add a new round chan between consensus and miner --- consensus/XDPoS/XDPoS.go | 11 +++++++++-- consensus/XDPoS/engines/engine_v2/engine.go | 11 ++++++++++- miner/worker.go | 7 +++++++ 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/consensus/XDPoS/XDPoS.go b/consensus/XDPoS/XDPoS.go index eb976ca60370..36ad2b3a6afa 100644 --- a/consensus/XDPoS/XDPoS.go +++ b/consensus/XDPoS/XDPoS.go @@ -41,6 +41,7 @@ import ( const ( ExtraFieldCheck = true SkipExtraFieldCheck = false + newRoundChanSize = 1 ) func (x *XDPoS) SigHash(header *types.Header) (hash common.Hash) { @@ -64,6 +65,8 @@ type XDPoS struct { // Share Channel MinePeriodCh chan int // Miner wait Period Channel + NewRoundCh chan types.Round // Miner use this channel to trigger worker to commitNewWork + // Trading and lending service GetXDCXService func() utils.TradingService GetLendingService func() utils.LendingService @@ -104,6 +107,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS { log.Info("xdc config loading", "v2 config", config.V2) minePeriodCh := make(chan int) + newRoundCh := make(chan types.Round, newRoundChanSize) // Allocate the snapshot caches and create the engine signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit) @@ -113,10 +117,11 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS { db: db, MinePeriodCh: minePeriodCh, + NewRoundCh: newRoundCh, signingTxsCache: signingTxsCache, EngineV1: engine_v1.New(chainConfig, db), - EngineV2: engine_v2.New(chainConfig, db, minePeriodCh), + EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh), } } @@ -131,6 +136,7 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS { } minePeriodCh := make(chan int) + newRoundCh := make(chan types.Round, newRoundChanSize) // Allocate the snapshot caches and create the engine signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit) @@ -140,13 +146,14 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS { db: db, MinePeriodCh: minePeriodCh, + NewRoundCh: newRoundCh, GetXDCXService: func() utils.TradingService { return nil }, GetLendingService: func() utils.LendingService { return nil }, signingTxsCache: signingTxsCache, EngineV1: engine_v1.NewFaker(db, chainConfig), - EngineV2: engine_v2.New(chainConfig, db, minePeriodCh), + EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh), } return fakeEngine } diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index 6d7331025ecb..20558e860a8a 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -48,6 +48,7 @@ type XDPoS_v2 struct { BroadcastCh chan interface{} minePeriodCh chan int + newRoundCh chan types.Round timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached timeoutCount int // number of timeout being sent @@ -71,7 +72,7 @@ type XDPoS_v2 struct { votePoolCollectionTime time.Time } -func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int) *XDPoS_v2 { +func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int, newRoundCh chan types.Round) *XDPoS_v2 { config := chainConfig.XDPoS // Setup timeoutTimer duration := time.Duration(config.V2.CurrentConfig.TimeoutPeriod) * time.Second @@ -100,6 +101,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i timeoutWorker: timeoutTimer, BroadcastCh: make(chan interface{}), minePeriodCh: minePeriodCh, + newRoundCh: newRoundCh, round2epochBlockInfo: round2epochBlockInfo, @@ -902,6 +904,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo 1. Set currentRound = QC round + 1 (or TC round +1) 2. Reset timer 3. Reset vote and timeout Pools +4. Send signal to miner */ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round types.Round) { log.Info("[setNewRound] new round and reset pools and workers", "round", round) @@ -911,6 +914,12 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ x.timeoutPool.Clear() // don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break // for example round gets bump during collecting vote, so we have to keep vote. + + // send signal to newRoundCh, but if full don't send + select { + case x.newRoundCh <- round: + default: + } } func (x *XDPoS_v2) broadcastToBftChannel(msg interface{}) { diff --git a/miner/worker.go b/miner/worker.go index 599a4e935ec0..f2e61d886529 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -273,6 +273,7 @@ func (self *worker) update() { minePeriod := 2 MinePeriodCh := self.engine.(*XDPoS.XDPoS).MinePeriodCh defer close(MinePeriodCh) + NewRoundCh := self.engine.(*XDPoS.XDPoS).NewRoundCh timeout := time.NewTimer(time.Duration(minePeriod) * time.Second) c := make(chan struct{}) @@ -312,6 +313,12 @@ func (self *worker) update() { resetTime := getResetTime(self.chain, minePeriod, &prevReset0TimeMillisec) timeout.Reset(resetTime) + // Handle new round + case <-NewRoundCh: + self.commitNewWork() + resetTime := getResetTime(self.chain, minePeriod, &prevReset0TimeMillisec) + timeout.Reset(resetTime) + // Handle ChainSideEvent case <-self.chainSideCh: From 89256cb2c25271191ba80507e42ce02e09dd5e54 Mon Sep 17 00:00:00 2001 From: benjamin202410 Date: Mon, 9 Dec 2024 01:40:18 -0800 Subject: [PATCH 3/3] Use safe timer reset method (#757) * use saft timer reset method * use saft timer reset method --- miner/worker.go | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index f2e61d886529..df642c49bb5c 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -116,7 +116,9 @@ type worker struct { chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - wg sync.WaitGroup + resetCh chan time.Duration // Channel to request timer resets + + wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -158,6 +160,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + resetCh: make(chan time.Duration, 1), chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), @@ -284,6 +287,16 @@ func (self *worker) update() { for { // A real event arrived, process interesting content select { + case d := <-self.resetCh: + // Reset the timer to the new duration. + if !timeout.Stop() { + // Drain the timer channel if it had already expired. + select { + case <-timeout.C: + default: + } + } + timeout.Reset(d) case <-timeout.C: c <- struct{}{} case <-finish: @@ -292,32 +305,31 @@ func (self *worker) update() { } }() for { - prevReset0TimeMillisec := int64(0) // A real event arrived, process interesting content select { case v := <-MinePeriodCh: log.Info("[worker] update wait period", "period", v) minePeriod = v - timeout.Reset(time.Duration(minePeriod) * time.Second) + self.resetCh <- time.Duration(minePeriod) * time.Second case <-c: if atomic.LoadInt32(&self.mining) == 1 { self.commitNewWork() } - resetTime := getResetTime(self.chain, minePeriod, &prevReset0TimeMillisec) - timeout.Reset(resetTime) + resetTime := getResetTime(self.chain, minePeriod) + self.resetCh <- resetTime // Handle ChainHeadEvent case <-self.chainHeadCh: self.commitNewWork() - resetTime := getResetTime(self.chain, minePeriod, &prevReset0TimeMillisec) - timeout.Reset(resetTime) + resetTime := getResetTime(self.chain, minePeriod) + self.resetCh <- resetTime // Handle new round case <-NewRoundCh: self.commitNewWork() - resetTime := getResetTime(self.chain, minePeriod, &prevReset0TimeMillisec) - timeout.Reset(resetTime) + resetTime := getResetTime(self.chain, minePeriod) + self.resetCh <- resetTime // Handle ChainSideEvent case <-self.chainSideCh: @@ -364,27 +376,15 @@ func (self *worker) update() { } } -func getResetTime(chain *core.BlockChain, minePeriod int, prevReset0TimeMillisec *int64) time.Duration { +func getResetTime(chain *core.BlockChain, minePeriod int) time.Duration { minePeriodDuration := time.Duration(minePeriod) * time.Second currentBlockTime := chain.CurrentBlock().Time().Int64() nowTime := time.Now().UnixMilli() resetTime := time.Duration(currentBlockTime)*time.Second + minePeriodDuration - time.Duration(nowTime)*time.Millisecond // in case the current block time is not very accurate - if resetTime > minePeriodDuration { + if resetTime > minePeriodDuration || resetTime <= 0 { resetTime = minePeriodDuration } - // in case the current block is too far in the past, the block time already is huge, we wait for 0 time (which will be handled in the next if statement) - if resetTime < 0 { - resetTime = 0 - } - if resetTime == 0 { - if nowTime == *prevReset0TimeMillisec { - // in case it resets to 0 in one millisecond too many times, we wait for mine period - resetTime = minePeriodDuration - } else { - *prevReset0TimeMillisec = nowTime - } - } log.Debug("[update] Miner worker timer reset", "resetMilliseconds", resetTime.Milliseconds(), "minePeriodSec", minePeriod, "currentBlockTimeSec", fmt.Sprintf("%d", currentBlockTime), "currentSystemTimeSec", fmt.Sprintf("%d.%03d", nowTime/1000, nowTime%1000)) return resetTime }