diff --git a/pkg/chainsync/types/target_tracker.go b/pkg/chainsync/types/target_tracker.go index 26a04470ae..ec5cb33eca 100644 --- a/pkg/chainsync/types/target_tracker.go +++ b/pkg/chainsync/types/target_tracker.go @@ -2,6 +2,10 @@ package types import ( "container/list" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/actors/builtin" + "github.com/filecoin-project/venus/venus-shared/actors/policy" "sort" "strconv" "sync" @@ -81,6 +85,8 @@ type TargetTracker struct { subs map[string]chan struct{} subLk sync.Mutex + + tipsetCache map[abi.ChainEpoch][]*types.BlockHeader } // NewTargetTracker returns a new target queue. @@ -94,6 +100,7 @@ func NewTargetTracker(size int) *TargetTracker { lk: sync.Mutex{}, lowWeight: fbig.NewInt(0), subs: make(map[string]chan struct{}), + tipsetCache: make(map[abi.ChainEpoch][]*types.BlockHeader), } } @@ -127,6 +134,47 @@ func (tq *TargetTracker) pubNewTarget() { } } +func (tq *TargetTracker) checkBlock(block *types.BlockHeader) bool { + bls, exists := tq.tipsetCache[block.Height] + if !exists { + cache := make([]*types.BlockHeader, 0, builtin.ExpectedLeadersPerEpoch) + tq.tipsetCache[block.Height] = append(cache, block) + return true + } + + for _, b := range bls { + if b.Cid() == block.Cid() { + log.Warnf("block(%s) already in tipset:%d", b.Cid().String(), b.Height) + return true + } + if b.Miner == block.Miner { + log.Warnf("miner:%d packed more than none block in single tipset:%d, it's illegle.", b.Miner.String(), b.Height) + return false + } + } + + tq.tipsetCache[block.Height] = append(bls, block) + + return true +} + +func (tq *TargetTracker) checkTipset(target *Target) (*Target, error) { + bls := target.Head.Blocks() + var newBls []*types.BlockHeader + + for _, b := range bls { + if tq.checkBlock(b) { + newBls = append(newBls, b) + } + } + var err error + target.Head, err = types.NewTipSet(newBls) + + delete(tq.tipsetCache, target.Head.Height()-policy.ChainFinality) + + return target, err +} + // Add adds a sync target to the target queue. // First, check whether the weight is received or not, and the message will record the minimum weight. // If the weight is less than the current weight, it will exit automatically. @@ -143,11 +191,18 @@ func (tq *TargetTracker) Add(t *Target) bool { tq.lk.Lock() defer tq.lk.Unlock() + //do not sync less weight if t.Head.At(0).ParentWeight.LessThan(tq.lowWeight) { return false } + var err error + if t, err = tq.checkTipset(t); err != nil { + log.Errorf("targettracker add failed, check tipsit failed:%s", err.Error()) + return false + } + t, ok := tq.widen(t) if !ok { return false @@ -258,6 +313,8 @@ func (tq *TargetTracker) widen(t *Target) (*Target, bool) { } } + miners := make(map[address.Address]interface{}) + //collect neighbor block in queue include history to get block with same weight and height sameWeightBlks := make(map[cid.Cid]*types.BlockHeader) for _, val := range tq.targetSet { @@ -266,7 +323,13 @@ func (tq *TargetTracker) widen(t *Target) (*Target, bool) { bid := blk.Cid() if !t.Head.Key().Has(bid) { if _, ok := sameWeightBlks[bid]; !ok { + if _, isok := miners[blk.Miner]; isok { + // TODO: a miner mined more than one blocks ? + log.Warnf("miner : %s mined more than one blocks, this is illegal", blk.Miner.String()) + return nil, false + } sameWeightBlks[bid] = blk + miners[blk.Miner] = nil } } } diff --git a/pkg/consensus/block_validator.go b/pkg/consensus/block_validator.go index d951d68a11..ddfcf0b2e5 100644 --- a/pkg/consensus/block_validator.go +++ b/pkg/consensus/block_validator.go @@ -121,6 +121,8 @@ func (bv *BlockValidator) ValidateBlockMsg(ctx context.Context, blk *types.Block logExpect.Debugw("block validation header", "Cid", blk.Cid(), "took", time.Since(validationStart), "height", blk.Header.Height, "age", time.Since(time.Unix(int64(blk.Header.Timestamp), 0))) }() + + return bv.validateBlockMsg(ctx, blk) }