Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reduce wait time when block time already large #753

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions consensus/XDPoS/XDPoS.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
const (
ExtraFieldCheck = true
SkipExtraFieldCheck = false
newRoundChanSize = 1
)

func (x *XDPoS) SigHash(header *types.Header) (hash common.Hash) {
Expand All @@ -64,6 +65,8 @@ type XDPoS struct {
// Share Channel
MinePeriodCh chan int // Miner wait Period Channel

NewRoundCh chan types.Round // Miner use this channel to trigger worker to commitNewWork

// Trading and lending service
GetXDCXService func() utils.TradingService
GetLendingService func() utils.LendingService
Expand Down Expand Up @@ -104,6 +107,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS {
log.Info("xdc config loading", "v2 config", config.V2)

minePeriodCh := make(chan int)
newRoundCh := make(chan types.Round, newRoundChanSize)

// Allocate the snapshot caches and create the engine
signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit)
Expand All @@ -113,10 +117,11 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS {
db: db,

MinePeriodCh: minePeriodCh,
NewRoundCh: newRoundCh,

signingTxsCache: signingTxsCache,
EngineV1: engine_v1.New(chainConfig, db),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh),
}
}

Expand All @@ -131,6 +136,7 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
}

minePeriodCh := make(chan int)
newRoundCh := make(chan types.Round, newRoundChanSize)

// Allocate the snapshot caches and create the engine
signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit)
Expand All @@ -140,13 +146,14 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
db: db,

MinePeriodCh: minePeriodCh,
NewRoundCh: newRoundCh,

GetXDCXService: func() utils.TradingService { return nil },
GetLendingService: func() utils.LendingService { return nil },

signingTxsCache: signingTxsCache,
EngineV1: engine_v1.NewFaker(db, chainConfig),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh),
}
return fakeEngine
}
Expand Down
11 changes: 10 additions & 1 deletion consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type XDPoS_v2 struct {

BroadcastCh chan interface{}
minePeriodCh chan int
newRoundCh chan types.Round

timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached
timeoutCount int // number of timeout being sent
Expand All @@ -71,7 +72,7 @@ type XDPoS_v2 struct {
votePoolCollectionTime time.Time
}

func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int) *XDPoS_v2 {
func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int, newRoundCh chan types.Round) *XDPoS_v2 {
config := chainConfig.XDPoS
// Setup timeoutTimer
duration := time.Duration(config.V2.CurrentConfig.TimeoutPeriod) * time.Second
Expand Down Expand Up @@ -100,6 +101,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i
timeoutWorker: timeoutTimer,
BroadcastCh: make(chan interface{}),
minePeriodCh: minePeriodCh,
newRoundCh: newRoundCh,

round2epochBlockInfo: round2epochBlockInfo,

Expand Down Expand Up @@ -902,6 +904,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo
1. Set currentRound = QC round + 1 (or TC round +1)
2. Reset timer
3. Reset vote and timeout Pools
4. Send signal to miner
*/
func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round types.Round) {
log.Info("[setNewRound] new round and reset pools and workers", "round", round)
Expand All @@ -911,6 +914,12 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ
x.timeoutPool.Clear()
// don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break
// for example round gets bump during collecting vote, so we have to keep vote.

// send signal to newRoundCh, but if full don't send
select {
case x.newRoundCh <- round:
default:
}
}

func (x *XDPoS_v2) broadcastToBftChannel(msg interface{}) {
Expand Down
49 changes: 28 additions & 21 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ type worker struct {
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
wg sync.WaitGroup
resetCh chan time.Duration // Channel to request timer resets

wg sync.WaitGroup

agents map[Agent]struct{}
recv chan *Result
Expand Down Expand Up @@ -158,6 +160,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
resetCh: make(chan time.Duration, 1),
chainDb: eth.ChainDb(),
recv: make(chan *Result, resultQueueSize),
chain: eth.BlockChain(),
Expand Down Expand Up @@ -273,6 +276,7 @@ func (self *worker) update() {
minePeriod := 2
MinePeriodCh := self.engine.(*XDPoS.XDPoS).MinePeriodCh
defer close(MinePeriodCh)
NewRoundCh := self.engine.(*XDPoS.XDPoS).NewRoundCh

timeout := time.NewTimer(time.Duration(minePeriod) * time.Second)
c := make(chan struct{})
Expand All @@ -283,6 +287,16 @@ func (self *worker) update() {
for {
// A real event arrived, process interesting content
select {
case d := <-self.resetCh:
// Reset the timer to the new duration.
if !timeout.Stop() {
// Drain the timer channel if it had already expired.
select {
case <-timeout.C:
default:
}
}
timeout.Reset(d)
case <-timeout.C:
c <- struct{}{}
case <-finish:
Expand All @@ -291,26 +305,31 @@ func (self *worker) update() {
}
}()
for {
prevReset0TimeMillisec := int64(0)
// A real event arrived, process interesting content
select {
case v := <-MinePeriodCh:
log.Info("[worker] update wait period", "period", v)
minePeriod = v
timeout.Reset(time.Duration(minePeriod) * time.Second)
self.resetCh <- time.Duration(minePeriod) * time.Second

case <-c:
if atomic.LoadInt32(&self.mining) == 1 {
self.commitNewWork()
}
resetTime := getResetTime(self.chain, minePeriod, &prevReset0TimeMillisec)
timeout.Reset(resetTime)
resetTime := getResetTime(self.chain, minePeriod)
self.resetCh <- resetTime

// Handle ChainHeadEvent
case <-self.chainHeadCh:
self.commitNewWork()
resetTime := getResetTime(self.chain, minePeriod, &prevReset0TimeMillisec)
timeout.Reset(resetTime)
resetTime := getResetTime(self.chain, minePeriod)
self.resetCh <- resetTime

// Handle new round
case <-NewRoundCh:
self.commitNewWork()
resetTime := getResetTime(self.chain, minePeriod)
self.resetCh <- resetTime

// Handle ChainSideEvent
case <-self.chainSideCh:
Expand Down Expand Up @@ -357,27 +376,15 @@ func (self *worker) update() {
}
}

func getResetTime(chain *core.BlockChain, minePeriod int, prevReset0TimeMillisec *int64) time.Duration {
func getResetTime(chain *core.BlockChain, minePeriod int) time.Duration {
minePeriodDuration := time.Duration(minePeriod) * time.Second
currentBlockTime := chain.CurrentBlock().Time().Int64()
nowTime := time.Now().UnixMilli()
resetTime := time.Duration(currentBlockTime)*time.Second + minePeriodDuration - time.Duration(nowTime)*time.Millisecond
// in case the current block time is not very accurate
if resetTime > minePeriodDuration {
if resetTime > minePeriodDuration || resetTime <= 0 {
resetTime = minePeriodDuration
}
// in case the current block is too far in the past, the block time already is huge, we wait for mine period
if resetTime < 0 {
resetTime = minePeriodDuration
}
if resetTime == 0 {
if nowTime == *prevReset0TimeMillisec {
// in case it resets to 0 in one millisecond too many times, we wait for mine period
resetTime = minePeriodDuration
} else {
*prevReset0TimeMillisec = nowTime
}
}
log.Debug("[update] Miner worker timer reset", "resetMilliseconds", resetTime.Milliseconds(), "minePeriodSec", minePeriod, "currentBlockTimeSec", fmt.Sprintf("%d", currentBlockTime), "currentSystemTimeSec", fmt.Sprintf("%d.%03d", nowTime/1000, nowTime%1000))
return resetTime
}
Expand Down
Loading